Spark2.1.0之初体验
? ? ? ?在《Spark2.1.0之運行環境準備》一文中,已經介紹了如何準備好基本的Spark運行環境,現在是時候實踐一下,以便于在使用過程中提升讀者對于Spark最直接的感觸!本文通過Spark的基本使用,讓讀者對Spark能有初步的認識,便于引導讀者逐步深入學習。
運行spark-shell
? ? 在《Spark2.1.0之運行環境準備》一文曾經簡單運行了spark-shell,并用下圖進行了展示(此處再次展示此圖)。
圖1????執行spark-shell進入Scala命令行
圖1中顯示了很多信息,這里進行一些說明:
- 在安裝完Spark 2.1.0后,如果沒有明確指定log4j的配置,那么Spark會使用core模塊的org/apache/spark/目錄下的log4j-defaults.properties作為log4j的默認配置。log4j-defaults.properties指定的Spark日志級別為WARN。用戶可以到Spark安裝目錄的conf文件夾下從log4j.properties.template復制一份log4j.properties文件,并在其中增加自己想要的配置。
- 除了指定log4j.properties文件外,還可以在spark-shell命令行中通過sc.setLogLevel(newLevel)語句指定日志級別。
- SparkContext的Web UI的地址是:http://192.168.0.106:4040。192.168.0.106是筆者安裝Spark的機器的ip地址,4040是SparkContext的Web UI的默認監聽端口。
- 指定的部署模式(即master)為local[*]。當前應用(Application)的ID為local-1497084620457。
- 可以在spark-shell命令行通過sc使用SparkContext,通過spark使用SparkSession。sc和spark實際分別是SparkContext和SparkSession在Spark REPL中的變量名,具體細節已在《Spark2.1.0之剖析spark-shell》一文有過分析。
??? 由于Spark core的默認日志級別是WARN,所以看到的信息不是很多。現在我們將Spark安裝目錄的conf文件夾下的log4j.properties.template以如下命令復制出一份:
cp log4j.properties.template log4j.properties并將log4j.properties中的log4j.logger.org.apache.spark.repl.Main=WARN修改為log4j.logger.org.apache.spark.repl.Main=INFO,然后我們再次運行spark-shell,將打印出更豐富的信息,如圖2所示。
圖2 ?Spark啟動過程打印的部分信息
從圖2展示的啟動日志中我們可以看到SecurityManager、SparkEnv、BlockManagerMasterEndpoint、DiskBlockManager、MemoryStore、SparkUI、Executor、NettyBlockTransferService、BlockManager、BlockManagerMaster等信息。它們是做什么的?剛剛接觸Spark的讀者只需要知道這些信息即可,具體內容將在后邊的博文給出。
執行word count
??????這一節,我們通過word count這個耳熟能詳的例子來感受下Spark任務的執行過程。啟動spark-shell后,會打開Scala命令行,然后按照以下步驟輸入腳本:
步驟1 ???
??????輸入val lines =sc.textFile("../README.md", 2),以Spark安裝目錄下的README.md文件的內容作為word count例子的數據源,執行結果如圖3所示。
圖3 ? 步驟1執行結果
圖3告訴我們lines的實際類型是MapPartitionsRDD。
步驟2
?? ????textFile方法對文本文件是逐行讀取的,我們需要輸入val words =lines.flatMap(line => line.split(" ")),將每行文本按照空格分隔以得到每個單詞,執行結果如圖4所示。圖4 ? 步驟2執行結果
圖4告訴我們lines在經過flatMap方法的轉換后得到的words的實際類型也是MapPartitionsRDD。
步驟3
? ? ?對于得到的每個單詞,通過輸入val ones = words.map(w => (w,1)),將每個單詞的計數初始化為1,執行結果如圖5所示。圖5 ? 步驟3執行結果
圖5告訴我們words在經過map方法的轉換后得到的ones的實際類型也是MapPartitionsRDD。
步驟4
????輸入val counts = ones.reduceByKey(_ + _),對單詞進行計數值的聚合,執行結果如圖6所示。
圖6 ? 步驟4執行結果
圖6告訴我們ones在經過reduceByKey方法的轉換后得到的counts的實際類型是ShuffledRDD。
步驟5
???????輸入counts.foreach(println),將每個單詞的計數值打印出來,作業的執行過程如圖7和圖8所示。作業的輸出結果如圖9所示。
圖7 ? 步驟5執行過程第一部分
圖8 ?步驟5執行過程第二部分
圖7和圖8展示了很多作業提交、執行的信息,這里挑選關鍵的內容進行介紹:
- SparkContext為提交的Job生成的ID是0。
- 一共有四個RDD,被劃分為ResultStage和ShuffleMapStage。ShuffleMapStage的ID為0,嘗試號為0。ResultStage的ID為1,嘗試號也為0。在Spark中,如果Stage沒有執行完成,就會進行多次重試。Stage無論是首次執行還是重試都被視為是一次Stage嘗試(Stage Attempt),每次Attempt都有一個唯一的嘗試號(AttemptNumber)。
- 由于Job有兩個分區,所以ShuffleMapStage和ResultStage都有兩個Task被提交。每個Task也會有多次嘗試,因而也有屬于Task的嘗試號。從圖中看出ShuffleMapStage中的兩個Task和ResultStage中的兩個Task的嘗試號也都是0。
- HadoopRDD則用于讀取文件內容。
圖9 ?步驟5輸出結果
圖9展示了單詞計數的輸出結果和最后打印的任務結束的日志信息。
??? 筆者在本文介紹的word count例子是以SparkContext的API來實現的,讀者朋友們也可以選擇在spark-shell中通過運用SparkSession的API來實現。
有了對Spark的初次體驗,下面可以來分析下spark-shell的實現原理了,請看——《Spark2.1.0之剖析spark-shell》
關于《Spark內核設計的藝術 架構設計與實現》
經過近一年的準備,基于Spark2.1.0版本的《 Spark內核設計的藝術 架構設計與實現》一書現已出版發行,圖書如圖:紙質版售賣鏈接如下:
京東: https://item.jd.com/12302500.html 電子版售賣鏈接如下: 京東: https://e.jd.com/30389208.html
總結
以上是生活随笔為你收集整理的Spark2.1.0之初体验的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: scanf提取gprmc数据
- 下一篇: 【HTTP】Fiddler(二) - 使