spark广播变量 和 累加器
1 為什么使用廣播變量 和 累加器
變量存在的問題:在spark程序中,當一個傳遞給Spark操作(例如map和reduce)的函數(shù)在遠程節(jié)點上面運行時,Spark操作實際上操作的是這個函數(shù)所用變量的一個獨立副本。這些變量會被復(fù)制到每臺機器上,并且這些變量在遠程機器上的所有更新都不會傳遞回驅(qū)動程序,通常跨任務(wù)的讀寫變量是低效的。
廣播變量的目的就是解決變量存在的問題,變量聲明為廣播變量,那么知識每個executor擁有一份,這個executor啟動的task會共享這個變量,節(jié)省了通信的成本和服務(wù)器的資源。
總的來說:累加器是用來對信息進行聚合,廣播變量是用來分發(fā)較大的只讀對象。
?
2 如何定義? 和? 還原? 廣播變量
int a = 3; Broadcast<Integer> broadcast = sc.broadcast(a); //定義廣播變量int c = broadcast.value; //還原廣播變量
?
3 廣播變量注意事項
(1)變量一旦被定義為一個廣播變量,那么這個變量只能讀,不能修改
(2)能不能將一個RDD使用廣播變量廣播出去?
?????? 不能,因為RDD是不存儲數(shù)據(jù)的。可以將RDD的結(jié)果廣播出去。
(3) 廣播變量只能在Driver端定義,不能在Executor端定義。
(4) 在Driver端可以修改廣播變量的值,在Executor端無法修改廣播變量的值。
(5)如果executor端用到了Driver的變量,如果不使用廣播變量在Executor有多少task就有多少Driver端的變量副本。
(6)如果Executor端用到了Driver的變量,如果使用廣播變量在每個Executor中只有一份Driver端的變量副本。
?
4 廣播變量的優(yōu)化
當廣播一個比較大的值時,選擇既快又好的序列化格式是很重要的。因為如果序列化對象的時間很長或者傳送時間太久,這段時間很容易出現(xiàn)性能瓶頸。
默認情況下,spark會使用java內(nèi)建的序列化庫。建議選擇kryo序列化工具,使用方法設(shè)置spark.serializer為org.apache.spark.serializer.KryoSerializer;
最好強制要求這種注冊,設(shè)置spark.kryo.registrationRequired為true;
SparkConf conf = new SparkConf();conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer");conf.set("spark.kryo.registrationRequired","true");conf.registerKryoClasses(Array(classOf[myClass]),classOf(MyOtherClass));這樣還會有其他的問題,如果代碼中引用的類沒有序列化,會報異常,最簡單的方式是實現(xiàn)序列化接口。
?
5 累加器和定義和還原
累加器只是一個只寫變量
LongAccumulator accumulator = new LongAccumulator();accumulator.add(1);long count = accumulator.count();?
?
參考文獻:扎心了,老鐵
轉(zhuǎn)載于:https://www.cnblogs.com/parent-absent-son/p/9956574.html
總結(jié)
以上是生活随笔為你收集整理的spark广播变量 和 累加器的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: removeTask
- 下一篇: 激活prompt