Notes on using multiple characters as a column delimiter in Hive
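1. Problem: The files on HDFS use ## as the column delimiter. When the Hive table is declared with '##' directly, the fields that come out do not match the file contents.
   The table was created with the following clauses:

ROW FORMAT DELIMITED FIELDS TERMINATED BY '##'
STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION 'hdfs://nameservice-ha/pgw/gz'

2. Cause: When a delimiter is specified in the table definition with ROW FORMAT DELIMITED, Hive does not support a multi-character delimiter.
   The declaration above therefore effectively splits on a single #. A simple workaround is to write an MR program that rewrites every ## in the data as a single #.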
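To see why the fields come out misaligned, the effect of splitting on a single # can be reproduced in plain Java. This is only an illustration of single-character splitting, not Hive's own parsing code; the class name SingleCharSplitDemo and the sample row are made up for the example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Illustration only: plain Java, not Hive's actual parsing code.
final class SingleCharSplitDemo {

    // Split a line on one delimiter character, keeping empty fields.
    static List<String> splitOnChar(String line, char delimiter) {
        // Pattern.quote protects delimiters that are regex metacharacters.
        String regex = Pattern.quote(String.valueOf(delimiter));
        // A negative limit keeps trailing empty fields as well.
        return Arrays.asList(line.split(regex, -1));
    }

    public static void main(String[] args) {
        // Hypothetical row with three logical columns separated by "##".
        String line = "1001##alice##gz";
        // prints [1001, , alice, , gz]
        System.out.println(splitOnChar(line, '#'));
    }
}
```

A row with three logical columns comes back as five fields, two of them empty, which is the kind of mismatch described in the problem statement.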
3. Solution: To make Hive handle a multi-character delimiter, write a custom InputFormat and override the next method of its RecordReader.
   The code is as follows:

// File: DefTextInputFormat.java
package com.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

public class DefTextInputFormat extends TextInputFormat implements JobConfigurable {

    // Hand every split to the custom reader that rewrites "##" as "#".
    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job,
            Reporter reporter) throws IOException {
        reporter.setStatus(genericSplit.toString());
        return new DefRecordReader((FileSplit) genericSplit, job);
    }
}

// File: DefRecordReader.java
package com.hive;

import java.io.IOException;
import java.io.InputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.util.LineReader;

public class DefRecordReader implements RecordReader<LongWritable, Text> {

    private CompressionCodecFactory compressionCodecs = null;
    private long start;
    private long pos;
    private long end;
    private LineReader lineReader;
    int maxLineLength;

    // Constructor used by DefTextInputFormat
    public DefRecordReader(FileSplit inputSplit, Configuration job) throws IOException {
        maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        start = inputSplit.getStart();
        end = start + inputSplit.getLength();
        final Path file = inputSplit.getPath();

        // Look up a compression codec for the file, if any
        compressionCodecs = new CompressionCodecFactory(job);
        final CompressionCodec codec = compressionCodecs.getCodec(file);

        // Open the file on its file system
        FileSystem fs = file.getFileSystem(job);
        FSDataInputStream fileIn = fs.open(file);

        boolean skipFirstLine = false;
        if (codec != null) {
            // Compressed input: read through the codec to the end of the stream
            lineReader = new LineReader(codec.createInputStream(fileIn), job);
            end = Long.MAX_VALUE;
        } else {
            if (start != 0) {
                // Not the first split: back up one byte and discard the partial first line
                skipFirstLine = true;
                --start;
                fileIn.seek(start);
            }
            lineReader = new LineReader(fileIn, job);
        }
        if (skipFirstLine) {
            start += lineReader.readLine(new Text(), 0,
                    (int) Math.min((long) Integer.MAX_VALUE, end - start));
        }
        this.pos = start;
    }

    public DefRecordReader(InputStream in, long offset, long endOffset, int maxLineLength) {
        this.maxLineLength = maxLineLength;
        this.start = offset;
        this.lineReader = new LineReader(in);
        this.pos = offset;
        this.end = endOffset;
    }

    public DefRecordReader(InputStream in, long offset, long endOffset, Configuration job)
            throws IOException {
        this.maxLineLength = job.getInt("mapred.mutilCharRecordReader.maxlength", Integer.MAX_VALUE);
        this.lineReader = new LineReader(in, job);
        this.start = offset;
        this.pos = offset; // the original left pos at 0 here
        this.end = endOffset;
    }

    @Override
    public void close() throws IOException {
        if (lineReader != null) {
            lineReader.close();
        }
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable();
    }

    @Override
    public Text createValue() {
        return new Text();
    }

    @Override
    public long getPos() throws IOException {
        return pos;
    }

    @Override
    public float getProgress() throws IOException {
        if (start == end) {
            return 0.0f;
        } else {
            return Math.min(1.0f, (pos - start) / (float) (end - start));
        }
    }

    // Overridden next method: rewrites the multi-character column delimiter
    // in each line as a single-character delimiter.
    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (pos < end) {
            key.set(pos);
            int newSize = lineReader.readLine(value, maxLineLength,
                    Math.max((int) Math.min(Integer.MAX_VALUE, end - pos), maxLineLength));
            if (newSize == 0) {
                return false; // end of input
            }
            // Rewrite "##" in the line as "#"
            value.set(value.toString().replace("##", "#"));
            pos += newSize;
            if (newSize < maxLineLength) {
                return true;
            }
            // Otherwise the line was too long: skip it and read the next one
        }
        return false;
    }
}

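When creating the table, specify the com.hive.DefTextInputFormat class as the INPUTFORMAT. Because the reader has already rewritten each ## as #, the table itself should declare FIELDS TERMINATED BY '#'.
The two classes must of course first be packaged into a jar and deployed to Hive's runtime environment; see http://blog.csdn.net/fjssharpsword/article/details/70271671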
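The replacement step inside next can be tried on its own to see what it does to different rows. The sketch below is plain Java with a hypothetical class name (DelimiterCollapseDemo) and made-up sample rows; it only mirrors the String.replace call and does not involve Hadoop or Hive.

```java
// Illustration only: mirrors the "##" -> "#" rewrite done in DefRecordReader.next.
final class DelimiterCollapseDemo {

    // Same replacement as in the record reader: non-overlapping, left to right.
    static String collapse(String line) {
        return line.replace("##", "#");
    }

    public static void main(String[] args) {
        // Ordinary row: three columns stay three columns.
        System.out.println(collapse("1001##alice##gz")); // prints 1001#alice#gz

        // Empty middle column: "####" becomes "##", so the empty field survives.
        System.out.println(collapse("1001####gz"));      // prints 1001##gz

        // Assumed edge case: a literal '#' inside a value can no longer be
        // told apart from the delimiter after the rewrite.
        System.out.println(collapse("a#b##gz"));         // prints a#b#gz
    }
}
```

Empty columns are preserved by the rewrite, but a value that itself contains # ends up split into extra columns, so this approach assumes the data never contains a bare #.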