Notes on Generating HFiles with Hadoop and Bulk Loading Them Directly into HBase
Please credit the original source when reposting: http://blackwing.iteye.com/blog/1991380
HBase ships with the ImportTsv class, which can generate HFiles directly from TSV input (per the official documentation, text files whose fields are separated by \t), and a companion class, org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles, which moves the generated HFiles into the table's directory on HDFS.
PS: I saw someone online claim that writing directly into HBase is less efficient than generating the HFiles first and then moving them into the HBase directory with LoadIncrementalHFiles. I haven't verified that; my approach is also to generate first and then move.
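The move step itself is a single command. A minimal sketch, assuming the HFiles were written to hdfs://storefile-outputdir (as in the ImportTsv command below) and the target table is named kq_table (a placeholder name):

bin/hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles hdfs://storefile-outputdir kq_table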
The official documentation is here:
http://hbase.apache.org/book/ops_mgt.html#importtsv
But ImportTsv does not fit my needs. For example, my file format is:

topsid   uid      roler_num   typ   time
10       111111   255         0     1386553377000
The ImportTsv import command would be:

bin/hbase org.apache.hadoop.hbase.mapreduce.ImportTsv -Dimporttsv.columns=HBASE_ROW_KEY,kq:topsid,kq:uid,kq:roler_num,kq:type -Dimporttsv.bulk.output=hdfs://storefile-outputdir <hdfs-data-inputdir>
The table layout it produces is:

row: 10    cf: kq    qualifier: topsid    value: 10    .....
Whereas the layout I need is:

row: 10-111111-255    cf: kq    qualifier: 0    value: 1
So it is more convenient to write my own MapReduce job to process the data.
Mapper:

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/*
 * adminOnOff.log file format:
 * topsid   uid   roler_num   typ   time
 */
public class HFileImportMapper2 extends
        Mapper<LongWritable, Text, ImmutableBytesWritable, KeyValue> {
    protected SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMdd");
    protected final String CF_KQ = "kq"; // attendance column family
    protected final int ONE = 1;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String line = value.toString();
        System.out.println("line : " + line);
        String[] datas = line.split("\\s+");
        // row key format: yyyyMMdd-sid-uid-role_num-timestamp-typ
        String row = sdf.format(new Date(Long.parseLong(datas[4])))
                + "-" + datas[0] + "-" + datas[1] + "-" + datas[2]
                + "-" + datas[4] + "-" + datas[3];
        ImmutableBytesWritable rowkey = new ImmutableBytesWritable(
                Bytes.toBytes(row));
        KeyValue kv = new KeyValue(Bytes.toBytes(row), this.CF_KQ.getBytes(),
                datas[3].getBytes(), Bytes.toBytes(this.ONE));
        context.write(rowkey, kv);
    }
}
Job:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class GenHFile2 {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        conf.addResource("myConf.xml");
        String input = conf.get("input");
        String output = conf.get("output");
        String tableName = conf.get("source_table");
        System.out.println("table : " + tableName);
        HTable table;
        try {
            // delete any existing intermediate output directory before running
            try {
                FileSystem fs = FileSystem.get(URI.create(output), conf);
                fs.delete(new Path(output), true);
                fs.close();
            } catch (IOException e1) {
                e1.printStackTrace();
            }

            table = new HTable(conf, tableName.getBytes());
            Job job = new Job(conf);
            job.setJobName("Generate HFile");

            job.setJarByClass(HFileImportMapper2.class);
            job.setInputFormatClass(TextInputFormat.class);
            job.setMapperClass(HFileImportMapper2.class);
            FileInputFormat.setInputPaths(job, input);

            //job.setReducerClass(KeyValueSortReducer.class);
            //job.setMapOutputKeyClass(ImmutableBytesWritable.class);
            //job.setMapOutputValueClass(KeyValue.class);
            job.getConfiguration().set("mapred.mapoutput.key.class",
                    "org.apache.hadoop.hbase.io.ImmutableBytesWritable");
            job.getConfiguration().set("mapred.mapoutput.value.class",
                    "org.apache.hadoop.hbase.KeyValue");

            //job.setOutputFormatClass(HFileOutputFormat.class);
            FileOutputFormat.setOutputPath(job, new Path(output));
            //job.setPartitionerClass(SimpleTotalOrderPartitioner.class);
            HFileOutputFormat.configureIncrementalLoad(job, table);
            try {
                job.waitForCompletion(true);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
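The move step mentioned in the PS above can also be done in code instead of on the command line. A minimal sketch using LoadIncrementalHFiles, assuming the same conf, output, and table variables as in the job above:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;

// after job.waitForCompletion(true) returns successfully,
// move the generated HFiles under 'output' into the regions of 'table'
// (exception handling omitted in this sketch)
LoadIncrementalHFiles loader = new LoadIncrementalHFiles(conf);
loader.doBulkLoad(new Path(output), table);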
The generated HFiles are written under the /output directory on HDFS, already organized into subdirectories named after the column family:

hdfs://namenode/output/kq/601c5029fb264dc8869a635043c24560
The key call here is:

HFileOutputFormat.configureIncrementalLoad(job, table);

From its source code you can see that it automatically sets up the following for the job:
public static void configureIncrementalLoad(Job job, HTable table)
throws IOException {
  Configuration conf = job.getConfiguration();

  job.setOutputKeyClass(ImmutableBytesWritable.class);
  job.setOutputValueClass(KeyValue.class);
  job.setOutputFormatClass(HFileOutputFormat.class);

  // Based on the configured map output class, set the correct reducer to properly
  // sort the incoming values.
  // TODO it would be nice to pick one or the other of these formats.
  if (KeyValue.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(KeyValueSortReducer.class);
  } else if (Put.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(PutSortReducer.class);
  } else if (Text.class.equals(job.getMapOutputValueClass())) {
    job.setReducerClass(TextSortReducer.class);
  } else {
    LOG.warn("Unknown map output value type:" + job.getMapOutputValueClass());
  }

  conf.setStrings("io.serializations", conf.get("io.serializations"),
      MutationSerialization.class.getName(), ResultSerialization.class.getName(),
      KeyValueSerialization.class.getName());

  // Use table's region boundaries for TOP split points.
  LOG.info("Looking up current regions for table " + Bytes.toString(table.getTableName()));
  List<ImmutableBytesWritable> startKeys = getRegionStartKeys(table);
  LOG.info("Configuring " + startKeys.size() + " reduce partitions " +
      "to match current region count");
  job.setNumReduceTasks(startKeys.size());

  configurePartitioner(job, startKeys);
  // Set compression algorithms based on column families
  configureCompression(table, conf);
  configureBloomType(table, conf);
  configureBlockSize(table, conf);

  TableMapReduceUtil.addDependencyJars(job);
  TableMapReduceUtil.initCredentials(job);
  LOG.info("Incremental table " + Bytes.toString(table.getTableName()) + " output configured.");
}
Note that HFileOutputFormat only supports writing a single column family per job; if you have multiple column families, you will need to run multiple jobs.