Reading and Writing HBase Tables from MapReduce
The parallel computing power of a MapReduce job can be used to read and write data in HBase: an HBase table can serve as the data source, as the data sink, or as both.
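When HBase is the data source, TableMapReduceUtil.initTableMapperJob wires a table and a Scan into the map phase, and the mapper extends TableMapper, receiving one row (row key plus Result) per call. Below is a minimal sketch of that source case; the table name "sometable" and the row-counting logic are illustrative assumptions, not part of the example that follows.

import java.io.IOException;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseSourceMapper extends TableMapper<Text, IntWritable> {
	private static final IntWritable ONE = new IntWritable(1);

	@Override
	public void map(ImmutableBytesWritable rowKey, Result row, Context context)
			throws IOException, InterruptedException {
		// One call per HBase row: emit <rowkey, 1>, e.g. to count rows
		context.write(new Text(rowKey.copyBytes()), ONE);
	}

	// Hypothetical helper showing the wiring inside a driver
	public static void configure(Job job) throws IOException {
		Scan scan = new Scan();
		scan.setCaching(500);		// fetch more rows per RPC during the scan
		scan.setCacheBlocks(false);	// recommended for full-table MR scans
		TableMapReduceUtil.initTableMapperJob("sometable", scan,
				HBaseSourceMapper.class, Text.class, IntWritable.class, job);
	}
}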
The following example runs a word count with MapReduce and stores the results in an HBase table.
[Example] HBase and MapReduce.
1. Write the mapper.
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class WordCountHBaseMapper extends Mapper<Object, Text, Text, IntWritable> {
	private static final IntWritable ONE = new IntWritable(1);
	private Text word = new Text();

	@Override
	public void map(Object key, Text value, Context context)
			throws IOException, InterruptedException {
		// Split each input line into words and emit a <word, 1> pair per word
		StringTokenizer itr = new StringTokenizer(value.toString());
		while (itr.hasMoreTokens()) {
			word.set(itr.nextToken());
			context.write(word, ONE);
		}
	}
}
2. Write the reducer.
import java.io.IOException;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
public class WcHBaseReducer extends TableReducer<Text, IntWritable, ImmutableBytesWritable> {
	@Override
	public void reduce(Text key, Iterable<IntWritable> values, Context context)  
								throws IOException, InterruptedException {
		int sum = 0;
		for (IntWritable val : values) {			// sum the counts for this word
			sum += val.get();
		}
		
		// One row per word; note Bytes.toBytes(key.toString()), because
		// Text.getBytes() returns the backing array, which may be longer
		// than the actual content
		byte[] rowKey = Bytes.toBytes(key.toString());
		Put put = new Put(rowKey);
		// Column family "content", qualifier "count", value = the total
		put.addColumn(Bytes.toBytes("content"), Bytes.toBytes("count"),
				Bytes.toBytes(String.valueOf(sum)));
		
		context.write(new ImmutableBytesWritable(rowKey), put);		// emit the Put
	}
}
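Note that TableReducer<Text, IntWritable, ImmutableBytesWritable> fixes the output value type to an HBase Mutation (here, a Put); only the output key type is a free type parameter. TableOutputFormat actually ignores that key: the target row is determined by the Put itself.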
 
3. Write the driver.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
public class WordCountDriver extends Configured implements Tool {
	static final String TABLE_NAME = "wordcount";		// table name
	static final String COLUMN_FAMILY = "content";	// column family
	@Override
	public int run(String[] args) throws Exception {
		if (args.length != 1) {
			System.err.println("Usage: WordCountDriver <input-path>");
			return 2;
		}
		Configuration conf = HBaseConfiguration.create();
		Connection connection = ConnectionFactory.createConnection(conf);
		// 1. Create the output table (drop and recreate it if it already exists)
		TableName tableName = TableName.valueOf(TABLE_NAME);
		Admin admin = connection.getAdmin();
		if (admin.tableExists(tableName)) {
			System.out.println("Table already exists! Recreating it...");
			admin.disableTable(tableName); // a table must be disabled before deletion
			admin.deleteTable(tableName);
		}
		HTableDescriptor htd = new HTableDescriptor(tableName);
		HColumnDescriptor hcd = new HColumnDescriptor(COLUMN_FAMILY);
		htd.addFamily(hcd); 		// add the column family
		admin.createTable(htd); 	// create the table
		admin.close();
		connection.close();
		// 2. Configure and run the job
		Job job = Job.getInstance(conf, "WordCountHBase");
		job.setJarByClass(WordCountDriver.class);
		// WordCountHBaseMapper implements the map phase
		job.setMapperClass(WordCountHBaseMapper.class);
		// Wire the reduce phase to write into the HBase table
		TableMapReduceUtil.initTableReducerJob(TABLE_NAME, WcHBaseReducer.class, job);
		// The map output types are Text/IntWritable; the job's final output
		// types for the HBase sink were already set by initTableReducerJob
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		// Input path for the job
		FileInputFormat.addInputPath(job, new Path(args[0]));
		// Run the job and wait for it to finish
		return job.waitForCompletion(true) ? 0 : 1;
	}
	public static void main(String[] args) throws Exception {
		int status = ToolRunner.run(new WordCountDriver(), args);
		System.exit(status);
	}
}
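HTableDescriptor and HColumnDescriptor, as used above, are deprecated in HBase 2.x. A minimal sketch of the same table-creation step with the 2.x builder API, assuming an HBase 2.x client on the classpath:

import java.io.IOException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;

public class CreateWordCountTable {
	// Equivalent of the HTableDescriptor/HColumnDescriptor code in the driver
	static void createTable(Admin admin) throws IOException {
		TableDescriptor htd = TableDescriptorBuilder
				.newBuilder(TableName.valueOf("wordcount"))
				.setColumnFamily(ColumnFamilyDescriptorBuilder.of("content"))
				.build();
		admin.createTable(htd);
	}
}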
 
4. Run and test. Package and submit the job yourself, then inspect the results in the table (for example, scan 'wordcount' in the HBase shell).
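To spot-check a single result programmatically, one row can be fetched with a Get. A minimal sketch, assuming the job has already run; the word "hello" is just an illustrative row key:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class CheckWordCount {
	public static void main(String[] args) throws IOException {
		Configuration conf = HBaseConfiguration.create();
		try (Connection connection = ConnectionFactory.createConnection(conf);
				Table table = connection.getTable(TableName.valueOf("wordcount"))) {
			// "hello" is an illustrative row key; use any word from your input
			Result result = table.get(new Get(Bytes.toBytes("hello")));
			byte[] value = result.getValue(Bytes.toBytes("content"), Bytes.toBytes("count"));
			System.out.println("count = " + (value == null ? "(absent)" : Bytes.toString(value)));
		}
	}
}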