Cleaning danmaku (bullet-comment) data with MapReduce: dropping irrelevant columns and empty values
Original data:

Without further ado, straight to the code!

As usual, pom.xml first:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><groupId>com.henu</groupId><artifactId>ETL</artifactId><version>1.0-SNAPSHOT</version><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-client</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-common</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-resourcemanager</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-nodemanager</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-yarn-server-applicationhistoryservice</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.6.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-shuffle</artifactId><version>2.6.0</version></dependency></dependencies></project>ETLUtil
package com.wc;

/**
 * @author George
 * @description ETL utility class
 */
public class ETLUtil {

    public static String oriString2ETLString(String ori) {
        StringBuilder etlString = new StringBuilder();
        // Only keep records whose first character is "0"
        if (ori.startsWith("0")) {
            String[] splits = ori.split("\t");
            for (String split : splits) {
                // Drop the irrelevant "25" column and empty fields
                if (!"25".equals(split) && !"".equals(split)) {
                    etlString.append(split + "#");
                }
            }
        }
        return etlString.toString();
    }

    /*
    public static void main(String[] args) throws IOException {
        BufferedReader br = new BufferedReader(new FileReader("./data/test"));
        String str = "";
        while ((str = br.readLine()) != null) {
            String string = oriString2ETLString(str);
            System.out.println(string);
        }
        br.close();
    }
    */
}
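Before packaging the job, the cleaning logic can be checked locally on a single line. This is just a quick sketch (not part of the original post); the sample record is hypothetical, since the raw column layout is not listed in text here, and it only illustrates that fields equal to "25" and empty fields are dropped while the remaining fields are re-joined with '#':

package com.wc;

// Minimal local sketch: feed one hypothetical tab-separated danmaku line
// through ETLUtil and print the cleaned result.
public class ETLUtilDemo {
    public static void main(String[] args) {
        // Hypothetical record: starts with "0", contains a "25" column and an empty column
        String ori = "0\t25\t\tsome danmaku text\t12.5";
        // Expected output: "0#some danmaku text#12.5#"
        System.out.println(ETLUtil.oriString2ETLString(ori));
    }
}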
BSETLMapper:

package com.wc;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * @author George
 * @description Map stage
 */
public class BSETLMapper extends Mapper<Object, Text, NullWritable, Text> {

    Text text = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String etlString = ETLUtil.oriString2ETLString(value.toString());
        // Skip the record if the cleaned string is whitespace, empty ("") or null
        if (StringUtils.isBlank(etlString)) return;
        text.set(etlString);
        context.write(NullWritable.get(), text);
    }
}
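If you want to exercise the mapper itself without a cluster, a unit test along these lines should work. This is only a sketch: it assumes you also add the Apache MRUnit (org.apache.mrunit:mrunit) and JUnit test dependencies to the pom, and the input record is again hypothetical:

package com.wc;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class BSETLMapperTest {

    @Test
    public void mapperDropsIrrelevantAndEmptyColumns() throws Exception {
        MapDriver<Object, Text, NullWritable, Text> driver =
                MapDriver.newMapDriver(new BSETLMapper());

        // Hypothetical tab-separated record: leading "0", an irrelevant "25" column and an empty column
        driver.withInput(new LongWritable(0), new Text("0\t25\t\tsome danmaku text\t12.5"));
        // The "25" column and the empty field are dropped; remaining fields are joined with '#'
        driver.withOutput(NullWritable.get(), new Text("0#some danmaku text#12.5#"));
        driver.runTest();
    }
}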
BSETLRunner:

package com.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import java.io.IOException;

/**
 * @author George
 * @description Driver that configures and submits the map-only ETL job
 */
public class BSETLRunner implements Tool {

    private Configuration conf = null;

    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    public Configuration getConf() {
        return this.conf;
    }

    public int run(String[] args) throws Exception {
        conf = this.getConf();
        conf.set("inpath", args[0]);
        conf.set("outpath", args[1]);

        Job job = Job.getInstance(conf);
        job.setJarByClass(BSETLRunner.class);
        job.setMapperClass(BSETLMapper.class);
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        // Map-only job: cleaning needs no reduce phase
        job.setNumReduceTasks(0);

        this.initJobInputPath(job);
        this.initJobOutputPath(job);
        return job.waitForCompletion(true) ? 0 : 1;
    }

    private void initJobOutputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String outPathString = conf.get("outpath");
        FileSystem fs = FileSystem.get(conf);
        Path outPath = new Path(outPathString);
        // Delete the output directory if it already exists
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        FileOutputFormat.setOutputPath(job, outPath);
    }

    private void initJobInputPath(Job job) throws IOException {
        Configuration conf = job.getConfiguration();
        String inPathString = conf.get("inpath");
        FileSystem fs = FileSystem.get(conf);
        Path inPath = new Path(inPathString);
        if (fs.exists(inPath)) {
            FileInputFormat.addInputPath(job, inPath);
        } else {
            throw new RuntimeException("Input path does not exist in HDFS: " + inPathString);
        }
    }

    public static void main(String[] args) {
        try {
            int resultCode = ToolRunner.run(new BSETLRunner(), args);
            if (resultCode == 0) {
                System.out.println("Success!");
            } else {
                System.out.println("Fail!");
            }
            System.exit(resultCode);
        } catch (Exception e) {
            e.printStackTrace();
            System.exit(1);
        }
    }
}

Start the cluster!
Upload the jar package.

Upload the data:
[root@henu2 ~]# hdfs dfs -put data.txt /

Run the jar package:
[root@henu2 ~]# hdfs dfs -mkdir /out
[root@henu2 ~]# yarn jar ETL-1.0-SNAPSHOT.jar com.wc.BSETLRunner /data.txt /out/

This produces the cleaned result data.
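Since the job runs with zero reducers, the cleaned records are written straight to map output files (normally part-m-00000, part-m-00001, ...) under /out. As a quick sanity check, one of these files can be read back through the HDFS API; the sketch below is not from the original post and assumes the default output file naming:

// Sketch: read the first map output file back from HDFS and print each cleaned record.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class ReadCleanedOutput {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        try (FSDataInputStream in = fs.open(new Path("/out/part-m-00000"));
             BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line);   // each line is one cleaned, '#'-joined record
            }
        }
    }
}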
Results:
Summary
That is the complete MapReduce job for cleaning the danmaku data by dropping the irrelevant columns and empty values; hopefully it helps you solve similar problems.