Understanding Spark Streaming Through Case Studies - Part 3
In this session:

Demystifying the Spark Streaming job architecture and runtime mechanism
Demystifying the Spark Streaming fault-tolerance architecture and runtime mechanism
Any data that cannot be processed as a real-time stream is worthless data. In the streaming era, Spark Streaming has strong appeal and broad prospects. Combined with the Spark ecosystem, which lets Streaming conveniently call on powerful frameworks such as SQL and MLlib, it is well positioned to dominate.

At runtime, Spark Streaming is less a streaming framework on top of Spark Core than the most complex application built on Spark Core. Once you have mastered Spark Streaming, this most complex of applications, no other application will be beyond you. Choosing Spark Streaming as the entry point for this source-customization series is also the natural choice.

This session examines Spark Streaming's runtime behavior through the overall architecture of jobs and of fault tolerance.

We start from the simplest example used previously:
```scala
// Word count over a socket source
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("StreamingWordCountSelfScala")
val ssc = new StreamingContext(sparkConf, Durations.seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)
val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
words.print()
ssc.start()
ssc.awaitTermination() // block so the streaming computation keeps running
```
Tracing the source code reveals that initializing the StreamingContext creates the following objects:
```scala
// StreamingContext.scala line 183
private[streaming] val scheduler = new JobScheduler(this)
```
When the JobScheduler is initialized, it in turn initializes a jobGenerator and holds a receiverTracker field:
```scala
// JobScheduler.scala line 50
private val jobGenerator = new JobGenerator(this) // line 50
val clock = jobGenerator.clock
val listenerBus = new StreamingListenerBus()

// These two are created only when scheduler starts.
// eventLoop not being null means the scheduler has been started and not stopped
var receiverTracker: ReceiverTracker = null // line 56
```
Next, look at how the DStream is created:
```scala
// StreamingContext.scala line 327
def socketTextStream(
    hostname: String,
    port: Int,
    storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
  ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
  socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
}

// StreamingContext.scala line 345
def socketStream[T: ClassTag](
    hostname: String,
    port: Int,
    converter: (InputStream) => Iterator[T],
    storageLevel: StorageLevel
  ): ReceiverInputDStream[T] = {
  new SocketInputDStream[T](this, hostname, port, converter, storageLevel) // line 351
}
```
```scala
// SocketInputDStream.scala line 33
private[streaming]
class SocketInputDStream[T: ClassTag](
    ssc_ : StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends ReceiverInputDStream[T](ssc_) {

  // This method is the key
  def getReceiver(): Receiver[T] = {
    new SocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
```
Now look at ssc.start:
```scala
// StreamingContext.scala line 594
def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()
          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            scheduler.start() // line 610
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
```
Line 610 calls scheduler.start; scheduler is the JobScheduler created during initialization.
```scala
// JobScheduler.scala line 62
def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc) // line 80
  inputInfoTracker = new InputInfoTracker(ssc)
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}
```
Line 80 initializes the receiverTracker:
```scala
// ReceiverTracker.scala line 101 (excerpt)
private[streaming]
class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {

  private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
  private val receiverInputStreamIds = receiverInputStreams.map { _.id }
  private val receivedBlockTracker = new ReceivedBlockTracker(
    ssc.sparkContext.conf,
    ssc.sparkContext.hadoopConfiguration,
    receiverInputStreamIds,
    ssc.scheduler.clock,
    ssc.isCheckpointPresent,
    Option(ssc.checkpointDir))
  // ...
```
Then receiverTracker.start and jobGenerator.start are called:
```scala
// ReceiverTracker.scala line 148
/** Start the endpoint and receiver execution thread. */
def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers() // line 157
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}
```
Inside launchReceivers():
```scala
// ReceiverTracker.scala line 413
private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // This calls SocketInputDStream.getReceiver(); in our example it returns
    // a SocketReceiver, see SocketInputDStream.scala line 34
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  endpoint.send(StartAllReceivers(receivers)) // line 423
}
```
How is the StartAllReceivers message consumed?
```scala
// ReceiverTracker.scala line 448
// Local messages
case StartAllReceivers(receivers) =>
  // Spread the receivers across executors as evenly as possible
  val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
  for (receiver <- receivers) {
    val executors = scheduledLocations(receiver.streamId)
    updateReceiverScheduledExecutors(receiver.streamId, executors)
    receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
    // Start the receiver; we stop drilling down here. Read on in the source if interested.
    startReceiver(receiver, executors)
  }
```
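The balancing idea behind scheduleReceivers can be sketched in a few lines. This is a toy model, not Spark's actual ReceiverSchedulingPolicy (which also honors preferred locations): it simply spreads receivers round-robin over the available executors.

```python
# Toy sketch of receiver placement: assign each receiver to an executor,
# round-robin, so the load is as even as possible. Names are illustrative.

def schedule_receivers(receiver_ids, executors):
    """Map each receiver id to one executor."""
    if not executors:
        return {}
    return {rid: executors[i % len(executors)]
            for i, rid in enumerate(receiver_ids)}

placement = schedule_receivers([0, 1, 2, 3], ["exec-1", "exec-2"])
# receivers 0 and 2 land on exec-1; receivers 1 and 3 on exec-2
```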
Back at JobScheduler.scala line 83, jobGenerator.start is called:
```scala
// JobGenerator.scala line 79
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    startFirstTime()
  }
}
```
At this point, both data reception and the job generator are up and running.
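The EventLoop pattern that both JobScheduler and JobGenerator rely on is simple to model: a dedicated thread drains a queue and hands each event to an onReceive callback. Below is a minimal Python sketch of that pattern (a hypothetical miniature, not Spark's Scala EventLoop):

```python
# Minimal event loop: one daemon thread pulls events off a queue and
# dispatches each to the supplied handler, like EventLoop[JobGeneratorEvent].
import queue
import threading

class EventLoop:
    def __init__(self, name, on_receive):
        self._queue = queue.Queue()
        self._on_receive = on_receive
        self._thread = threading.Thread(target=self._run, name=name, daemon=True)

    def start(self):
        self._thread.start()

    def post(self, event):
        self._queue.put(event)

    def stop(self):
        self._queue.put(None)   # sentinel ends the loop after pending events
        self._thread.join()

    def _run(self):
        while True:
            event = self._queue.get()
            if event is None:
                return
            self._on_receive(event)

seen = []
loop = EventLoop("JobGenerator", seen.append)
loop.start()
loop.post("GenerateJobs(time=5000)")
loop.stop()
# seen now holds the single dispatched event
```

Posting is non-blocking, so callers such as a timer thread can fire events at a fixed cadence without waiting for the handler to finish.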
Calling start on the StreamingContext actually invokes the JobScheduler's start method, which begins the message loop. Inside JobScheduler.start, a JobGenerator and a ReceiverTracker are constructed, and their start methods are called in turn:
1. Once started, the JobGenerator keeps generating jobs, one per batchDuration.

2. Once started, the ReceiverTracker launches the Receivers across the Spark cluster (strictly speaking, it first launches a ReceiverSupervisor on each Executor). When a Receiver gets data, the ReceiverSupervisor stores it on the Executor and sends the data's metadata to the ReceiverTracker on the Driver, where a ReceivedBlockTracker manages the received metadata.
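The data path in point 2 can be condensed into a toy model. The classes below are hypothetical miniatures of the roles described (not Spark's APIs): the supervisor "stores" a block locally on the executor side and reports only the block's metadata to the driver-side tracker.

```python
# Toy model of the receiver data path: blocks stay on the executor,
# only metadata travels to the driver's tracker.

class ReceivedBlockTracker:
    """Driver side: remembers block metadata per input stream."""
    def __init__(self):
        self.meta_by_stream = {}

    def add_block(self, stream_id, meta):
        self.meta_by_stream.setdefault(stream_id, []).append(meta)

class ReceiverSupervisor:
    """Executor side: stores the data, forwards metadata to the driver."""
    def __init__(self, stream_id, tracker):
        self.stream_id = stream_id
        self.tracker = tracker
        self.local_blocks = {}

    def store(self, block_id, records):
        self.local_blocks[block_id] = records   # data stays on the executor
        self.tracker.add_block(self.stream_id,
                               {"block_id": block_id, "num_records": len(records)})

tracker = ReceivedBlockTracker()
sup = ReceiverSupervisor(stream_id=0, tracker=tracker)
sup.store("input-0-0", ["hello world", "hello spark"])
# the driver now knows about block "input-0-0" but never saw the records
```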
Each batch interval produces a concrete Job. This "Job" is not a job in the Spark Core sense; it is merely the DAG of RDDs generated from the DStreamGraph, roughly the equivalent of a Java Runnable instance. To actually run, the Job must be submitted to the JobScheduler, which takes a thread from a thread pool and submits the Job to the cluster from that thread (inside that thread, an RDD action triggers the real Spark job).
Why use a thread pool?

1. Jobs are generated continuously, so a thread pool improves efficiency; this mirrors how an Executor runs Tasks through a thread pool.

2. FAIR scheduling may be configured for jobs, which also requires multi-thread support.
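The submission mechanics above can be sketched with Python's standard thread pool. This is an illustrative analogy, not Spark's JobScheduler code: each batch "job" is just a callable, and the pool lets several batches run concurrently without blocking the generator.

```python
# Sketch: generated batch jobs are runnables handed to a thread pool,
# analogous to JobScheduler submitting one Job per batch interval.
from concurrent.futures import ThreadPoolExecutor

def make_batch_job(batch_time):
    def run():
        # In Spark, this is where an RDD action triggers the real cluster job.
        return f"ran batch {batch_time}"
    return run

# pool size plays the role of the (configurable) number of concurrent jobs
with ThreadPoolExecutor(max_workers=4) as pool:
    futures = [pool.submit(make_batch_job(t)) for t in (5000, 10000)]
    results = [f.result() for f in futures]
# results == ["ran batch 5000", "ran batch 10000"]
```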
Part 2: Spark Streaming from the fault-tolerance perspective

We know the relationship between a DStream and RDDs: as time passes, RDDs are produced one after another, and operating on a DStream means operating on the RDD of each fixed interval. So in a sense, Spark Streaming's DStream-based fault tolerance reduces to the fault tolerance of each RDD produced along the way, which is one of Spark Streaming's cleverest design decisions.
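That "a DStream is a template for per-interval RDDs" idea can be illustrated with a toy class (purely hypothetical, not Spark's DStream API): transformations are recorded once, then replayed against whichever interval's data is materialized.

```python
# Toy DStream: record the transformation chain once, apply it to each
# batch interval's data when that interval's "RDD" is computed.

class ToyDStream:
    def __init__(self, batches):
        self.batches = batches      # batch_time -> list of records
        self.ops = []

    def map(self, fn):
        self.ops.append(fn)
        return self

    def compute(self, batch_time):
        """Materialize this interval's 'RDD' by applying the recorded ops."""
        records = self.batches.get(batch_time, [])
        for op in self.ops:
            records = [op(r) for r in records]
        return records

stream = ToyDStream({5000: [1, 2, 3], 10000: [4]}).map(lambda x: x * 10)
# the same op chain applies to every interval: 5000 -> [10, 20, 30], 10000 -> [40]
```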
As a resilient distributed dataset, an RDD's resilience shows in several ways:

1. Automatic allocation between memory and disk, preferring memory.

2. Lineage-based fault tolerance.

3. A failed task is retried a configurable number of times.

4. A failed stage is automatically retried.

5. Checkpointed and persisted data can be reused.

6. Scheduling resilience: the DAG and tasks are decoupled from resource management.

7. Highly elastic data partitioning.
Given these properties, RDD fault tolerance comes in two main flavors: checkpointing, and lineage-based recovery. Spark generally prefers lineage, because checkpointing a large dataset is expensive. But in some cases, for instance when the lineage chain becomes too long or complex, a checkpoint is worth the cost.

Considering RDD dependencies: within a stage all dependencies are narrow, so lineage-based recovery is convenient and efficient there. Between stages the dependencies are wide and involve a shuffle, and in that situation a checkpoint is the better choice. In short: lineage within a stage, checkpoints between stages.
What deeper internals come next? Stay tuned for the next installment.
Reprinted from: https://my.oschina.net/corleone/blog/669520