Start the Spark Shell, write a WordCount program in the Spark Shell, write a WordCount Maven program in IDEA, and use spark-submit with a Spark jar to do word counting
1. Starting the Spark Shell
spark-shell is Spark's built-in interactive shell. It makes interactive programming convenient: you can write Spark programs in Scala directly at its command line. Note that starting spark-shell requires the Spark HA cluster to be running first; for Spark cluster installation and deployment, see: http://blog.csdn.net/tototuzuoquan/article/details/74481570
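As a quick reminder (the linked post has the full walkthrough), bringing up the HA cluster typically amounts to something like the following, assuming hadoop1 and hadoop2 run the two masters and ZooKeeper is already up:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# sbin/start-all.sh      # master on hadoop1 plus the workers listed in conf/slaves
[root@hadoop2 spark-2.1.1-bin-hadoop2.7]# sbin/start-master.sh   # standby master on hadoop2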
1.2.1 Starting the spark shell
Startup method 1:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# pwd
/home/tuzq/software/spark-2.1.1-bin-hadoop2.7
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077

--master specifies the master addresses; the shell connects to whichever of the listed masters is currently active.
Likewise, you can also specify the executor memory and the total number of cores:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# cd $SPARK_HOME
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master spark://hadoop1:7077,hadoop2:7077 --executor-memory 2g --total-executor-cores 2

Parameter descriptions:
--master spark://hadoop1:7077 specifies the master's address
--executor-memory 2g gives each executor 2 GB of usable memory
--total-executor-cores 2 caps the total number of CPU cores used across the cluster at 2
Note:
If you start the spark shell without specifying a master address, it still starts and runs programs normally; in that case Spark runs in local mode, which launches a single process on the local machine and makes no connection to the cluster.
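You can also request local mode explicitly; for example, local[2] runs the shell with two worker threads:

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-shell --master local[2]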
The Spark Shell initializes the SparkContext class as the object sc by default; user code that needs a SparkContext can simply use sc.
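A quick way to check that sc is alive, for example:

scala> sc.parallelize(1 to 100).reduce(_ + _)
res0: Int = 5050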
1.2.2 Writing a WordCount program in the spark shell
1. First, start HDFS.
2. Upload a file to HDFS (hdfs://mycluster/wordcount/input/2.txt).
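Concretely, steps 1 and 2 might look like the following sketch (the local source path is an assumption):

start-dfs.sh
hdfs dfs -mkdir -p hdfs://mycluster/wordcount/input
hdfs dfs -put /home/tuzq/software/sparkdata/2.txt hdfs://mycluster/wordcount/input/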
(The original post shows a screenshot of the uploaded file here.)
To access files on the Hadoop cluster with a full, scheme-qualified URI, you can do the following:
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/
Found 2 items
drwx-wx-wx   - root supergroup          0 2017-07-06 11:11 hdfs://mycluster/tmp
drwxr-xr-x   - root supergroup          0 2017-07-06 11:16 hdfs://mycluster/wordcount
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input
Found 9 items
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/1.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/3.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/4.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/5.txt
-rw-r--r--   3 root supergroup   27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/a.txt
-rw-r--r--   3 root supergroup   27209520 2017-07-06 11:16 hdfs://mycluster/wordcount/input/aaa.txt
-rw-r--r--   3 root supergroup   27787264 2017-07-06 11:16 hdfs://mycluster/wordcount/input/b.txt
-rw-r--r--   3 root supergroup   26738688 2017-07-06 11:16 hdfs://mycluster/wordcount/input/c.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/input/2.txt
-rw-r--r--   3 root supergroup        604 2017-07-06 11:16 hdfs://mycluster/wordcount/input/2.txt
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/input/2.txt
Collecting and analysis base data for big data analysis;Maintenance Hadoop platform Development Hadoop framework Cooperate with data scientist, verify and implement data models to realize automatic and accurate fraud detection, in order to improve the risk management level of E-commerce/payment platforms Analyze information acquired and compare solutions and weight them against the actual needs, provide root cause analysis affecting key business problems Play an active role in company's anti-fraud platform strategy Support related data analysis work, and provide valuable business reports
[root@hadoop2 hadoop-2.8.0]#

3. Write the Spark program in Scala in the spark shell:
scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://mycluster/wordcount/output")

4. Check the result with hdfs commands:
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -ls hdfs://mycluster/wordcount/output
Found 3 items
-rw-r--r--   3 root supergroup          0 2017-07-06 11:48 hdfs://mycluster/wordcount/output/_SUCCESS
-rw-r--r--   3 root supergroup        400 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00000
-rw-r--r--   3 root supergroup        346 2017-07-06 11:48 hdfs://mycluster/wordcount/output/part-00001
[root@hadoop2 hadoop-2.8.0]# hdfs dfs -cat hdfs://mycluster/wordcount/output/part-00000
(role,1)
(Play,1)
(fraud,1)
(level,1)
(business,2)
(improve,1)
(platforms,1)
(order,1)
(big,1)
(with,1)
(scientist,,1)
(active,1)
(valuable,1)
(data,5)
(information,1)
(Cooperate,1)
(Collecting,1)
(framework,1)
(E-commerce/payment,1)
(acquired,1)
(root,1)
(accurate,1)
(solutions,1)
(analysis;Maintenance,1)
(problems,1)
(them,1)
(Analyze,1)
(models,1)
(analysis,3)
(realize,1)
(actual,1)
(weight,1)
[root@hadoop2 hadoop-2.8.0]#

Explanation:
sc is the SparkContext object, the entry point for submitting Spark programs
sc.textFile("hdfs://mycluster/wordcount/input/2.txt") reads the data from HDFS
flatMap(_.split(" ")) maps first, then flattens
map((_,1)) turns each word into a (word, 1) tuple
reduceByKey(_+_) reduces by key, adding up the values
saveAsTextFile("hdfs://mycluster/wordcount/output") writes the result to HDFS
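The same pipeline can also be built one step at a time in the shell, which makes each intermediate RDD visible; a sketch (the output path here is an assumption and must not already exist):

val lines  = sc.textFile("hdfs://mycluster/wordcount/input/2.txt")  // RDD[String], one element per line
val words  = lines.flatMap(_.split(" "))                            // RDD[String], one element per word
val pairs  = words.map((_, 1))                                      // RDD[(String, Int)] of (word, 1) tuples
val counts = pairs.reduceByKey(_ + _)                               // RDD[(String, Int)], summed per word
counts.saveAsTextFile("hdfs://mycluster/wordcount/output2")         // hypothetical output path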
The code to sort the WordCount result and display it:
scala> sc.textFile("hdfs://mycluster/wordcount/input/2.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).sortBy(_._2,false).collect
res2: Array[(String, Int)] = Array((and,6), (data,5), (analysis,3), (business,2), (to,2), (platform,2), (in,2), (provide,2), (the,2), (Hadoop,2), (compare,1), (risk,1), (anti-fraud,1), (key,1), (related,1), (base,1), (Support,1), (against,1), (automatic,1), (company's,1), (needs,,1), (implement,1), (affecting,1), (strategy,1), (of,1), (reports,1), (management,1), (detection,,1), (for,1), (work,,1), (cause,1), (an,1), (verify,1), (Development,1), (role,1), (Play,1), (fraud,1), (level,1), (improve,1), (platforms,1), (order,1), (big,1), (with,1), (scientist,,1), (active,1), (valuable,1), (information,1), (Cooperate,1), (Collecting,1), (framework,1), (E-commerce/payment,1), (acquired,1), (root,1), (accurate,1), (solutions,1), (analysis;Maintenance,1), (problems,1), (them,1), (Analyze,1), (m...
scala>

2. Creating a Spark Maven project in IDEA
The spark shell is mostly used for testing and verifying programs. In production you usually write the program in an IDE, package it as a jar, and submit it to the cluster; the most common approach is to create a Maven project and let Maven manage the jar dependencies.
Create the Maven project:
Note that after creating the project, be sure to point IDEA at the correct Maven repository location again, otherwise it may re-download all the jars:
After the maven project is created, click Enable Auto-Import.
Configure Maven's pom.xml.
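The pom itself appears as a screenshot in the original post. A minimal sketch matching the versions used here (Spark 2.1.1, Scala 2.11); the project coordinates are assumptions inferred from the jar name wordCount-1.0-SNAPSHOT.jar used later:

<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <!-- coordinates assumed from the jar name used later in this post -->
    <groupId>cn.toto.spark</groupId>
    <artifactId>wordCount</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>2.11.8</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>
    <build>
        <!-- matches the src/main/scala layout described below -->
        <sourceDirectory>src/main/scala</sourceDirectory>
        <plugins>
            <!-- compiles the Scala sources -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
                <version>3.2.2</version>
                <executions>
                    <execution>
                        <goals>
                            <goal>compile</goal>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>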
Rename src/main/java and src/test/java to src/main/scala and src/test/scala (or create the scala directories), keeping them consistent with the configuration in pom.xml.
Or do it the following way:
Create a new scala class with kind Object.
Write the Spark program code:
package cn.toto.spark

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by toto on 2017/7/6.
  */
object WordCount {
  def main(args: Array[String]): Unit = {
    // create the SparkConf
    val conf = new SparkConf().setAppName("WordCount")
    // create the SparkContext
    val sc = new SparkContext(conf)
    // read the data from HDFS
    val line: RDD[String] = sc.textFile(args(0))
    // split into words
    val words: RDD[String] = line.flatMap(_.split(" "))
    // pair each word with a count of 1
    val wordAndOne: RDD[(String, Int)] = words.map((_, 1))
    // group and aggregate
    val result: RDD[(String, Int)] = wordAndOne.reduceByKey((x, y) => x + y)
    // sort by count, descending
    val finalResult: RDD[(String, Int)] = result.sortBy(_._2, false)
    // save the result to HDFS
    finalResult.saveAsTextFile(args(1))
    // release resources
    sc.stop()
  }
}

Package it:
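The original shows packaging from IDEA's Maven panel; from a terminal, the equivalent (run in the project root) is simply:

mvn clean package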
Go into the project's target directory to get the jar.
Or find it directly in IDEA's project tree:
Upload wordCount-1.0-SNAPSHOT.jar to /home/tuzq/software/sparkdata.
Using the Spark jar to do word counting
Note that the output path must not already exist, and before running the program below it is best to close the spark-shell, otherwise you may hit errors.
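The submit command itself appears as a screenshot in the original. A sketch of what it would look like, assuming the class and paths used above (the memory and core settings are assumptions; the last two arguments become args(0) and args(1) in the WordCount code):

[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# bin/spark-submit \
  --master spark://hadoop1:7077,hadoop2:7077 \
  --executor-memory 1g \
  --total-executor-cores 2 \
  --class cn.toto.spark.WordCount \
  /home/tuzq/software/sparkdata/wordCount-1.0-SNAPSHOT.jar \
  hdfs://mycluster/wordcount/input hdfs://mycluster/wordcount/out0002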
The state while it runs (the original post shows a screenshot here).
View the result on HDFS:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -ls hdfs://mycluster/wordcount/out0002
Found 10 items
-rw-r--r--   3 root supergroup          0 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/_SUCCESS
-rw-r--r--   3 root supergroup        191 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00000
-rw-r--r--   3 root supergroup        671 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00001
-rw-r--r--   3 root supergroup        245 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00002
-rw-r--r--   3 root supergroup         31 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00003
-rw-r--r--   3 root supergroup       1096 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00004
-rw-r--r--   3 root supergroup         11 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00005
-rw-r--r--   3 root supergroup        936 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00006
-rw-r--r--   3 root supergroup        588 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00007
-rw-r--r--   3 root supergroup        609 2017-07-06 13:13 hdfs://mycluster/wordcount/out0002/part-00008

View any one of them:
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]# hdfs dfs -cat hdfs://mycluster/wordcount/out0002/part-00000
(and,770752)
(is,659375)
(I,505440)
(a,468642)
(to,431857)
(in,421230)
(the,331176)
(of,272080)
(FDS,218862)
(for,213029)
(The,196569)
(true,196567)
(but,196566)
(on,193650)
(without,193649)
[root@hadoop1 spark-2.1.1-bin-hadoop2.7]#