spark中的广播变量broadcast
Spark中的Broadcast處理
首先先來看一看broadcast的使用代碼:
val?values?=?List[Int](1,2,3)
val?broadcastValues?=?sparkContext.broadcast(values)
rdd.mapPartitions(iter?=>?{
??broadcastValues.getValue.foreach(println)
})
?
在上面的代碼中,首先生成了一個集合變量,把這個變量通過sparkContext的broadcast函數進行廣播,
最后在rdd的每個partition的迭代時,使用這個廣播變量.
?
接下來看看廣播變量的生成與數據的讀取實現部分:
def?broadcast[T:?ClassTag](value:?T):?Broadcast[T]?=?{
??assertNotStopped()
??if?(classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass))?{
這里要注意,使用broadcast時,不能直接對RDD進行broadcast的操作.
????//?This?is?a?warning?instead?of?an?exception?in?order?to?avoid?breaking
//???????user?programs?that
????//?might?have?created?RDD?broadcast?variables?but?not?used?them:
????logWarning("Can?not?directly?broadcast?RDDs;?instead,?call?collect()?and?"
??????+?"broadcast?the?result?(see?SPARK-5063)")
??}
?
通過broadcastManager中的newBroadcast函數來進行廣播.
??val?bc?=?env.broadcastManager.newBroadcast[T](value,?isLocal)
??val?callSite?=?getCallSite
??logInfo("Created?broadcast?"?+?bc.id?+?"?from?"?+?callSite.shortForm)
??cleaner.foreach(_.registerBroadcastForCleanup(bc))
??bc
}
?
在BroadcastManager中生成廣播變量的函數,這個函數直接使用的broadcastFactory的相應函數.
broadcastFactory的實例通過配置spark.broadcast.factory,
?????默認是TorrentBroadcastFactory.
def?newBroadcast[T:?ClassTag](value_?:?T,?isLocal:?Boolean):?Broadcast[T]?=?{
??broadcastFactory.newBroadcast[T](value_,?isLocal,?
???????nextBroadcastId.getAndIncrement())
}
?
在TorrentBroadcastFactory中生成廣播變量的函數:
在這里面,直接生成了一個TorrentBroadcast的實例.
override?def?newBroadcast[T:?ClassTag](value_?:?T,?isLocal:?Boolean,?id:?Long)
:?Broadcast[T]?=?{
??new?TorrentBroadcast[T](value_,?id)
}
?
TorrentBroadcast實例生成時的處理流程:
這里基本的代碼部分是直接寫入這個要廣播的變量,返回的值是這個變量所占用的block的個數.
Broadcast的block的大小通過spark.broadcast.blockSize配置.默認是4MB,
Broadcast的壓縮是否通過spark.broadcast.compress配置,默認是true表示啟用,默認情況下使用snappy的壓縮.
?
private?val?broadcastId?=?BroadcastBlockId(id)
/**?Total?number?of?blocks?this?broadcast?variable?contains.?*/
private?val?numBlocks:?Int?=?writeBlocks(obj)
?
接下來生成一個lazy的屬性,這個屬性僅僅有在詳細的使用時,才會運行,在實例生成時不運行(上面的演示樣例中的getValue.foreach時運行).
@transient?private?lazy?val?_value:?T?=?readBroadcastBlock()
override?protected?def?getValue()?=?{
??_value
}
?
看看實例生成時的writeBlocks的函數:
private?def?writeBlocks(value:?T):?Int?=?{
這里先把這個廣播變量保存一份到當前的task的storage中,這樣做是保證在讀取時,假設要使用這個廣播變量的task就是本地的task時,直接從blockManager中本地讀取.
??SparkEnv.get.blockManager.putSingle(broadcastId,?value,?
StorageLevel.MEMORY_AND_DISK,
????tellMaster?=?false)
?
這里依據block的設置大小,對value進行序列化/壓縮分塊,每個塊的大小為blocksize的大小,
??val?blocks?=
????TorrentBroadcast.blockifyObject(value,?blockSize,?SparkEnv.get.serializer,?
????compressionCodec)
?
這里把序列化并壓縮分塊后的blocks進行迭代,存儲到blockManager中,
??blocks.zipWithIndex.foreach?{?case?(block,?i)?=>
????SparkEnv.get.blockManager.putBytes(
??????BroadcastBlockId(id,?"piece"?+?i),
??????block,
??????StorageLevel.MEMORY_AND_DISK_SER,
??????tellMaster?=?true)
??}
這個函數的返回值是一個int類型的值,這個值就是序列化壓縮存儲后block的個數.
??blocks.length
}
?
在我們的演示樣例中,使用getValue時,會運行實例初始化時定義的lazy的函數readBroadcastBlock:
private?def?readBroadcastBlock():?T?=?Utils.tryOrIOException?{
??TorrentBroadcast.synchronized?{
????setConf(SparkEnv.get.conf)
這里先從local端的blockmanager中直接讀取storage中相應此廣播變量的內容,假設能讀取到,表示這個廣播變量已經讀取過來或者說這個task就是廣播的本地executor.
????SparkEnv.get.blockManager.getLocal(broadcastId).map(_.data.next())?match?{
??????case?Some(x)?=>
????????x.asInstanceOf[T]
以下這部分運行時,表示這個廣播變量在當前的executor中是第一次讀取,通過readBlocks函數去讀取這個廣播變量的全部的blocks,反序列化后,直接把這個廣播變量存儲到本地的blockManager中,下次讀取時,就能夠直接從本地進行讀取.
??????case?None?=>
????????logInfo("Started?reading?broadcast?variable?"?+?id)
????????val?startTimeMs?=?System.currentTimeMillis()
????????val?blocks?=?readBlocks()
????????logInfo("Reading?broadcast?variable?"?+?id?+?"?took"?+?
??????????????Utils.getUsedTimeMs(startTimeMs))
????????val?obj?=?TorrentBroadcast.unBlockifyObject[T](
??????????blocks,?SparkEnv.get.serializer,?compressionCodec)
????????//?Store?the?merged?copy?in?BlockManager?so?other?tasks?on?this?executor?don't
????????//?need?to?re-fetch?it.
????????SparkEnv.get.blockManager.putSingle(
??????????broadcastId,?obj,?StorageLevel.MEMORY_AND_DISK,?tellMaster?=?false)
????????obj
????}
??}
}
?
最后再看看readBlocks函數的處理流程:
private?def?readBlocks():?Array[ByteBuffer]?=?{
這里定義的變量用于存儲讀取到的block的信息,numBlocks是廣播變量序列化后所占用的block的個數.
??val?blocks?=?new?Array[ByteBuffer](numBlocks)
??val?bm?=?SparkEnv.get.blockManager
這里開始迭代讀取每個block的內容,這里的讀取是先從local中進行讀取,假設local中沒有讀取到數據時,通過blockManager讀取遠端的數據,通過讀取這個block相應的location從這個location去讀取這個block的內容,并存儲到本地的blockManager中.最后,這個函數返回讀取到的blocks的集合.
??for?(pid?<-?Random.shuffle(Seq.range(0,?numBlocks)))?{
????val?pieceId?=?BroadcastBlockId(id,?"piece"?+?pid)
????logDebug(s"Reading?piece?$pieceId?of?$broadcastId")
????def?getLocal:?Option[ByteBuffer]?=?bm.getLocalBytes(pieceId)
????def?getRemote:?Option[ByteBuffer]?=?bm.getRemoteBytes(pieceId).map?{?block?=>
??????SparkEnv.get.blockManager.putBytes(
????????pieceId,
????????block,
????????StorageLevel.MEMORY_AND_DISK_SER,
????????tellMaster?=?true)
??????block
????}
????val?block:?ByteBuffer?=?getLocal.orElse(getRemote).getOrElse(
??????throw?new?SparkException(s"Failed?to?get?$pieceId?of?$broadcastId"))
????blocks(pid)?=?block
??}
??blocks
}
總結
以上是生活随笔為你收集整理的spark中的广播变量broadcast的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: kill -0 pid是做什么用的?
- 下一篇: 我们工作的意义到底在哪?