2021年大数据Spark(十九):Spark Core的共享变量
目錄
共享變量
廣播變量
累加器
???????案例演示
共享變量
在默認情況下,當Spark在集群的多個不同節點的多個任務上并行運行一個函數時,它會把函數中涉及到的每個變量,在每個任務上都生成一個副本。但是,有時候需要在多個任務之間共享變量,或者在任務(Task)和任務控制節點(Driver Program)之間共享變量。
為了滿足這種需求,Spark提供了兩種類型的變量:
?1)、廣播變量Broadcast Variables
廣播變量用來把變量在所有節點的內存之間進行共享,在每個機器上緩存一個只讀的變量,而不是為機器上的每個任務都生成一個副本;
??2)、累加器Accumulators
累加器支持在所有不同節點之間進行累加計算(比如計數或者求和);
官方文檔:http://spark.apache.org/docs/2.4.5/rdd-programming-guide.html#shared-variables
?
???????廣播變量
廣播變量允許開發人員在每個節點(Worker or Executor)緩存只讀變量,而不是在Task之間傳遞這些變量。使用廣播變量能夠高效地在集群每個節點創建大數據集的副本。同時Spark還使用高效的廣播算法分發這些變量,從而減少通信的開銷。
?
可以通過調用sc.broadcast(v)創建一個廣播變量,該廣播變量的值封裝在v變量中,可使用獲取該變量value的方法進行訪問。
?
?
???????累加器
Spark提供的Accumulator,主要用于多個節點對一個變量進行共享性的操作。Accumulator只提供了累加的功能,即確提供了多個task對一個變量并行操作的功能。但是task只能對Accumulator進行累加操作,不能讀取Accumulator的值,只有Driver程序可以讀取Accumulator的值。創建的Accumulator變量的值能夠在Spark Web UI上看到,在創建時應該盡量為其命名。
?
Spark內置了三種類型的Accumulator,分別是LongAccumulator用來累加整數型,DoubleAccumulator用來累加浮點型,CollectionAccumulator用來累加集合元素。
當內置的Accumulator無法滿足要求時,可以繼承AccumulatorV2實現自定義的累加器。實現自定義累加器的步驟:
?第一步、繼承AccumulatorV2,實現相關方法;
?第二步、創建自定義Accumulator的實例,然后在SparkContext上注冊它;
官方提供實例如下:
?
???????案例演示
?????以詞頻統計WordCount程序為例,假設處理的數據如下所示,包括非單詞符合,統計數據詞頻時過濾非單詞的特殊符號并且統計總的格式。
?
實現功能:
?第一、過濾特殊字符
非單詞符合存儲列表List中
使用廣播變量廣播列表
?
?
?
?
?第二、累計統計非單詞符號出現次數
定義一個LongAccumulator累加器,進行計數
示例代碼:
?
package cn.itcast.coreimport org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.util.LongAccumulator
import org.apache.spark.{SparkConf, SparkContext}/*** 基于Spark框架使用Scala語言編程實現詞頻統計WordCount程序,將符號數據過濾,并統計出現的次數* -a. 過濾標點符號數據* 使用廣播變量* -b. 統計出標點符號數據出現次數* 使用累加器*/
object SparkSharedVariableTest {def main(args: Array[String]): Unit = {val sparkConf: SparkConf = new SparkConf().setAppName(this.getClass.getSimpleName.stripSuffix("$")).setMaster("local[*]")val sc: SparkContext = new SparkContext(sparkConf)sc.setLogLevel("WARN")// 讀取文件數據val datasRDD: RDD[String] = sc.textFile("data/input/words2.txt", minPartitions = 2)// 字典數據,只要有這些單詞就過濾: 特殊字符存儲列表List中val list: List[String] = List(",", ".", "!", "#", "$", "%")// 通過廣播變量 將列表list廣播到各個Executor內存中,便于多個Task使用val listBroadcast: Broadcast[List[String]] = sc.broadcast(list)// 定義累加器,記錄單詞為符號數據的個數val accumulator: LongAccumulator = sc.longAccumulator("mycounter")// 分割單詞,過濾數據val wordsRDD = datasRDD// 1)、過濾數據,去除空行數據.filter(line => line != null && line.trim.length > 0)// 2)、分割單詞.flatMap(_.trim.split("\\s+"))// 3)、過濾字典數據:符號數據.filter(word => {// 獲取符合列表?,從廣播變量中獲取列表list的值val listValue = listBroadcast.value// 判斷單詞是否為符號數據,如果是就過濾掉val isCharacter = listValue.contains(word)if (isCharacter) {// 如果單詞為符號數據,累加器加1accumulator.add(1L)}!isCharacter})val resultRDD: RDD[(String, Int)] = wordsRDD// 轉換為二元組.mapPartitions(iter => {iter.map((_, 1))})// 按照單詞聚合統計.reduceByKey(_+_)resultRDD.foreach(println)println(s"過濾符合數據的個數:${accumulator.value}")// 應用程序運行結束,關閉資源sc.stop()}
}
也可以通過WEB UI查看累加器的值
總結
以上是生活随笔為你收集整理的2021年大数据Spark(十九):Spark Core的共享变量的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 2021年大数据Spark(十八):Sp
- 下一篇: 2021年大数据Spark(二十):Sp