读HDFS文件
2019独角兽企业重金招聘Python工程师标准>>>
不带压缩的读取方式
hdfs = FileSystem.get(new URI("hdfs://SANDBOX-HADOOP-01.whh.net:8022"), conf, "bigdata"); package com.whh.bigdata.xetl.test;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionInputStream;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.*;/*** Created by whh on 2017/9/29.*/public class ReadHDFS {private static final String utf8 = "UTF-8";/***讀取了HDFS上的一個txt文件里的內(nèi)容,按行讀取;返回一個map(行的hash值,行內(nèi)容),把重復(fù)的行打出來* @param txtFilePath* @param conf* @return*/public static Map<Integer,List<Integer>> getStringByTXT(String txtFilePath, Configuration conf){Map<Integer,List<Integer>> map = new HashMap<Integer,List<Integer>>();StringBuffer buffer = new StringBuffer();FSDataInputStream fsr = null;//輸入流BufferedReader bufferedReader = null;String lineTxt = null;try{FileSystem fs = FileSystem.get(URI.create(txtFilePath),conf);fsr = fs.open(new Path(txtFilePath));bufferedReader = new BufferedReader(new InputStreamReader(fsr));int lineCount = 1;while ((lineTxt = bufferedReader.readLine()) != null){int hc=lineTxt.hashCode();List <Integer>list=new ArrayList();list.add(lineCount);if (map.containsKey(hc)) //重復(fù)的行打印出來{System.out.println(lineCount +":" + lineTxt);map.get(hc).add(lineCount);}else map.put(hc,list);lineCount++;}} catch (Exception e){e.printStackTrace();} finally{if (bufferedReader != null){try{bufferedReader.close();} catch (IOException e){e.printStackTrace();}}}return map;}/*** @param args*/public static void main(String[] args) {// TODO Auto-generated method stubConfiguration conf = new Configuration();String txtFilePath = "hdfs://SANDBOX-HADOOP-01.whh.net:8022/log_data/stg_log_1600005/day=2017-11-19/-r-00001";// 
String mbline = getStringByTXT(txtFilePath, conf);Map <Integer,List<Integer>>map=new HashMap();map=getStringByTXT(txtFilePath, conf);for (Map.Entry<Integer, List<Integer>> entry : map.entrySet()){if(entry.getValue().size()>1){System.out.println(entry.getKey()+":"+entry.getValue());}}}}帶壓縮的文件讀取
package com.whh.bigdata.xetl.test;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.compress.CompressionCodec; import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.apache.hadoop.io.compress.CompressionInputStream;import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.net.URI; import java.util.*;/*** Created by whh on 2017/9/29.*/public class ReadHDFS {private static final String utf8 = "UTF-8";/*** 獲取文件的壓縮方式* @param file* @return*/public String getCodec(Path file) {String filename = file.getName();String reversedFilename = (new StringBuilder(filename)).reverse().toString();return reversedFilename;}/*** 讀取壓縮文件的第一行,打印出來* @param txtFilePath* @param conf* @return*/public static String getStringByTXT1(String txtFilePath, Configuration conf){StringBuffer buffer = new StringBuffer();FSDataInputStream fsr = null;BufferedReader bufferedReader = null;String lineTxt = null;try{FileSystem fs = FileSystem.get(URI.create(txtFilePath),conf);fsr = fs.open(new Path(txtFilePath));CompressionCodecFactory factory = new CompressionCodecFactory(conf);CompressionCodec codec = factory.getCodec(new Path(txtFilePath));System.out.println("codec="+codec);CompressionInputStream compin=codec.createInputStream(fsr);//BufferedReader br= new BufferedReader(new InputStreamReader(compin));bufferedReader = new BufferedReader(new InputStreamReader(compin));while ((lineTxt = bufferedReader.readLine()) != null){//if(lineTxt.split("\t")[0].trim().equals("00067")){return lineTxt;//}}} catch (Exception e){e.printStackTrace();} finally{if (bufferedReader != null){try{bufferedReader.close();} catch (IOException e){e.printStackTrace();}}}return lineTxt;}public static void main(String[] args) {// TODO Auto-generated method stubConfiguration conf = new Configuration();String txtFilePath = 
"hdfs://SANDBOX-HADOOP-01.whh.net:8022/collect_data/userlog/20170925/kp_diag_2017092523_10.1.11.171.1506354616660.1549.log.gz";System.out.println(mbline);}}轉(zhuǎn)載于:https://my.oschina.net/u/3267050/blog/1619492
總結(jié)
- 上一篇: 关于报错stale element reference
- 下一篇: SVN与git的区别【图文经典版】