hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)
,? ??
一、概述????
????之前幾篇文章對Spark集群的Master、Worker啟動流程進行了源碼剖析,后面直接從客戶端角度出發(fā),講解了spark-submit任務提交過程及driver的啟動;集群啟動、任務提交、SparkContext初始化等前期準備工作完成之后,后面就是我們的主函數(shù)的代碼Job如何觸發(fā)的,本篇文章還是結(jié)合源碼進行剖析。
????軟件版本:
????????spark2.2.0
二、Job觸發(fā)流程源碼剖析
1. 我們先上一段最簡單的代碼,讀取本地文件進行WordCount,并打印統(tǒng)計結(jié)果,代碼如下:
package com.hadoop.ljs.spark220.study;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.SparkSession;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-03-12 08:26 * @version: v1.0 * @description: com.hadoop.ljs.spark220.study */public class Example1 { public static void main(String[] args) throws Exception{ /*spark環(huán)境初始化*/ SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Example1"); SparkSession sc = SparkSession.builder().config(sparkConf).getOrCreate(); JavaSparkContext jsc = new JavaSparkContext(sc.sparkContext()); /*讀取本地文件*/ JavaRDD<String> sourceRDD = jsc.textFile("D:\\kafkaSSL\\kafka_client_jaas.conf"); /*轉(zhuǎn)換多維為一維數(shù)組*/ JavaRDD<String> words = sourceRDD.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String s) { return Arrays.asList(s.split(" ")).iterator(); } }); /*轉(zhuǎn)換成(hello,1)格式*/ JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) { return new Tuple2<String, Integer>(s, 1); }????????}); /*根據(jù)key進行聚合*/ JavaPairRDD<String, Integer> wordCount = wordOne.reduceByKey(new Function2() { @Override public Integer call(Integer v1, Integer v2) { return v1+v2; } }); /*打印結(jié)果*/ wordCount.foreach(new VoidFunctionString, Integer>>() { @Override public void call(Tuple2<String, Integer> result){ System.out.println("word: "+result._1+" count: "+result._2); } }); }}????我們一行行的進行分析,首先看讀取本地文件textFile()函數(shù):
??/*這里直接調(diào)用的SparkContext的textFile函數(shù)*/ def textFile(path: String): JavaRDD[String] = sc.textFile(path)2. 直接看sc.textFile()函數(shù):
def textFile( path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = withScope { assertNotStopped()????/*這里調(diào)用了hadoopFile函數(shù),傳入三個,寫過Mapreuce的時候都知道?第二個參數(shù)就是Map的輸入格式化類型,參數(shù)3是行號?4是一行的內(nèi)容*/????/*hadoopFile()函數(shù),返回了一個HadoopRDD*/????hadoopFile(path,?classOf[TextInputFormat],?classOf[LongWritable],?classOf[Text],????minPartitions).map(pair?=>?pair._2.toString).setName(path) }看hadoopFile()函數(shù)
def hadoopFile[K, V]( path: String, inputFormatClass: Class[_ <: inputformat v> keyClass: Class[K], valueClass: Class[V], minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {????assertNotStopped() // This is a hack to enforce loading hdfs-site.xml. // See SPARK-11227 for details.????FileSystem.getLocal(hadoopConfiguration)????//這里把hadoopConfiguration配置做了一個廣播變量 val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))????/*?傳入一個jobConf對輸入數(shù)據(jù)進行格式化*/ val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)????/*?返回一個HadoopRDD實例,這里Hadoop配置文件是以廣播變量的方式傳進去的*/????/*廣播變量?每個Worker保存一份,被多個Executor共享*/????/*HadoopRDD繼承自RDD*/ new HadoopRDD( this, confBroadcast, Some(setInputPathsFunc), inputFormatClass, keyClass, valueClass, minPartitions).setName(path) }????上面直接對HadopRDD做了一個map轉(zhuǎn)換,這里Hadoop繼承自RDD,調(diào)用的是RDD里面的map()函數(shù),我們直接看看map函數(shù)代碼:
??/*?最后其實是返回了一個MapPartitionsRDD,里面是(key,value),key是行號,value是內(nèi)容*/ def map[U: ClassTag](f: T => U): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }?上面對返回的RDD是一個鍵值對,然后.map(pair?=>?pair._2.toString對其進行了轉(zhuǎn)換,其實就是去掉了那個key行號,剩下的是一個vlaue數(shù)組,里面是每行的內(nèi)容,至此textFile這一行剖析完畢。
3.主函數(shù)的第30-42行都是對RDD進行了一系列的轉(zhuǎn)換,其實都是調(diào)用RDD.scala中的內(nèi)容對MapPartitionsRDD進行的轉(zhuǎn)換,有興趣你可以跟進去看一下,比較簡單:
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF)) }??/*?mapToPair函數(shù)里面其實是調(diào)用的rdd.map函數(shù),剛才上面已經(jīng)說過了*/ def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = { def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]] new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2]) }4.最后調(diào)用reduceBykey進行了聚合,這里就比較重要了,我們之前講過一個spark任務里面會有多個job,job的劃分依據(jù)是action,有幾個action就有幾個job,而每個job的劃分依據(jù)是shuffle,只要發(fā)生了shuffle就會有新的stage生成,reduceBykey是個action操作,RDD中沒有這個函數(shù),是通過里面的隱式轉(zhuǎn)換調(diào)用了PairRDDFunctions.scala中的reduceBykey()函數(shù),里面的轉(zhuǎn)換先不用管,因為涉及到shuffle操作,會有新的stage的生成,這里先略過:
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope { combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)??}5.?最后主函數(shù)調(diào)用了wordCount.foreach()進行了結(jié)果打印,這是一個action操作,有幾個action就會提交幾個job,直接去看代碼:
def foreach(f: T => Unit): Unit = withScope { val cleanF = sc.clean(f)????/*這里是執(zhí)行了runJob,跟其他操作不一樣,這里會提交一個job*/ sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF)) }????跟進代碼,里面調(diào)用了SparkContext.scala中的函數(shù):
def runJob[T, U: ClassTag]( rdd: RDD[T], func: Iterator[T] => U, partitions: Seq[Int]): Array[U] = {??????//這里clean函數(shù)其實直接輸出????val?cleanedFunc?=?clean(func)????runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions) }????跟進了好幾層,最后看runJob干了啥:
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { if (stopped.get()) { throw new IllegalStateException("SparkContext has been shutdown") } val callSite = getCallSite val cleanedFunc = clean(func) logInfo("Starting job: " + callSite.shortForm) if (conf.getBoolean("spark.logLineage", false)) { logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString) }????//SparkContext初始化的dagScheduler調(diào)用runJob函數(shù)比較任務,這樣就跟之前SparkContext源碼剖析內(nèi)容聯(lián)系在一起了 dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) progressBar.foreach(_.finishAll()) rdd.doCheckpoint() }6.上面調(diào)用了DAGScheduler中的runJob函數(shù),這個DAGScheduler是我們在SparkContext初始化的時候執(zhí)行的初始化,DAGSCheduler主要工作:創(chuàng)建Job,推斷出每一個Job的stage劃分(DAG),跟蹤RDD,實體化stage的輸出,調(diào)度job,將stage以taskSet的形式提交給TaskScheduler的實現(xiàn)類,在集群上運運行,其中,TaskSet是一組可以立即運行的獨立task,基于集群上已存在的數(shù)據(jù),直接看下代碼:
def runJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): Unit = { val start = System.nanoTime????/* 這里就一行比較重要,這里調(diào)用submitJob進行提交 */ val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)????//?下面這些就是任務結(jié)果的一些判斷了 waiter.completionFuture.value.get match { case scala.util.Success(_) => logInfo("Job %d finished: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) case scala.util.Failure(exception) => logInfo("Job %d failed: %s, took %f s".format (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9)) // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler. val callerStackTrace = Thread.currentThread().getStackTrace.tail exception.setStackTrace(exception.getStackTrace ++ callerStackTrace) throw exception } }????下面就是調(diào)用了submitJob進行任務的提交,代碼如下:
def submitJob[T, U]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], callSite: CallSite, resultHandler: (Int, U) => Unit, properties: Properties): JobWaiter[U] = {????//?這里確認我們提交的Partition存在 val maxPartitions = rdd.partitions.length partitions.find(p => p >= maxPartitions || p < 0).foreach { p => throw new IllegalArgumentException( "Attempting to access a non-existent partition: " + p + ". " + "Total number of partitions: " + maxPartitions) } val jobId = nextJobId.getAndIncrement() if (partitions.size == 0) { // Return immediately if the job is running 0 tasks return new JobWaiter[U](this, jobId, 0, resultHandler) } assert(partitions.size > 0) val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _] val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)????//這里會觸發(fā)DAGSchedulerEventProcessLoop的JobSubmitted,他里面onReceive()函數(shù)????//接收消息進行處理,這里調(diào)用的是JobSubmitted,觸發(fā)dagScheduler.handleJobSubmitted????//函數(shù)進行處理 eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) waiter }下面就是調(diào)用handleJobSubmitted()函數(shù)進行處理,它是DAGSchduler的job調(diào)度核心入口,代碼如下:
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _, partitions: Array[Int], callSite: CallSite, listener: JobListener, properties: Properties) { // var finalStage: ResultStage = null????try?{??????//使用觸發(fā)job的最后一個rdd,創(chuàng)建stage??????//當hdfs上的文件被刪除的時候??stage可能創(chuàng)建失敗 finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) } catch { case e: Exception => logWarning("Creating new stage failed due to exception - job: " + jobId, e) listener.jobFailed(e) return }????//通過finalStage創(chuàng)創(chuàng)建一個job, val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() logInfo("Got job %s (%s) with %d output partitions".format( job.jobId, callSite.shortForm, partitions.length)) logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")") logInfo("Parents of final stage: " + finalStage.parents) logInfo("Missing parents: " + getMissingParentStages(finalStage)) val jobSubmissionTime = clock.getTimeMillis()????//將job加入到activeJob緩存中 jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) listenerBus.post( SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties)) //提交finalStage,但是finalStage肯定不會首先執(zhí)行,它要先執(zhí)行它的依賴stage submitStage(finalStage) }7.最后調(diào)用了submitStage進行了finalStage的提交,finalStage肯定不會首先執(zhí)行,它要先執(zhí)行它的依賴stage,這里面就涉及到了stage的換分了,代碼如下:
/** Submits stage, but first recursively submits any missing parents. */ private def submitStage(stage: Stage) { val jobId = activeJobForStage(stage) if (jobId.isDefined) { logDebug("submitStage(" + stage + ")") if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {????????//獲取stage對應的父stage,返回List[Stage]按id排序 val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing)????????//?如果父stage為空,則調(diào)用submitMissingTasks?提交stage, if (missing.isEmpty) { logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents") submitMissingTasks(stage, jobId.get) } else { for (parent ???????????//?如果父stage不為空,則調(diào)用submitStage?提交父stage submitStage(parent) }??????????//并將stage放入等待的隊列中,先去執(zhí)行父stage waitingStages += stage } } } else { abortStage(stage, "No active job for stage " + stage.id, None) } }? ?我們看下getMissingParentStages()函數(shù),如何進行stage劃分的,代碼如下:
?//大體劃分流程:遍歷rdd的所有的依賴,如果是ShufDep,則通過getShuffleMapStage獲取stage, // 并加入到missing隊列中。如果是窄依賴的話,將放入waitingForVisit的棧中。 private def getMissingParentStages(stage: Stage): List[Stage] = { val missing = new HashSet[Stage] val visited = new HashSet[RDD[_]] // We are manually maintaining a stack here to prevent StackOverflowError // caused by recursively visiting val waitingForVisit = new Stack[RDD[_]] def visit(rdd: RDD[_]) { if (!visited(rdd)) { visited += rdd val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil) if (rddHasUncachedPartitions) { for (dep dep match {????????????//如果shufDep也就是我們說的寬依賴 case shufDep: ShuffleDependency[_, _, _] =>??????????????//寬依賴,則創(chuàng)建一個shuffleStage,即finalStage之前的stage是shuffle?stage val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) {?????????????????//加入到missing隊列,返回 missing += mapStage } //如果narrowDep也就是我們說的窄依賴 case narrowDep: NarrowDependency[_] =>??????????????//加入等待隊列中 waitingForVisit.push(narrowDep.rdd) } } } } } waitingForVisit.push(stage.rdd) while (waitingForVisit.nonEmpty) { // 如果是窄依賴,將rdd放入棧中 visit(waitingForVisit.pop()) } missing.toList }??? submitStage()函數(shù)中如果父stage為空則,調(diào)用submitMissingTasks()函數(shù)進行提交,這個函數(shù)主要做了一下幾件事:
? ? a.首先獲取stage中沒有計算的partition;
? ? b.通過 taskIdToLocations(id) 方法進行tasks運行最佳位置的確定;
? ? c.調(diào)用taskScheduler的submitTasks進行任務的提交。
????至此,Spark任務Job觸發(fā)流程源碼深度剖析的第一部分講解完畢,后面會寫一遍文章專門講解submitMissingTasks()函數(shù)中task最佳位置的定位、task的提交具體流程,請繼續(xù)關注。
? ? 如果覺得我的文章能幫到您,請關注微信公眾號“大數(shù)據(jù)開發(fā)運維架構”,并轉(zhuǎn)發(fā)朋友圈,謝謝支持!!!
總結(jié)
以上是生活随笔為你收集整理的hadoop源码分析_Spark2.x精通:Job触发流程源码深度剖析(一)的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 美的美居app是干什么的
- 下一篇: python面向对象编程138讲_Pyt