Spark SQL Catalyst Source Code Analysis: The TreeNode Library
/** Spark SQL source code analysis series */
The previous articles in this series covered the core execution flow of Catalyst, the SqlParser, and the Analyzer. I had planned to write about the Optimizer next, but realized I had never introduced TreeNode, a core concept of Catalyst. Knowing it makes it much easier to understand how the Optimizer turns an Analyzed Logical Plan into an Optimized Logical Plan, so this article explains the basic architecture of TreeNode.
1. TreeNode Types
The TreeNode Library is the core class library of Catalyst: the syntax tree is built entirely out of TreeNodes. TreeNode itself is typed as BaseType <: TreeNode[BaseType] and mixes in the Product trait, so its heterogeneous constructor elements can be held and iterated generically. A TreeNode takes one of three shapes: BinaryNode, UnaryNode, and LeafNode.
In Catalyst, these nodes all inherit from LogicalPlan, so you could say that every TreeNode in the plan tree is a LogicalPlan; Expressions are TreeNodes as well, extending TreeNode directly.
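For reference, here is an abridged sketch of the TreeNode class itself, paraphrased from the Spark 1.0-era Catalyst source (member details differ between versions):

abstract class TreeNode[BaseType <: TreeNode[BaseType]] {
  self: BaseType with Product =>   // every concrete node must also be a Product, i.e. a case class

  val id = TreeNode.nextId()       // unique per-node id, drawn from an AtomicLong counter
  def children: Seq[BaseType]      // subclasses declare how many children they have
  // transform, transformDown, transformChildrenDown, makeCopy and friends live here
}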
The main inheritance relationships are illustrated by the class diagram below. (Figure not reproduced here.)
1.1 BinaryNode
A binary node, i.e. a node with a left child and a right child.
/**
 * A [[TreeNode]] that has two children, [[left]] and [[right]].
 */
trait BinaryNode[BaseType <: TreeNode[BaseType]] {
  def left: BaseType
  def right: BaseType
  def children = Seq(left, right)
}

abstract class BinaryNode extends LogicalPlan with trees.BinaryNode[LogicalPlan] {
  self: Product =>
}

The definition is simple: the left and right children are both of type BaseType, and children is Seq(left, right). The classes that extend binary nodes are listed in the class diagram below and can be used as a quick reference :) (Figure not reproduced here.)
The binary nodes you will encounter most often are Join and Union.
1.2 UnaryNode
A unary node, i.e. a node with exactly one child.
Commonly used unary nodes include Project, Subquery, Filter, Limit, and so on.
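Mirroring the BinaryNode sources quoted above, here is a sketch of the UnaryNode pair, paraphrased from the same era of the Catalyst source:

trait UnaryNode[BaseType <: TreeNode[BaseType]] {
  def child: BaseType
  def children = child :: Nil   // exactly one child
}

abstract class UnaryNode extends LogicalPlan with trees.UnaryNode[LogicalPlan] {
  self: Product =>
}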
1.3 LeafNode
A leaf node, i.e. a node with no children.
Commonly used leaf nodes include the Command family, some function nodes, and UnresolvedRelation, among others.
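And the matching LeafNode sketch, again paraphrased from the same source:

trait LeafNode[BaseType <: TreeNode[BaseType]] {
  def children = Nil   // no children at all
}

abstract class LeafNode extends LogicalPlan with trees.LeafNode[LogicalPlan] {
  self: Product =>
}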
2. Core TreeNode Methods
Here is a brief look at the attributes and methods of the TreeNode class.

currentId
Every TreeNode in a tree has a unique id, allocated from currentId, a counter of type java.util.concurrent.atomic.AtomicLong.
To decide whether two instances are the same node, comparing the TreeNode ids is enough:
def sameInstance(other: TreeNode[_]): Boolean = {
  this.id == other.id
}

fastEquals is a faster, more commonly used check. Object.equals is deliberately not overridden, because overriding it would prevent the Scala compiler from generating the case-class equals methods:
def fastEquals(other: TreeNode[_]): Boolean = {
  sameInstance(other) || this == other
}

map, flatMap, and collect all recursively apply a PartialFunction over the node and its children. TreeNode has many more methods; for reasons of space they are not all described here.
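As a quick illustration of these helpers, here is a hypothetical one-liner that uses collect to gather every unresolved relation in a plan tree (plan is an assumed LogicalPlan value; UnresolvedRelation appears again in the worked example later in this article):

// collect recursively tries the PartialFunction on every node in the tree
val unresolvedTables = plan collect {
  case r: UnresolvedRelation => r
}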
2.1 The core method: transform
transform accepts a PartialFunction, which is exactly the Rule inside a Batch described in the previous article on the Analyzer. The rule is applied recursively to this node and all of its children, and a copy of this node is returned (a new node, distinct from the current one; as shown below, reflection is used to construct the modified copy).
If the rule's PartialFunction does not match a given node, that node is returned unchanged.
Let's look at an example:
object GlobalAggregates extends Rule[LogicalPlan] {
  // apply invokes the LogicalPlan's (i.e. TreeNode's) transform method with a PartialFunction
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case Project(projectList, child) if containsAggregates(projectList) =>
      Aggregate(Nil, projectList, child)
  }

  def containsAggregates(exprs: Seq[Expression]): Boolean = {
    exprs.foreach(_.foreach {
      case agg: AggregateExpression => return true
      case _ =>
    })
    false
  }
}

What transform really calls is transformDown, which applies the rule to the tree in pre-order: the rule is applied to the current node first, and the resulting node afterRule is the one whose children the rule is then applied to.
The transformDown method:
/**
 * Returns a copy of this node where `rule` has been recursively applied to it and all of its
 * children (pre-order). When `rule` does not apply to a given node it is left unchanged.
 * @param rule the function used to transform this nodes children
 */
def transformDown(rule: PartialFunction[BaseType, BaseType]): BaseType = {
  val afterRule = rule.applyOrElse(this, identity[BaseType])
  // Check if unchanged and then possibly return old copy to avoid gc churn.
  if (this fastEquals afterRule) {
    transformChildrenDown(rule)            // node unchanged: recurse into the original node's children
  } else {
    afterRule.transformChildrenDown(rule)  // node changed: recurse into the modified node's children
  }
}

The most important method is transformChildrenDown: it recursively applies the PartialFunction to this node's children and then uses the resulting newArgs to build a new node, which is done by calling makeCopy().
The transformChildrenDown method:
/**
 * Returns a copy of this node where `rule` has been recursively applied to all the children of
 * this node. When `rule` does not apply to a given node it is left unchanged.
 * @param rule the function used to transform this nodes children
 */
def transformChildrenDown(rule: PartialFunction[BaseType, BaseType]): this.type = {
  var changed = false
  val newArgs = productIterator.map {
    case arg: TreeNode[_] if children contains arg =>
      val newChild = arg.asInstanceOf[BaseType].transformDown(rule)  // recursively apply the rule to this child
      if (!(newChild fastEquals arg)) {
        changed = true
        newChild
      } else {
        arg
      }
    case Some(arg: TreeNode[_]) if children contains arg =>
      val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
      if (!(newChild fastEquals arg)) {
        changed = true
        Some(newChild)
      } else {
        Some(arg)
      }
    case m: Map[_, _] => m
    case args: Traversable[_] => args.map {
      case arg: TreeNode[_] if children contains arg =>
        val newChild = arg.asInstanceOf[BaseType].transformDown(rule)
        if (!(newChild fastEquals arg)) {
          changed = true
          newChild
        } else {
          arg
        }
      case other => other
    }
    case nonChild: AnyRef => nonChild
    case null => null
  }.toArray
  if (changed) makeCopy(newArgs) else this  // if any child changed, reflectively build a new node from newArgs
}

The makeCopy method generates a node copy via reflection:

/**
 * Creates a copy of this type of tree node after a transformation.
 * Must be overridden by child classes that have constructor arguments
 * that are not present in the productIterator.
 * @param newArgs the new product arguments.
 */
def makeCopy(newArgs: Array[AnyRef]): this.type = attachTree(this, "makeCopy") {
  try {
    val defaultCtor = getClass.getConstructors.head  // reflectively take the first declared constructor
    if (otherCopyArgs.isEmpty) {
      defaultCtor.newInstance(newArgs: _*).asInstanceOf[this.type]  // instantiate a node of the same type
    } else {
      defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[this.type]  // append any extra constructor args
    }
  } catch {
    case e: java.lang.IllegalArgumentException =>
      throw new TreeNodeException(
        this, s"Failed to copy node. Is otherCopyArgs specified correctly for $nodeName? " +
          s"Exception message: ${e.getMessage}.")
  }
}
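To make the copy-on-transform behavior concrete, here is a minimal constant-folding sketch. It is written against the Spark 1.0-era expression API (Add, Literal, and IntegerType are real Catalyst classes, but their signatures vary across versions), so treat it as illustrative rather than authoritative:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.types.IntegerType

// Build the expression tree 1 + (2 + 3); Expression extends TreeNode[Expression]
val expr: Expression =
  Add(Literal(1, IntegerType), Add(Literal(2, IntegerType), Literal(3, IntegerType)))

// A rule that folds an Add of two integer literals into a single literal
val folded = expr transform {
  case Add(Literal(a: Int, IntegerType), Literal(b: Int, IntegerType)) =>
    Literal(a + b, IntegerType)
}

One pre-order pass folds only the inner Add, because the root does not match until its right child has already become a literal: folded is Add(Literal(1), Literal(5)), and a second transform pass would produce Literal(6). This is exactly why the Optimizer applies its rule batches repeatedly until a fixed point is reached.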
3. A TreeNode Example

Now let's start from a SQL statement and trace how the whole Spark SQL tree is transformed.

SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key

First, run it and look at the generated plans in the console:

sbt/sbt hive/console
Using /usr/java/default as default JAVA_HOME.
Note, this will be overridden by -java-home if it is set.
[info] Loading project definition from /app/hadoop/shengli/spark/project/project
[info] Loading project definition from /app/hadoop/shengli/spark/project
[info] Set current project to root (in build file:/app/hadoop/shengli/spark/)
[info] Starting scala interpreter...
[info]
import org.apache.spark.sql.catalyst.analysis._
import org.apache.spark.sql.catalyst.dsl._
import org.apache.spark.sql.catalyst.errors._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules._
import org.apache.spark.sql.catalyst.types._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.execution
import org.apache.spark.sql.hive._
import org.apache.spark.sql.hive.test.TestHive._
import org.apache.spark.sql.parquet.ParquetTestData

scala> val query = sql("SELECT * FROM (SELECT * FROM src) a join (select * from src)b on a.key=b.key")
3.1 Unresolved Logical Plan

The first step produces the Unresolved Logical Plan:

scala> query.queryExecution.logical
res0: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [*]
 Join Inner, Some(('a.key = 'b.key))
  Subquery a
   Project [*]
    UnresolvedRelation None, src, None
  Subquery b
   Project [*]
    UnresolvedRelation None, src, None

Drawn as a tree (this is my own reading), the three node kinds introduced at the start are shown as green UnaryNodes, red BinaryNodes, and blue LeafNodes. (Figure not reproduced here.)
3.2 Analyzed Logical Plan

The Analyzer applies batches of rules to the Unresolved Logical Plan tree. Here EliminateAnalysisOperators removes the Subquery nodes, and the Batch("Resolution") rules resolve the attributes and relations, yielding the Analyzed Logical Plan tree shown below. (Figure not reproduced here.)
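In the same console session the analyzed tree can be printed directly; the exact output depends on the local Hive metastore, so only the command is shown:

scala> query.queryExecution.analyzed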
3.3 Optimized Plan

I jokingly call Catalyst's Optimizer the optimization master of Spark SQL, because all of Spark SQL's optimization happens there; a later article will cover the Optimizer. Here the optimization is not dramatic, because the SQL itself is simple:

scala> query.queryExecution.optimizedPlan
res3: org.apache.spark.sql.catalyst.plans.logical.LogicalPlan =
Project [key#0,value#1,key#2,value#3]
 Join Inner, Some((key#0 = key#2))
  MetastoreRelation default, src, None
  MetastoreRelation default, src, None

The resulting tree is shown below. (Figure not reproduced here.)
3.4 executedPlan

The last step produces the final physical execution plan. It involves Hive table scans (HiveTableScan), a HashJoin, and Exchange operators; the Exchange covers the Shuffle and partitioning:

scala> query.queryExecution.executedPlan
res4: org.apache.spark.sql.execution.SparkPlan =
Project [key#0:0,value#1:1,key#2:2,value#3:3]
 HashJoin [key#0], [key#2], BuildRight
  Exchange (HashPartitioning [key#0:0], 150)
   HiveTableScan [key#0,value#1], (MetastoreRelation default, src, None), None
  Exchange (HashPartitioning [key#2:0], 150)
   HiveTableScan [key#2,value#3], (MetastoreRelation default, src, None), None

The resulting physical execution tree is shown below. (Figure not reproduced here.)
4. Summary

This article introduced the TreeNode library at the heart of Spark SQL's Catalyst framework, sketched the TreeNode inheritance hierarchy, and looked at the role TreeNode plays in Catalyst. Every LogicalPlan in the syntax tree derives from TreeNode, and LogicalPlan takes on TreeNode's three forms: BinaryNode, UnaryNode, and LeafNode. It is exactly these node kinds that make up the Catalyst syntax tree of Spark SQL.

TreeNode's transform method is its core method: it accepts a rule, recursively applies the rule to the current node's children, and finally returns a copy of the TreeNode. Such an operation is a transformation, and it runs through several core phases of Spark SQL execution, such as the Analyze and Optimize phases.
Finally, a concrete example walked through how Spark SQL's execution tree is generated.
This is my current understanding; where the analysis falls short, corrections are welcome.
——EOF——

Original article; when reposting please credit: http://blog.csdn.net/oopsoom/article/details/38084079
總結
以上是生活随笔為你收集整理的Spark SQL Catalyst源代码分析之TreeNode Library的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: input的onkeyup效果 超级简短
- 下一篇: Java中书写要注意的地方