该博客已经完全转移到http://sunhs.me
中并增加更多新的技术内容(hadoop为
主),欢迎访问!
package test; import java.io.IOException; import java.util.HashMap; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.util.GenericOptionsParser; public class IndexBuilder { // 索引表唯一的一列为INDEX:ROW,其中INDEX为列族 public static final byte[] INDEX_COLUMN = Bytes.toBytes("INDEX"); public static final byte[] INDEX_QUALIFIER = Bytes.toBytes("ROW"); public static class Map extends Mapper<ImmutableBytesWritable, Result, ImmutableBytesWritable, Writable> { private byte[] family; // 存储了“列名”到“表名-列名”的映射 // 前者用于获取某列的值,并作为索引表的键值,后者用于作为表的表名 private HashMap<byte[], ImmutableBytesWritable> indexes; @Override protected void map(ImmutableBytesWritable rowKey, Result result, Context context) throws IOException, InterruptedException { for (java.util.Map.Entry<byte[], ImmutableBytesWritable> index : indexes .entrySet()) { byte[] qualifier = index.getKey();// 获得列名 ImmutableBytesWritable tableName = index.getValue();// 索引表的表名 byte[] value = result.getValue(family, qualifier);// 根据“列族:列名”获得元素的值 if (value != null) { // 以列值作为行键,在列“INDEX:ROW”中插入行键 Put put = new Put(value); put.add(INDEX_COLUMN, INDEX_QUALIFIER, rowKey.get()); // 在tableName表上执行put操作 // 使用MultiOutputFormat时,第二个参数必须是Put或者Delete类型 context.write(tableName, put); } } } /** * setup为Mapper中的方法,该方法只在任务初始化时执行一次 */ @Override protected void setup(Context context) throws IOException, InterruptedException { Configuration configuration = context.getConfiguration(); String tableName = configuration.get("index.tablename"); String[] fields = configuration.getStrings("index.fields"); // fields内为需要做索引的列名 String familyName = configuration.get("index.familyname"); family = Bytes.toBytes(familyName); // 初始化indexes方法 indexes = new HashMap<byte[], ImmutableBytesWritable>(); for (String field : fields) { // 如果给name做索引,则索引表的名称为“heroes-name” indexes.put(Bytes.toBytes(field), new ImmutableBytesWritable( Bytes.toBytes(tableName + "-" + field))); } } } public static Job configureJob(Configuration conf, String[] args) throws IOException { String tableName = args[0]; String columnFamily = args[1]; System.out.println("****" + tableName); // 通过Configuration.set()方法传递参数 conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan())); conf.set(TableInputFormat.INPUT_TABLE, tableName); conf.set("index.tablename", tableName); conf.set("index.familyname", columnFamily); String[] fields = new String[args.length - 2]; for (int i = 0; i < fields.length; i++) { fields[i] = args[i + 2]; } conf.setStrings("index.fields", fields); conf.set("index.familyname", "attributes"); // 配置任务的运行参数 Job job = new Job(conf, tableName); job.setJarByClass(IndexBuilder.class); job.setMapperClass(Map.class); job.setNumReduceTasks(0); job.setInputFormatClass(TableInputFormat.class); job.setOutputFormatClass(MultiTableOutputFormat.class); return job; } public static void main(String[] args) throws Exception { Configuration conf = HBaseConfiguration.create(); String[] otherArgs = new GenericOptionsParser(conf, args) .getRemainingArgs(); if (otherArgs.length < 3) { System.err.println("Only " + otherArgs.length + " arguments supplied, required: 3"); System.err .println("Usage: IndexBuilder <TABLE_NAME> <COLUMN_FAMILY> <ATTR> [<ATTR> ...]"); System.exit(-1); } Job job = configureJob(conf, otherArgs); System.exit(job.waitForCompletion(true) ? 0 : 1); } } // 运行: // 如果要对heroes中的name和email列构建索引,则运行参数设为heroes info name email
相关推荐
基于Hadoop的mapreduce 在hbase上的使用,基于Hadoop的mapreduce 在hbase上的使用
mapreduce方式入库hbase hive hdfs,速度很快,里面详细讲述了代码的编写过程,值得下载
对Hadoop中的HDFS、MapReduce、Hbase系列知识的介绍。如果想初略了解Hadoop 可下载观看
google mapreduce bigtable hbase 三篇最著名的中文论文
通过这一阶段的调研总结,从内部机理的角度详细分析,HDFS、MapReduce、Hbase、Hive是如何运行,以及基于Hadoop数据仓库的构建和分布式数据库内部具体实现。如有不足,后续及时修改。整个Hadoop的体系结构主要是通过...
#资源达人分享计划#
hadoop基础,hdfs,hive,mapreduce,hbase
NULL 博文链接:https://jsh0401.iteye.com/blog/2096103
基于Python+SpringBoot+Vue+HDFS+MapReduce+HBase+Hive+Kafka+Sp
#资源达人分享计划#
利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 ## 基本介绍 - InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 - FetchDriver 负责抓取url对应的网页内容 - ParserUrlDriver 解析所抓取...
利用hadoop的mapreduce和Hbase,基于lucene做的简单的搜索引擎 基本介绍 InjectDriver 将本地的url注入到hbase数据库中等待下一步执行 FetchDriver 负责抓取url对应的网页内容 ParserUrlDriver 解析所抓取网页内容...
hbase导入hbase导入
Hadoop-0.20.0-HDFS+MapReduce+Hive+HBase十分钟快速入门
Indexer 索引和搜索不会影响 HBase 运行的稳定性和 HBase 数据写入的吞吐量,因为索引和 搜索过程是完全分开并且异步的。Lily HBase Indexer 在 CDH5 中运行必须依赖 HBase、 SolrCloud 和 Zookeeper 服务。
#资源达人分享计划#
在Hadoop上使用MapReduce构建反向索引器 脚步 在Makefile中更改netid(默认为jguo7) $cd src 字数 $cd wordCount $make init: build up the directories in the HDFS, pre-process the input file and put it into...
倒排索引源于实际应用中需要根据属性的值来查找记录。这种索引表中的每一项都包括一个属性值和具有该属性值的各记录的地址。由于不是由记录来确定属性值,而是由属性值来确定记录的位置,因而称为倒排索引(inverted ...
Mapreduce实验报告 前言和简介 MapReduce是Google提出的一种编程模型,在这个模型的支持下可以实现大规模并行化计 算。在Mapreduce框架下一个计算机群通过统一的任务调度将一个巨型任务分成许多部分 ,分别解决然后...
Hadoop_学习MapReduce,HBase,协处理器的学习与实现