Scala _11: Spark WordCount in Scala and Java
Create a Maven project

The pom.xml file:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.henu</groupId>
    <artifactId>MySpark01</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
    </properties>

    <dependencies>
        <!-- Spark Core -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- Spark SQL -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- Spark SQL on Hive -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-hive_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- MySQL JDBC driver -->
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.47</version>
        </dependency>
        <!-- Spark Streaming -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
            <version>2.3.1</version>
            <!--<scope>provided</scope>-->
        </dependency>
        <!-- Spark Streaming + Kafka -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>2.3.1</version>
        </dependency>
        <!-- Needed for producing data to Kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.0.0</version>
        </dependency>
        <!-- Needed for connecting to Redis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.6.1</version>
        </dependency>
        <!-- Scala libraries -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-compiler</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-reflect</artifactId>
            <version>2.11.7</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.12</version>
        </dependency>
        <dependency>
            <groupId>com.google.collections</groupId>
            <artifactId>google-collections</artifactId>
            <version>1.0</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- When a Maven project contains both Java and Scala sources, configure the
                 maven-scala-plugin so that both are compiled and packaged together -->
            <plugin>
                <groupId>org.scala-tools</groupId>
                <artifactId>maven-scala-plugin</artifactId>
                <version>2.15.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- Plugin needed for building the jar with Maven -->
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>2.4</version>
                <configuration>
                    <!-- Setting this to false removes the trailing "-jar-with-dependencies"
                         from MySpark-1.0-SNAPSHOT-jar-with-dependencies.jar -->
                    <!--<appendAssemblyId>false</appendAssemblyId>-->
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                    <archive>
                        <manifest>
                            <mainClass>com.bjsxt.scalaspark.sql.windows.OverFunctionOnHive</mainClass>
                        </manifest>
                    </archive>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>assembly</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <!-- The assembly plugin above bundles the dependencies into a single jar.
                 The commented-out approach below uses Maven's native jar packaging instead,
                 which does not include the dependencies in the final jar. -->
            <!--<plugin>-->
                <!--<groupId>org.apache.maven.plugins</groupId>-->
                <!--<artifactId>maven-jar-plugin</artifactId>-->
                <!--<version>2.4</version>-->
                <!--<configuration>-->
                    <!--<archive>-->
                        <!--<manifest>-->
                            <!--<addClasspath>true</addClasspath>-->
                            <!--<!– Path prefix under which the main class looks for its dependency jars at runtime –>-->
                            <!--<classpathPrefix>/alljars/lib</classpathPrefix>-->
                            <!--<mainClass>com.bjsxt.javaspark.sql.CreateDataSetFromHive</mainClass>-->
                        <!--</manifest>-->
                    <!--</archive>-->
                <!--</configuration>-->
            <!--</plugin>-->
            <!-- Copy the dependency jars into the lib directory -->
            <!--<plugin>-->
                <!--<groupId>org.apache.maven.plugins</groupId>-->
                <!--<artifactId>maven-dependency-plugin</artifactId>-->
                <!--<executions>-->
                    <!--<execution>-->
                        <!--<id>copy</id>-->
                        <!--<phase>package</phase>-->
                        <!--<goals>-->
                            <!--<goal>copy-dependencies</goal>-->
                        <!--</goals>-->
                        <!--<configuration>-->
                            <!--<outputDirectory>-->
                                <!--<!– Copy the dependency jars to target/lib –>-->
                                <!--${project.build.directory}/lib-->
                            <!--</outputDirectory>-->
                        <!--</configuration>-->
                    <!--</execution>-->
                <!--</executions>-->
            <!--</plugin>-->
        </plugins>
    </build>
</project>
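With this pom in place you can run the examples straight from the IDE, or package a fat jar and submit it to Spark. The two commands below are a sketch, not taken from the original article: the jar name follows the default assembly naming for this pom, and the --class value assumes the WordCount example shown next (the mainClass configured in the assembly plugin above points to an unrelated sample class, so you would normally change it or pass the class explicitly as shown here).

mvn clean package
spark-submit --class com.henu.spark.WordCount --master local target/MySpark01-1.0-SNAPSHOT-jar-with-dependencies.jar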
WordCount

package com.henu.spark

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {
  def main(args: Array[String]): Unit = {
    // SparkConf sets the Spark application's name and the run mode
    val conf = new SparkConf()
    conf.setAppName("wordcount")
    conf.setMaster("local")
    // SparkContext is the single entry point to the Spark cluster
    val sc = new SparkContext(conf)
    val lines = sc.textFile("d://data.txt")
    val words = lines.flatMap(line => {
      line.split(" ")
    })
    val pairWords = words.map(word => {
      new Tuple2(word, 1)
    })
    val result = pairWords.reduceByKey((v1: Int, v2: Int) => {
      v1 + v2
    })
    result.foreach(one => {
      println(one)
    })
  }
}

Now it is time to see the magic of Scala and Spark: the same job in just three lines.
package com.henu.spark

import org.apache.spark.{SparkConf, SparkContext}

object MinWordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("wordcount").setMaster("local")
    val sc = new SparkContext(conf)
    sc.textFile("d://data.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)
  }
}
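The three-line version is exactly the same pipeline as WordCount above, just written with Scala's placeholder syntax: _.split(" ") stands for line => line.split(" "), (_, 1) for word => (word, 1), and _ + _ for (v1, v2) => v1 + v2.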
By the way, here is the data.txt file:

hello hello hello world hello george hello honey pi ba

Both versions produce the same result:
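Since the word counts for this file are fixed (hello appears five times, every other word once), the console output of either version should contain the following pairs; only the ordering may differ between runs:

(hello,5)
(world,1)
(george,1)
(honey,1)
(pi,1)
(ba,1)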
A further supplement to make this instantly clear: the code above uses Scala; below is the same program written in Java.
package ddd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @author George
 * @description
 */
public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wc");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("d://data.txt");
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });
        JavaPairRDD<String, Integer> pairWords = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        JavaPairRDD<String, Integer> result = pairWords.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        result.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            @Override
            public void call(Tuple2<String, Integer> tp) throws Exception {
                System.out.println(tp);
            }
        });
        sc.stop();
    }
}

The same program simplified with lambda expressions:
package ddd;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * @author George
 * @description
 */
public class JavaWordCount {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wc");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> lines = sc.textFile("d://data.txt");
        JavaRDD<String> words = lines.flatMap(
                (FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator());
        JavaPairRDD<String, Integer> pairWords = words.mapToPair(
                (PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
        JavaPairRDD<String, Integer> result = pairWords.reduceByKey(
                (Function2<Integer, Integer, Integer>) (integer, integer2) -> integer + integer2);
        result.foreach((VoidFunction<Tuple2<String, Integer>>) tp -> System.out.println(tp));
        sc.stop();
    }
}

The output is the same:
[Note] The underlying principle is of course the same in both languages, so there is no need to elaborate further.
Summary

This article walked through Spark WordCount in a single Maven project: a full Scala version, a three-line Scala version, and a Java version written first with anonymous function classes and then with lambda expressions.