Eclipse的下载、安装和WordCount的初步使用(本地模式和集群模式)
?
?
包括:
? ?Eclipse的下載
Eclipse的安裝
? ? ? ?Eclipse的使用
? 本地模式或集群模式
?
?
?
? ? ? ?Scala IDE for Eclipse的下載、安裝和WordCount的初步使用(本地模式和集群模式)
? ? ? ?IntelliJ IDEA的下載、安裝和WordCount的初步使用(本地模式和集群模式)
?
?
?
? ? ?我們知道,對于開發而言,IDE是有很多個選擇的版本。如我們大部分人經常用的是如下。
Eclipse?*版本
Eclipse?*下載
? ? ?現在啊,在業界,用java語言,開發是霸主地位。
? ? ?比如,一個高級的高手人員,在企業里,做了一個大開發,他走了之后,一般java,還算比較好其余的人,熟悉和做二次開發。
?
?
?
Eclipse的使用
?
?
創建Maven工程
這里,其實,可以跳過,參考我的博客
Eclipse下新建Maven項目、自動打依賴jar包
新建包?com.zhouls.spark.SparkApps.cores
新建WordCount.java
?
?Ctrl + 2 ,再選擇 Quick Assist - Assign to local variable ?Ctrl + 2,L 。
?
?
在spark里,貌似不可以,本人目前還沒找到原因。
?
?
注意,不同語言編寫,創建sparkcontext,是不同的。
比如,這里,是java語言,則是
?
?
? ? ? ? ?這個,因為是java語言編寫的,所以,就沒有像scala那樣具有自動推導。
說白了,就是,我們java語言編寫,其實就是一層外衣而已。
?
?
?繼續,編程,本地local模式下的用java語言編寫的WordCount
?
?
?
?
?成功,上述不是 錯誤,
?
?
?
?
Spark-Java版本WordCount示例(本地local模式)
WordCount.java
?
package com.zhouls.spark.SparkApps.cores;import java.util.Arrays;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;/** * Spark的WordCount程序 * @author zhouls * */ public class WordCount {public static void main(String[] args) {/** * 第1步:創建spark的配置對象SparkConf,設置spark程序的運行時的配置信息, * 例如說通過setMaster來設置程序要連接的spark集群的Master的URI,如果設置為local,則代表spark程序在本地運行, * 特別適合機器配置條件非常差(例如只有1G內存)的初學者 */ SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local");/** * 2、創建SparkContext對象,Java開發使用JavaSparkContext;Scala開發使用SparkContext * 在Spark中,SparkContext負責連接Spark集群,創建RDD、累積量和廣播量等。 * Master參數是為了創建TaskSchedule(較低級的調度器,高層次的調度器為DAGSchedule),如下: * 如果setMaster("local")則創建LocalSchedule; * 如果setMaster("spark")則創建SparkDeploySchedulerBackend。在SparkDeploySchedulerBackend的start函數,會啟動一個Client對象,連接到Spark集群。 * /* * 第2步:創建SparkContext對象 * SparkContext是Spark程序所有功能的唯一入口,無論是采用scala、Java、Python、R等都必須有一個SparkContext(不同的語言具體的類名稱不同,如果是Java的話則為JavaSparkContext) * SparkContext核心作用,初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackend * 同時還會負責Spark程序往Master注冊程序等 * SparkContext是整個Spark應用程序中最為至關重要的一個對象 */ JavaSparkContext sc = new JavaSparkContext(conf);//其實,底層實際上就是scala的SparkContext/** * 第3步: sc中提供了textFile方法是SparkContext中定義的,如下: * def textFile(path: String): JavaRDD[String] = sc.textFile(path) * 用來讀取HDFS上的文本文件、集群中節點的本地文本文件或任何支持Hadoop的文件系統上的文本文件,它的返回值是JavaRDD[String],是文本文件每一行 * * 第3步:根據具體的數據來源(如HDFS、HBase、Local FS、DB、S3等)通過JavaSparkContext來創建JavaRDD * JavaRDD的創建基本有三種方式:根據外部的數據來源(例如HDFS)、根據Scala集合、由其它的RDD操作 * 數據會被RDD劃分成一系列的Partitions,分配到每個Partition的數據屬于一個Task的處理范疇 */ JavaRDD<String> lines = sc.textFile("D://SoftWare//spark-1.5.2-bin-hadoop2.6//README.md");/** * 4、將行文本內容拆分為多個單詞 * lines調用flatMap這個transformation算子(參數類型是FlatMapFunction接口實現類)返回每一行的每個單詞 * /* * 第4步:對初始的JavaRDD進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 * 第4.1步:將每一行的字符串拆分成單個的單詞 */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){ @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } });/** * 第4.2步、將每個單詞的初始數量都標記為1個 * words調用mapToPair這個transformation算子(參數類型是PairFunction接口實現類,PairFunction<String, String, Integer>的三個參數是<輸入單詞, Tuple2的key, Tuple2的value>),返回一個新的RDD,即JavaPairRDD * * * 第4.2步:在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word,1) */ JavaPairRDD<String,Integer> pairs =words.mapToPair(new PairFunction<String,String,Integer>(){ @Override public Tuple2<String,Integer> call(String word)throws Exception{ return new Tuple2<String,Integer>(word,1); } });/** * 第4.3步、計算每個相同單詞出現的次數 * pairs調用reduceByKey這個transformation算子(參數是Function2接口實現類)對每個key的value進行reduce操作,返回一個JavaPairRDD,這個JavaPairRDD中的每一個Tuple的key是單詞、value則是相同單詞次數的和 * * * 第4.3步:在每個單詞實例計數為1基礎之上統計每個單詞在文件中出現的總次數 */ JavaPairRDD<String,Integer> wordsCount = pairs.reduceByKey(new Function2<Integer,Integer,Integer>(){ @Override public Integer call(Integer v1,Integer v2)throws Exception{ return v1 + v2; } });/** * 第5步: 使用foreach這個action算子提交Spark應用程序 * 在Spark中,每個應用程序都需要transformation算子計算,最終由action算子觸發作業提交 */ wordsCount.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> pairs) throws Exception { System.out.println(pairs._1+":"+pairs._2); } });/** * 第6步:關閉SparkContext容器,結束本次作業 */ sc.close();}}?
?
?
?
?
?Spark-Java版本WordCount示例(集群模式)
?
注意,這是,Ubuntu系統下的路徑而已
/usr/local/spark/spark-1.5.2-bin-hadoop2.6
?
這里,必須要復制一份,到hdfs上,即Hadoop集群上。
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$
bin/hadoop fs -copyFromLocal ?/usr/local/spark/spark-1.5.2-bin-hadoop2.6/README.md ??hdfs://SparkSingleNode:9000/
這樣, 就可以了。
?
?
?
?
WordCountCluster.java
package com.zhouls.spark.SparkApps.cores;import java.util.Arrays;import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction;import scala.Tuple2;/** * Spark的WordCountCluster程序 * @author zhouls * */ public class WordCountCluster {public static void main(String[] args) {/** * 第1步:創建spark的配置對象SparkConf,設置spark程序的運行時的配置信息, * 例如說通過setMaster來設置程序要連接的spark集群的Master的URI,如果設置為local,則代表spark程序在本地運行, * 特別適合機器配置條件非常差(例如只有1G內存)的初學者 */ SparkConf conf = new SparkConf().setAppName("Spark WordCount written by Java").setMaster("local");/** * 2、創建SparkContext對象,Java開發使用JavaSparkContext;Scala開發使用SparkContext * 在Spark中,SparkContext負責連接Spark集群,創建RDD、累積量和廣播量等。 * Master參數是為了創建TaskSchedule(較低級的調度器,高層次的調度器為DAGSchedule),如下: * 如果setMaster("local")則創建LocalSchedule; * 如果setMaster("spark")則創建SparkDeploySchedulerBackend。在SparkDeploySchedulerBackend的start函數,會啟動一個Client對象,連接到Spark集群。 * /* * 第2步:創建SparkContext對象 * SparkContext是Spark程序所有功能的唯一入口,無論是采用scala、Java、Python、R等都必須有一個SparkContext(不同的語言具體的類名稱不同,如果是Java的話則為JavaSparkContext) * SparkContext核心作用,初始化Spark應用程序運行所需要的核心組件,包括DAGScheduler、TaskScheduler、SchedulerBackend * 同時還會負責Spark程序往Master注冊程序等 * SparkContext是整個Spark應用程序中最為至關重要的一個對象 */ JavaSparkContext sc = new JavaSparkContext(conf);//其實,底層實際上就是scala的SparkContext/** * 第3步: sc中提供了textFile方法是SparkContext中定義的,如下: * def textFile(path: String): JavaRDD[String] = sc.textFile(path) * 用來讀取HDFS上的文本文件、集群中節點的本地文本文件或任何支持Hadoop的文件系統上的文本文件,它的返回值是JavaRDD[String],是文本文件每一行 * * 第3步:根據具體的數據來源(如HDFS、HBase、Local FS、DB、S3等)通過JavaSparkContext來創建JavaRDD * JavaRDD的創建基本有三種方式:根據外部的數據來源(例如HDFS)、根據Scala集合、由其它的RDD操作 * 數據會被RDD劃分成一系列的Partitions,分配到每個Partition的數據屬于一個Task的處理范疇 */ JavaRDD<String> lines = sc.textFile("hdfs://SparkSingleNode:9000/README.md");/** * 4、將行文本內容拆分為多個單詞 * lines調用flatMap這個transformation算子(參數類型是FlatMapFunction接口實現類)返回每一行的每個單詞 * /* * 第4步:對初始的JavaRDD進行Transformation級別的處理,例如map、filter等高階函數等的編程,來進行具體的數據計算 * 第4.1步:將每一行的字符串拆分成單個的單詞 */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String,String>(){ private static final long serialVersionUID = -3243665984299496473L; @Override public Iterable<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")); } });/** * 第4.2步、將每個單詞的初始數量都標記為1個 * words調用mapToPair這個transformation算子(參數類型是PairFunction接口實現類,PairFunction<String, String, Integer>的三個參數是<輸入單詞, Tuple2的key, Tuple2的value>),返回一個新的RDD,即JavaPairRDD * * * 第4.2步:在單詞拆分的基礎上對每個單詞實例計數為1,也就是word => (word,1) */ JavaPairRDD<String,Integer> pairs =words.mapToPair(new PairFunction<String,String,Integer>(){ private static final long serialVersionUID = -7879847028195817507L; @Override public Tuple2<String,Integer> call(String word)throws Exception{ return new Tuple2<String,Integer>(word,1); } });/** * 第4.3步、計算每個相同單詞出現的次數 * pairs調用reduceByKey這個transformation算子(參數是Function2接口實現類)對每個key的value進行reduce操作,返回一個JavaPairRDD,這個JavaPairRDD中的每一個Tuple的key是單詞、value則是相同單詞次數的和 * * * 第4.3步:在每個單詞實例計數為1基礎之上統計每個單詞在文件中出現的總次數 */ JavaPairRDD<String,Integer> wordsCount = pairs.reduceByKey(new Function2<Integer,Integer,Integer>(){ private static final long serialVersionUID = -4171349401750495688L; @Override public Integer call(Integer v1,Integer v2)throws Exception{ return v1 + v2; } });/** * 第5步: 使用foreach這個action算子提交Spark應用程序 * 在Spark中,每個應用程序都需要transformation算子計算,最終由action算子觸發作業提交 */ wordsCount.foreach(new VoidFunction<Tuple2<String,Integer>>() { private static final long serialVersionUID = -5926812153234798612L; @Override public void call(Tuple2<String, Integer> pairs) throws Exception { System.out.println(pairs._1+":"+pairs._2); } });/** * 8、將計算結果文件輸出到文件系統 * HDFS: * 使用新版API(org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;) * wordCount.saveAsNewAPIHadoopFile("hdfs://SparkSingleNode:9000/wordcount", Text.class, IntWritable.class, TextOutputFormat.class, new Configuration()); * 使用舊版API(org.apache.hadoop.mapred.JobConf;org.apache.hadoop.mapred.OutputFormat;) * wordCount.saveAsHadoopFile("hdfs://SparkSingleNode:9000/wordcount", Text.class, IntWritable.class, OutputFormat.class, new JobConf(new Configuration())); * 使用默認TextOutputFile寫入到HDFS(注意寫入HDFS權限,如無權限則執行:hdfs dfs -chmod -R 777 /wordcount) * wordCount.saveAsTextFile("hdfs://SparkSingleNode:9000/README.md"); */ wordsCount.saveAsTextFile("hdfs://SparkSingleNode:9000/wordcount");/** * 第7步:關閉SparkContext容器,結束本次作業 */ sc.close();}}?
?
?
?
?在這里,遇到了問題,
繼續、、
?
?
?
?
?感謝如下的博主:
http://www.cnblogs.com/mengyao/p/5059556.html
http://blog.csdn.net/bluejoe2000/article/details/41556979
?
?
?
?
?
其實啊,在集群里,模板就是如下
val file = spark.textFile("hdfs://...”)
val counts = file.flatMap("line => line.spilt(" "))
.map(word => (word,1))
.reduceByKey(_+_)
counts.saveAsTextFile("hdfs://...”)
?
?
?
?
?
?
?
?
?
?
?
?
歡迎大家,加入我的微信公眾號:大數據躺過的坑 ? ? 免費給分享 ?同時,大家可以關注我的個人博客:
???http://www.cnblogs.com/zlslch/?? 和 ?http://www.cnblogs.com/lchzls/?
?
人生苦短,我愿分享。本公眾號將秉持活到老學到老學習無休止的交流分享開源精神,匯聚于互聯網和個人學習工作的精華干貨知識,一切來于互聯網,反饋回互聯網。
目前研究領域:大數據、機器學習、深度學習、人工智能、數據挖掘、數據分析。 語言涉及:Java、Scala、Python、Shell、Linux等 。同時還涉及平常所使用的手機、電腦和互聯網上的使用技巧、問題和實用軟件。 只要你一直關注和呆在群里,每天必須有收獲
?
? ? ? ?以及對應本平臺的QQ群:161156071(大數據躺過的坑)
?
?
?
?
?
?
?
?
?
?
?
總結
以上是生活随笔為你收集整理的Eclipse的下载、安装和WordCount的初步使用(本地模式和集群模式)的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2016 ACM/ICPC Asia R
- 下一篇: 工行居逸贷,信贷员说3年利率11.38%