`
sunasheng
  • 浏览: 119062 次
  • 性别: Icon_minigender_1
  • 来自: 北京
文章分类
社区版块
存档分类
最新评论

Mapreduce构建Hbase索引

阅读更多

该博客已经完全转移到http://sunhs.me

 

中并增加更多新的技术内容(hadoop为

 

主),欢迎访问!

package test;

import java.io.IOException;
import java.util.HashMap;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.util.GenericOptionsParser;

public class IndexBuilder {
	// 索引表唯一的一列为INDEX:ROW,其中INDEX为列族
	public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX");
	public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW");

	public static class Map
			extends
			Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> {
		private byte[] family;
		// 存储了“列名”到“表名-列名”的映射
		// 前者用于获取某列的值,并作为索引表的键值,后者用于作为表的表名
		private HashMap<byte[], ImmutableBytesWritable> indexes;

		@Override
		protected void map(ImmutableBytesWritable rowKey, Result result,
				Context context) throws IOException, InterruptedException {
			for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes
					.entrySet()) {
				byte[] qualifier = index.getKey();// 获得列名
				ImmutableBytesWritable tableName = index.getValue();// 索引表的表名
				byte[] value = result.getValue(family, qualifier);// 根据“列族:列名”获得元素的值
				if (value != null) {
					// 以列值作为行键,在列“INDEX:ROW”中插入行键
					Put put = new Put(value);
					put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get());
					// 在tableName表上执行put操作
					// 使用MultiOutputFormat时,第二个参数必须是Put或者Delete类型
					context.write(tableName, put);
				}
			}
		}

		/**
		 * setup为Mapper中的方法,该方法只在任务初始化时执行一次
		 */
		@Override
		protected void setup(Context context) throws IOException,
				InterruptedException {
			Configuration configuration = context.getConfiguration();
			String tableName = configuration.get("index.tablename");
			String[] fields = configuration.getStrings("index.fields");
			// fields内为需要做索引的列名
			String familyName = configuration.get("index.familyname");
			family = Bytes.toBytes(familyName);

			// 初始化indexes方法
			indexes = new HashMap<byte[], ImmutableBytesWritable>();
			for (String field : fields) {
				// 如果给name做索引,则索引表的名称为“heroes-name”
				indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable(
						Bytes.toBytes(tableName + "-" + field)));
			}
		}
	}

	public static Job configureJob(Configuration conf, String[] args)
			throws IOException {
		String tableName = args[0];
		String columnFamily = args[1];
		System.out.println("****" + tableName);
		// 通过Configuration.set()方法传递参数
		conf.set(TableInputFormat.SCAN,
				TableMapReduceUtil.convertScanToString(new Scan()));
		conf.set(TableInputFormat.INPUT_TABLE, tableName);
		conf.set("index.tablename", tableName);
		conf.set("index.familyname", columnFamily);
		String[] fields = new String[args.length - 2];
		for (int i = 0; i < fields.length; i++) {
			fields[i] = args[i + 2];
		}
		conf.setStrings("index.fields", fields);
		conf.set("index.familyname", "attributes");

		// 配置任务的运行参数
		Job job = new Job(conf, tableName);
		job.setJarByClass(IndexBuilder.class);
		job.setMapperClass(Map.class);
		job.setNumReduceTasks(0);
		job.setInputFormatClass(TableInputFormat.class);
		job.setOutputFormatClass(MultiTableOutputFormat.class);
		return job;
	}

	public static void main(String[] args) throws Exception {
		Configuration conf = HBaseConfiguration.create();
		String[] otherArgs = new GenericOptionsParser(conf, args)
				.getRemainingArgs();
		if (otherArgs.length < 3) {
			System.err.println("Only " + otherArgs.length
					+ " arguments supplied, required: 3");
			System.err
					.println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]");
			System.exit(-1);
		}
		Job job = configureJob(conf, otherArgs);
		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}
// 运行:
// 如果要对heroes中的name和email列构建索引,则运行参数设为heroes info name email

 

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics