(6) Spark Eclipse Development Environment WordCount (Java and Python Versions)
Installing Eclipse
Unzip eclipse-jee-mars-2-win32-x86_64.zip.
Java WordCount
Unzip spark-2.0.0-bin-hadoop2.6.tgz.
Create a Java Project named Spark.
Copy all the jars from the jars directory of spark-2.0.0-bin-hadoop2.6 into the project's lib directory.
Add them to the build path (Add Build Path).
package com.bean.spark.wordcount;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCount {
    public static void main(String[] args) {
        // Create a SparkConf object holding the application's configuration.
        SparkConf conf = new SparkConf();
        conf.setMaster("local");
        conf.setAppName("wordcount");

        // Create the context: Java code uses JavaSparkContext, Scala uses SparkContext.
        // The SparkContext connects to the Spark cluster and creates RDDs,
        // accumulators, broadcast variables, and so on.
        JavaSparkContext sc = new JavaSparkContext(conf);

        // textFile (defined on SparkContext) reads a text file from HDFS, from a
        // node-local path, or from any Hadoop-supported file system, and returns
        // a JavaRDD<String> with one element per line of the file.
        JavaRDD<String> lines = sc.textFile("D:/tools/data/wordcount/wordcount.txt");

        // Split each line into words: flatMap is a transformation (its parameter
        // implements the FlatMapFunction interface) that emits every word of every line.
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        // Mark each word with an initial count of 1: mapToPair is a transformation
        // (its parameter implements PairFunction; the three type parameters of
        // PairFunction<String, String, Integer> are <input word, Tuple2 key,
        // Tuple2 value>) that returns a new RDD, a JavaPairRDD.
        JavaPairRDD<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        // Count the occurrences of each word: reduceByKey is a transformation
        // (its parameter implements Function2) that reduces the values of each key;
        // in the resulting JavaPairRDD each Tuple2's key is a word and its value
        // is the sum of that word's counts.
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;

            @Override
            public Integer call(Integer s1, Integer s2) throws Exception {
                return s1 + s2;
            }
        });

        counts.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wordcount) throws Exception {
                System.out.println(wordcount._1 + " : " + wordcount._2);
            }
        });

        // Write the result to the file system.
        /*
         * HDFS, using the new API
         * (org.apache.hadoop.mapreduce.lib.output.TextOutputFormat):
         * counts.saveAsNewAPIHadoopFile("hdfs://master:9000/data/wordcount/output",
         *     Text.class, IntWritable.class, TextOutputFormat.class, new Configuration());
         *
         * Or write to HDFS with the default TextOutputFormat (mind HDFS write
         * permissions; if denied, run: hdfs dfs -chmod -R 777 /data/wordCount/output):
         * counts.saveAsTextFile("hdfs://soy1:9000/data/wordCount/output");
         */
        counts.saveAsTextFile("D:/tools/data/wordcount/output");

        // Close the SparkContext to end the job.
        sc.close();
    }
}
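Since the project targets Spark 2.0, which runs on Java 8, the same pipeline can also be written with lambdas instead of anonymous inner classes. A minimal sketch, assuming the same imports, input path, and JavaSparkContext sc as in the listing above:

JavaRDD<String> lines = sc.textFile("D:/tools/data/wordcount/wordcount.txt");
JavaPairRDD<String, Integer> counts = lines
        // split each line into words
        .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
        // pair each word with an initial count of 1
        .mapToPair(s -> new Tuple2<>(s, 1))
        // sum the counts of identical words
        .reduceByKey((a, b) -> a + b);
counts.foreach(t -> System.out.println(t._1 + " : " + t._2));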
Fixing a runtime error
On Windows the job may fail because Hadoop cannot locate its home directory. Add the following line to the code; it can go anywhere, as long as it runs before the JavaSparkContext is initialized:

System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");

Then unzip hadoop2.6(x64)工具.zip (the Hadoop tools for Windows) into the D:\tools\spark-2.0.0-bin-hadoop2.6\bin directory.
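For reference, a minimal sketch of where the property setting goes; everything besides that one line matches the earlier listing:

public static void main(String[] args) {
    // Must run before the JavaSparkContext is created, so Hadoop can
    // locate its Windows binaries under %HADOOP_HOME%\bin.
    System.setProperty("hadoop.home.dir", "D:/tools/spark-2.0.0-bin-hadoop2.6");

    SparkConf conf = new SparkConf();
    conf.setMaster("local");
    conf.setAppName("wordcount");
    JavaSparkContext sc = new JavaSparkContext(conf);
    // ... rest of the word count as in the listing above
}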
Python WordCount
Integrating the PyDev plugin into Eclipse:
Unzip pydev.zip and copy the packages in its features and plugins folders into the corresponding Eclipse directories.
# -*- coding: utf-8 -*-

from __future__ import print_function
from operator import add
import os

from pyspark.context import SparkContext

'''
wordcount
'''

if __name__ == "__main__":
    os.environ["HADOOP_HOME"] = "D:/tools/spark-2.0.0-bin-hadoop2.6"
    sc = SparkContext()
    # Read the input file; the map(lambda r: r[0:]) is a no-op copy of each
    # line, kept from the original listing.
    lines = sc.textFile("file:///D:/tools/data/wordcount/wordcount.txt").map(lambda r: r[0:])
    counts = lines.flatMap(lambda x: x.split(' ')) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(add)
    output = counts.collect()
    for (word, count) in output:
        print("%s: %i" % (word, count))
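To write the Python job's result back to the file system the way the Java version does, rather than collecting everything to the driver, saveAsTextFile works the same way. A sketch, assuming the counts RDD from above; the output path is an arbitrary example, and the target directory must not already exist:

# one part-NNNNN file is written per partition of the RDD
counts.saveAsTextFile("file:///D:/tools/data/wordcount/output-py")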
Submitting the code to run on the cluster
Java:
[hadoop@master application]$ spark-submit --master spark://master:7077 --class com.bean.spark.wordcount.WordCount spark.jar
Python:
[hadoop@master application]$ spark-submit --master spark://master:7077 wordcount.py
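Note that the D:/ paths in the listings exist only on the Windows development machine; when the job runs on the cluster, point the input at a path all nodes can reach, such as HDFS. For example (reusing the hdfs://master:9000 address from the commented-out HDFS code in the Java listing; substitute your own NameNode address):

lines = sc.textFile("hdfs://master:9000/data/wordcount/wordcount.txt")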
Reposted from: https://www.cnblogs.com/LgyBean/p/6251344.html