Spark Execution Flow and Principles
Spark execution plan analysis (earlier post):
https://blog.csdn.net/zyzzxycj/article/details/82704713
-----------
First, here is the overall flow diagram of SQL parsing:
The diagram may be hard to follow on a first read, so start with a simple SQL statement:

select * from heguozi.payinfo where pay = 0 limit 10

Between this piece of sqlText and the final result, which execution plans are produced along the way? Run

explain extended select * from heguozi.payinfo where pay = 0 limit 10

and you will see four execution plans:
== Parsed Logical Plan ==
'GlobalLimit 10
+- 'LocalLimit 10
   +- 'Project [*]
      +- 'Filter ('pay = 0)
         +- 'UnresolvedRelation `heguozi`.`payinfo`

The Parsed Logical Plan corresponds to the Unresolved LogicalPlan in the diagram.
== Analyzed Logical Plan ==
pay_id: string, totalpay_id: string, kindpay_id: string, kindpayname: string, fee: double, operator: string, operator_name: string, pay_time: bigint, pay: double, charge: double, is_valid: int, entity_id: string, create_time: bigint, op_time: bigint, last_ver: bigint, opuser_id: string, card_id: string, card_entity_id: string, online_bill_id: string, type: int, code: string, waitingpay_id: string, load_time: int, modify_time: int, ... 8 more fields
GlobalLimit 10
+- LocalLimit 10
   +- Project [pay_id#10079, totalpay_id#10080, kindpay_id#10081, kindpayname#10082, fee#10083, operator#10084, operator_name#10085, pay_time#10086L, pay#10087, charge#10088, is_valid#10089, entity_id#10090, create_time#10091L, op_time#10092L, last_ver#10093L, opuser_id#10094, card_id#10095, card_entity_id#10096, online_bill_id#10097, type#10098, code#10099, waitingpay_id#10100, load_time#10101, modify_time#10102, ... 8 more fields]
      +- Filter (pay#10087 = cast(0 as double))
         +- SubqueryAlias payinfo
            +- Relation[pay_id#10079,totalpay_id#10080,kindpay_id#10081,kindpayname#10082,fee#10083,operator#10084,operator_name#10085,pay_time#10086L,pay#10087,charge#10088,is_valid#10089,entity_id#10090,create_time#10091L,op_time#10092L,last_ver#10093L,opuser_id#10094,card_id#10095,card_entity_id#10096,online_bill_id#10097,type#10098,code#10099,waitingpay_id#10100,load_time#10101,modify_time#10102,... 8 more fields] parquet

The Analyzed Logical Plan corresponds to the Resolved LogicalPlan in the diagram.
== Optimized Logical Plan ==
GlobalLimit 10
+- LocalLimit 10
   +- Filter (isnotnull(pay#10087) && (pay#10087 = 0.0))
      +- Relation[pay_id#10079,totalpay_id#10080,kindpay_id#10081,kindpayname#10082,fee#10083,operator#10084,operator_name#10085,pay_time#10086L,pay#10087,charge#10088,is_valid#10089,entity_id#10090,create_time#10091L,op_time#10092L,last_ver#10093L,opuser_id#10094,card_id#10095,card_entity_id#10096,online_bill_id#10097,type#10098,code#10099,waitingpay_id#10100,load_time#10101,modify_time#10102,... 8 more fields] parquet

The Optimized Logical Plan corresponds to the Optimized LogicalPlan in the diagram.
== Physical Plan ==
CollectLimit 10
+- *(1) LocalLimit 10
   +- *(1) Project [pay_id#10079, totalpay_id#10080, kindpay_id#10081, kindpayname#10082, fee#10083, operator#10084, operator_name#10085, pay_time#10086L, pay#10087, charge#10088, is_valid#10089, entity_id#10090, create_time#10091L, op_time#10092L, last_ver#10093L, opuser_id#10094, card_id#10095, card_entity_id#10096, online_bill_id#10097, type#10098, code#10099, waitingpay_id#10100, load_time#10101, modify_time#10102, ... 8 more fields]
      +- *(1) Filter (isnotnull(pay#10087) && (pay#10087 = 0.0))
         +- *(1) FileScan parquet heguozi.payinfo[pay_id#10079,totalpay_id#10080,kindpay_id#10081,kindpayname#10082,fee#10083,operator#10084,operator_name#10085,pay_time#10086L,pay#10087,charge#10088,is_valid#10089,entity_id#10090,create_time#10091L,op_time#10092L,last_ver#10093L,opuser_id#10094,card_id#10095,card_entity_id#10096,online_bill_id#10097,type#10098,code#10099,waitingpay_id#10100,load_time#10101,modify_time#10102,... 8 more fields] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://cluster-cdh/user/flume/heguozi/payinfo], PartitionCount: 0, PartitionFilters: [], PushedFilters: [IsNotNull(pay), EqualTo(pay,0.0)], ReadSchema: struct<pay_id:string,totalpay_id:string,kindpay_id:string,kindpayname:string,fee:double,operator:...

The Physical Plan is the final, executable PhysicalPlan.
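The same four plans can also be inspected programmatically through the Dataset's QueryExecution. Below is a minimal sketch of my own (assuming a Hive-enabled session and that heguozi.payinfo exists), not part of the original post:

import org.apache.spark.sql.SparkSession

object InspectPlans {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("inspect-plans")
      .enableHiveSupport()
      .getOrCreate()

    val df = spark.sql("select * from heguozi.payinfo where pay = 0 limit 10")
    val qe = df.queryExecution

    println(qe.logical)        // Parsed (unresolved) logical plan
    println(qe.analyzed)       // Analyzed (resolved) logical plan
    println(qe.optimizedPlan)  // Optimized logical plan
    println(qe.executedPlan)   // Physical plan that is actually run

    // df.explain(true) prints all of the above at once, like EXPLAIN EXTENDED.
    spark.stop()
  }
}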
As explained in the earlier Spark execution plan analysis post (https://blog.csdn.net/zyzzxycj/article/details/82704713), the *(n) markers in the Physical Plan are WholeStageCodegen ids. So what exactly is WholeStageCodegen?
(whole-stage code generation; there does not yet seem to be a settled Chinese translation for the term)
It is a technique introduced in Spark 2.x. Instead of interpreting an operator tree row by row, Spark collapses the operators of a stage into a single piece of generated code (Java source compiled at runtime) and executes that directly, so no virtual function calls are involved; this outperforms the Volcano Iterator Model of Spark 1.x. Note, however, that whole-stage code generation only tunes the CPU-intensive side of execution and cannot speed up IO-intensive work: for example, the disk reads and writes produced during a Shuffle gain nothing from this technique.
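A quick way to see the effect is to compare the physical plan with the feature on and off. This is a sketch to be run in spark-shell (where spark is predefined); spark.sql.codegen.wholeStage is the switch, enabled by default in 2.x:

// With whole-stage codegen on, fused operators are prefixed with *(n).
spark.conf.set("spark.sql.codegen.wholeStage", "true")
spark.sql("select * from heguozi.payinfo where pay = 0 limit 10").explain()

// With it off, Spark falls back to row-at-a-time iterator execution and the *(n) markers disappear.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.sql("select * from heguozi.payinfo where pay = 0 limit 10").explain()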
?
Now take the SQL above as an example and look at the code generated for *(1).
The code is fairly long, but on closer inspection it consists almost entirely of repetitive operations.
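The dump that follows is the output Spark prints when asked for the generated code; in spark-shell either of these approaches should reproduce it (a sketch, assuming the same table as above):

// SQL route: prepend EXPLAIN CODEGEN to the query.
spark.sql("explain codegen select * from heguozi.payinfo where pay = 0 limit 10").show(false)

// Debug-helper route: debugCodegen() prints every WholeStageCodegen subtree and its Java source.
import org.apache.spark.sql.execution.debug._
spark.sql("select * from heguozi.payinfo where pay = 0 limit 10").debugCodegen()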
Found 1 WholeStageCodegen subtrees. == Subtree 1 / 1 == *(1) LocalLimit 10 +- *(1) Project [pay_id#10263, totalpay_id#10264, kindpay_id#10265, kindpayname#10266, fee#10267, operator#10268, operator_name#10269, pay_time#10270L, pay#10271, charge#10272, is_valid#10273, entity_id#10274, create_time#10275L, op_time#10276L, last_ver#10277L, opuser_id#10278, card_id#10279, card_entity_id#10280, online_bill_id#10281, type#10282, code#10283, waitingpay_id#10284, load_time#10285, modify_time#10286, ... 8 more fields]+- *(1) Filter (isnotnull(pay#10271) && (pay#10271 = 0.0))+- *(1) FileScan parquet heguozi.payinfo[pay_id#10263,totalpay_id#10264,kindpay_id#10265,kindpayname#10266,fee#10267,operator#10268,operator_name#10269,pay_time#10270L,pay#10271,charge#10272,is_valid#10273,entity_id#10274,create_time#10275L,op_time#10276L,last_ver#10277L,opuser_id#10278,card_id#10279,card_entity_id#10280,online_bill_id#10281,type#10282,code#10283,waitingpay_id#10284,load_time#10285,modify_time#10286,... 8 more fields] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://cluster-cdh/user/flume/heguozi/payinfo], PartitionCount: 0, PartitionFilters: [], PushedFilters: [IsNotNull(pay), EqualTo(pay,0.0)], ReadSchema: struct<pay_id:string,totalpay_id:string,kindpay_id:string,kindpayname:string,fee:double,operator:...Generated code: /* 001 */ public Object generate(Object[] references) { /* 002 */ return new GeneratedIteratorForCodegenStage1(references); /* 003 */ } /* 004 */ /* 005 */ final class GeneratedIteratorForCodegenStage1 extends org.apache.spark.sql.execution.BufferedRowIterator { /* 006 */ private Object[] references; /* 007 */ private scala.collection.Iterator[] inputs; /* 008 */ private long scan_scanTime_0; /* 009 */ private int scan_batchIdx_0; /* 010 */ private boolean locallimit_stopEarly_0; /* 011 */ private int locallimit_count_0; /* 012 */ private org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[] scan_mutableStateArray_4 = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder[4]; /* 013 */ private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] scan_mutableStateArray_2 = new org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[32]; /* 014 */ private org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[] scan_mutableStateArray_5 = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter[4]; /* 015 */ private UnsafeRow[] scan_mutableStateArray_3 = new UnsafeRow[4]; /* 016 */ private org.apache.spark.sql.vectorized.ColumnarBatch[] scan_mutableStateArray_1 = new org.apache.spark.sql.vectorized.ColumnarBatch[1]; /* 017 */ private scala.collection.Iterator[] scan_mutableStateArray_0 = new scala.collection.Iterator[1]; /* 018 */ /* 019 */ public GeneratedIteratorForCodegenStage1(Object[] references) { /* 020 */ this.references = references; /* 021 */ } /* 022 */ /* 023 */ public void init(int index, scala.collection.Iterator[] inputs) { /* 024 */ partitionIndex = index; /* 025 */ this.inputs = inputs; /* 026 */ wholestagecodegen_init_0_0(); /* 027 */ wholestagecodegen_init_0_1(); /* 028 */ /* 029 */ } /* 030 */ /* 031 */ private void wholestagecodegen_init_0_1() { /* 032 */ scan_mutableStateArray_4[3] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_mutableStateArray_3[3], 512); /* 033 */ scan_mutableStateArray_5[3] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_mutableStateArray_4[3], 32); /* 034 */ /* 035 */ } /* 036 */ /* 037 */ private void scan_nextBatch_0() 
throws java.io.IOException { /* 038 */ long getBatchStart = System.nanoTime(); /* 039 */ if (scan_mutableStateArray_0[0].hasNext()) { /* 040 */ scan_mutableStateArray_1[0] = (org.apache.spark.sql.vectorized.ColumnarBatch)scan_mutableStateArray_0[0].next(); /* 041 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[0] /* numOutputRows */).add(scan_mutableStateArray_1[0].numRows()); /* 042 */ scan_batchIdx_0 = 0; /* 043 */ scan_mutableStateArray_2[0] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(0); /* 044 */ scan_mutableStateArray_2[1] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(1); /* 045 */ scan_mutableStateArray_2[2] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(2); /* 046 */ scan_mutableStateArray_2[3] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(3); /* 047 */ scan_mutableStateArray_2[4] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(4); /* 048 */ scan_mutableStateArray_2[5] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(5); /* 049 */ scan_mutableStateArray_2[6] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(6); /* 050 */ scan_mutableStateArray_2[7] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(7); /* 051 */ scan_mutableStateArray_2[8] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(8); /* 052 */ scan_mutableStateArray_2[9] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(9); /* 053 */ scan_mutableStateArray_2[10] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(10); /* 054 */ scan_mutableStateArray_2[11] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(11); /* 055 */ scan_mutableStateArray_2[12] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(12); /* 056 */ scan_mutableStateArray_2[13] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(13); /* 057 */ scan_mutableStateArray_2[14] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(14); /* 058 */ scan_mutableStateArray_2[15] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(15); /* 059 */ scan_mutableStateArray_2[16] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(16); /* 060 */ scan_mutableStateArray_2[17] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(17); /* 061 */ scan_mutableStateArray_2[18] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(18); /* 062 */ scan_mutableStateArray_2[19] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(19); /* 063 */ scan_mutableStateArray_2[20] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(20); /* 064 */ scan_mutableStateArray_2[21] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) 
scan_mutableStateArray_1[0].column(21); /* 065 */ scan_mutableStateArray_2[22] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(22); /* 066 */ scan_mutableStateArray_2[23] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(23); /* 067 */ scan_mutableStateArray_2[24] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(24); /* 068 */ scan_mutableStateArray_2[25] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(25); /* 069 */ scan_mutableStateArray_2[26] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(26); /* 070 */ scan_mutableStateArray_2[27] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(27); /* 071 */ scan_mutableStateArray_2[28] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(28); /* 072 */ scan_mutableStateArray_2[29] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(29); /* 073 */ scan_mutableStateArray_2[30] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(30); /* 074 */ scan_mutableStateArray_2[31] = (org.apache.spark.sql.execution.vectorized.OnHeapColumnVector) scan_mutableStateArray_1[0].column(31); /* 075 */ /* 076 */ } /* 077 */ scan_scanTime_0 += System.nanoTime() - getBatchStart; /* 078 */ } /* 079 */ /* 080 */ protected void processNext() throws java.io.IOException { /* 081 */ if (scan_mutableStateArray_1[0] == null) { /* 082 */ scan_nextBatch_0(); /* 083 */ } /* 084 */ while (scan_mutableStateArray_1[0] != null) { /* 085 */ int scan_numRows_0 = scan_mutableStateArray_1[0].numRows(); /* 086 */ int scan_localEnd_0 = scan_numRows_0 - scan_batchIdx_0; /* 087 */ for (int scan_localIdx_0 = 0; scan_localIdx_0 < scan_localEnd_0; scan_localIdx_0++) { /* 088 */ int scan_rowIdx_0 = scan_batchIdx_0 + scan_localIdx_0; /* 089 */ do { /* 090 */ boolean scan_isNull_8 = scan_mutableStateArray_2[8].isNullAt(scan_rowIdx_0); /* 091 */ double scan_value_8 = scan_isNull_8 ? -1.0 : (scan_mutableStateArray_2[8].getDouble(scan_rowIdx_0)); /* 092 */ /* 093 */ if (!(!(scan_isNull_8))) continue; /* 094 */ /* 095 */ boolean filter_isNull_2 = false; /* 096 */ /* 097 */ boolean filter_value_2 = false; /* 098 */ filter_value_2 = ((java.lang.Double.isNaN(scan_value_8) && java.lang.Double.isNaN(0.0D)) || scan_value_8 == 0.0D); /* 099 */ if (!filter_value_2) continue; /* 100 */ /* 101 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[2] /* numOutputRows */).add(1); /* 102 */ /* 103 */ if (locallimit_count_0 < 10) { /* 104 */ locallimit_count_0 += 1; /* 105 */ /* 106 */ boolean scan_isNull_0 = scan_mutableStateArray_2[0].isNullAt(scan_rowIdx_0); /* 107 */ UTF8String scan_value_0 = scan_isNull_0 ? null : (scan_mutableStateArray_2[0].getUTF8String(scan_rowIdx_0)); /* 108 */ boolean scan_isNull_1 = scan_mutableStateArray_2[1].isNullAt(scan_rowIdx_0); /* 109 */ UTF8String scan_value_1 = scan_isNull_1 ? null : (scan_mutableStateArray_2[1].getUTF8String(scan_rowIdx_0)); /* 110 */ boolean scan_isNull_2 = scan_mutableStateArray_2[2].isNullAt(scan_rowIdx_0); /* 111 */ UTF8String scan_value_2 = scan_isNull_2 ? 
null : (scan_mutableStateArray_2[2].getUTF8String(scan_rowIdx_0)); /* 112 */ boolean scan_isNull_3 = scan_mutableStateArray_2[3].isNullAt(scan_rowIdx_0); /* 113 */ UTF8String scan_value_3 = scan_isNull_3 ? null : (scan_mutableStateArray_2[3].getUTF8String(scan_rowIdx_0)); /* 114 */ boolean scan_isNull_4 = scan_mutableStateArray_2[4].isNullAt(scan_rowIdx_0); /* 115 */ double scan_value_4 = scan_isNull_4 ? -1.0 : (scan_mutableStateArray_2[4].getDouble(scan_rowIdx_0)); /* 116 */ boolean scan_isNull_5 = scan_mutableStateArray_2[5].isNullAt(scan_rowIdx_0); /* 117 */ UTF8String scan_value_5 = scan_isNull_5 ? null : (scan_mutableStateArray_2[5].getUTF8String(scan_rowIdx_0)); /* 118 */ boolean scan_isNull_6 = scan_mutableStateArray_2[6].isNullAt(scan_rowIdx_0); /* 119 */ UTF8String scan_value_6 = scan_isNull_6 ? null : (scan_mutableStateArray_2[6].getUTF8String(scan_rowIdx_0)); /* 120 */ boolean scan_isNull_7 = scan_mutableStateArray_2[7].isNullAt(scan_rowIdx_0); /* 121 */ long scan_value_7 = scan_isNull_7 ? -1L : (scan_mutableStateArray_2[7].getLong(scan_rowIdx_0)); /* 122 */ boolean scan_isNull_9 = scan_mutableStateArray_2[9].isNullAt(scan_rowIdx_0); /* 123 */ double scan_value_9 = scan_isNull_9 ? -1.0 : (scan_mutableStateArray_2[9].getDouble(scan_rowIdx_0)); /* 124 */ boolean scan_isNull_10 = scan_mutableStateArray_2[10].isNullAt(scan_rowIdx_0); /* 125 */ int scan_value_10 = scan_isNull_10 ? -1 : (scan_mutableStateArray_2[10].getInt(scan_rowIdx_0)); /* 126 */ boolean scan_isNull_11 = scan_mutableStateArray_2[11].isNullAt(scan_rowIdx_0); /* 127 */ UTF8String scan_value_11 = scan_isNull_11 ? null : (scan_mutableStateArray_2[11].getUTF8String(scan_rowIdx_0)); /* 128 */ boolean scan_isNull_12 = scan_mutableStateArray_2[12].isNullAt(scan_rowIdx_0); /* 129 */ long scan_value_12 = scan_isNull_12 ? -1L : (scan_mutableStateArray_2[12].getLong(scan_rowIdx_0)); /* 130 */ boolean scan_isNull_13 = scan_mutableStateArray_2[13].isNullAt(scan_rowIdx_0); /* 131 */ long scan_value_13 = scan_isNull_13 ? -1L : (scan_mutableStateArray_2[13].getLong(scan_rowIdx_0)); /* 132 */ boolean scan_isNull_14 = scan_mutableStateArray_2[14].isNullAt(scan_rowIdx_0); /* 133 */ long scan_value_14 = scan_isNull_14 ? -1L : (scan_mutableStateArray_2[14].getLong(scan_rowIdx_0)); /* 134 */ boolean scan_isNull_15 = scan_mutableStateArray_2[15].isNullAt(scan_rowIdx_0); /* 135 */ UTF8String scan_value_15 = scan_isNull_15 ? null : (scan_mutableStateArray_2[15].getUTF8String(scan_rowIdx_0)); /* 136 */ boolean scan_isNull_16 = scan_mutableStateArray_2[16].isNullAt(scan_rowIdx_0); /* 137 */ UTF8String scan_value_16 = scan_isNull_16 ? null : (scan_mutableStateArray_2[16].getUTF8String(scan_rowIdx_0)); /* 138 */ boolean scan_isNull_17 = scan_mutableStateArray_2[17].isNullAt(scan_rowIdx_0); /* 139 */ UTF8String scan_value_17 = scan_isNull_17 ? null : (scan_mutableStateArray_2[17].getUTF8String(scan_rowIdx_0)); /* 140 */ boolean scan_isNull_18 = scan_mutableStateArray_2[18].isNullAt(scan_rowIdx_0); /* 141 */ UTF8String scan_value_18 = scan_isNull_18 ? null : (scan_mutableStateArray_2[18].getUTF8String(scan_rowIdx_0)); /* 142 */ boolean scan_isNull_19 = scan_mutableStateArray_2[19].isNullAt(scan_rowIdx_0); /* 143 */ int scan_value_19 = scan_isNull_19 ? -1 : (scan_mutableStateArray_2[19].getInt(scan_rowIdx_0)); /* 144 */ boolean scan_isNull_20 = scan_mutableStateArray_2[20].isNullAt(scan_rowIdx_0); /* 145 */ UTF8String scan_value_20 = scan_isNull_20 ? 
null : (scan_mutableStateArray_2[20].getUTF8String(scan_rowIdx_0)); /* 146 */ boolean scan_isNull_21 = scan_mutableStateArray_2[21].isNullAt(scan_rowIdx_0); /* 147 */ UTF8String scan_value_21 = scan_isNull_21 ? null : (scan_mutableStateArray_2[21].getUTF8String(scan_rowIdx_0)); /* 148 */ boolean scan_isNull_22 = scan_mutableStateArray_2[22].isNullAt(scan_rowIdx_0); /* 149 */ int scan_value_22 = scan_isNull_22 ? -1 : (scan_mutableStateArray_2[22].getInt(scan_rowIdx_0)); /* 150 */ boolean scan_isNull_23 = scan_mutableStateArray_2[23].isNullAt(scan_rowIdx_0); /* 151 */ int scan_value_23 = scan_isNull_23 ? -1 : (scan_mutableStateArray_2[23].getInt(scan_rowIdx_0)); /* 152 */ boolean scan_isNull_24 = scan_mutableStateArray_2[24].isNullAt(scan_rowIdx_0); /* 153 */ byte scan_value_24 = scan_isNull_24 ? (byte)-1 : (scan_mutableStateArray_2[24].getByte(scan_rowIdx_0)); /* 154 */ boolean scan_isNull_25 = scan_mutableStateArray_2[25].isNullAt(scan_rowIdx_0); /* 155 */ UTF8String scan_value_25 = scan_isNull_25 ? null : (scan_mutableStateArray_2[25].getUTF8String(scan_rowIdx_0)); /* 156 */ boolean scan_isNull_26 = scan_mutableStateArray_2[26].isNullAt(scan_rowIdx_0); /* 157 */ double scan_value_26 = scan_isNull_26 ? -1.0 : (scan_mutableStateArray_2[26].getDouble(scan_rowIdx_0)); /* 158 */ boolean scan_isNull_27 = scan_mutableStateArray_2[27].isNullAt(scan_rowIdx_0); /* 159 */ double scan_value_27 = scan_isNull_27 ? -1.0 : (scan_mutableStateArray_2[27].getDouble(scan_rowIdx_0)); /* 160 */ boolean scan_isNull_28 = scan_mutableStateArray_2[28].isNullAt(scan_rowIdx_0); /* 161 */ int scan_value_28 = scan_isNull_28 ? -1 : (scan_mutableStateArray_2[28].getInt(scan_rowIdx_0)); /* 162 */ boolean scan_isNull_29 = scan_mutableStateArray_2[29].isNullAt(scan_rowIdx_0); /* 163 */ UTF8String scan_value_29 = scan_isNull_29 ? null : (scan_mutableStateArray_2[29].getUTF8String(scan_rowIdx_0)); /* 164 */ boolean scan_isNull_30 = scan_mutableStateArray_2[30].isNullAt(scan_rowIdx_0); /* 165 */ int scan_value_30 = scan_isNull_30 ? -1 : (scan_mutableStateArray_2[30].getInt(scan_rowIdx_0)); /* 166 */ boolean scan_isNull_31 = scan_mutableStateArray_2[31].isNullAt(scan_rowIdx_0); /* 167 */ UTF8String scan_value_31 = scan_isNull_31 ? 
null : (scan_mutableStateArray_2[31].getUTF8String(scan_rowIdx_0)); /* 168 */ scan_mutableStateArray_4[3].reset(); /* 169 */ /* 170 */ scan_mutableStateArray_5[3].zeroOutNullBytes(); /* 171 */ /* 172 */ if (scan_isNull_0) { /* 173 */ scan_mutableStateArray_5[3].setNullAt(0); /* 174 */ } else { /* 175 */ scan_mutableStateArray_5[3].write(0, scan_value_0); /* 176 */ } /* 177 */ /* 178 */ if (scan_isNull_1) { /* 179 */ scan_mutableStateArray_5[3].setNullAt(1); /* 180 */ } else { /* 181 */ scan_mutableStateArray_5[3].write(1, scan_value_1); /* 182 */ } /* 183 */ /* 184 */ if (scan_isNull_2) { /* 185 */ scan_mutableStateArray_5[3].setNullAt(2); /* 186 */ } else { /* 187 */ scan_mutableStateArray_5[3].write(2, scan_value_2); /* 188 */ } /* 189 */ /* 190 */ if (scan_isNull_3) { /* 191 */ scan_mutableStateArray_5[3].setNullAt(3); /* 192 */ } else { /* 193 */ scan_mutableStateArray_5[3].write(3, scan_value_3); /* 194 */ } /* 195 */ /* 196 */ if (scan_isNull_4) { /* 197 */ scan_mutableStateArray_5[3].setNullAt(4); /* 198 */ } else { /* 199 */ scan_mutableStateArray_5[3].write(4, scan_value_4); /* 200 */ } /* 201 */ /* 202 */ if (scan_isNull_5) { /* 203 */ scan_mutableStateArray_5[3].setNullAt(5); /* 204 */ } else { /* 205 */ scan_mutableStateArray_5[3].write(5, scan_value_5); /* 206 */ } /* 207 */ /* 208 */ if (scan_isNull_6) { /* 209 */ scan_mutableStateArray_5[3].setNullAt(6); /* 210 */ } else { /* 211 */ scan_mutableStateArray_5[3].write(6, scan_value_6); /* 212 */ } /* 213 */ /* 214 */ if (scan_isNull_7) { /* 215 */ scan_mutableStateArray_5[3].setNullAt(7); /* 216 */ } else { /* 217 */ scan_mutableStateArray_5[3].write(7, scan_value_7); /* 218 */ } /* 219 */ /* 220 */ scan_mutableStateArray_5[3].write(8, scan_value_8); /* 221 */ /* 222 */ if (scan_isNull_9) { /* 223 */ scan_mutableStateArray_5[3].setNullAt(9); /* 224 */ } else { /* 225 */ scan_mutableStateArray_5[3].write(9, scan_value_9); /* 226 */ } /* 227 */ /* 228 */ if (scan_isNull_10) { /* 229 */ scan_mutableStateArray_5[3].setNullAt(10); /* 230 */ } else { /* 231 */ scan_mutableStateArray_5[3].write(10, scan_value_10); /* 232 */ } /* 233 */ /* 234 */ if (scan_isNull_11) { /* 235 */ scan_mutableStateArray_5[3].setNullAt(11); /* 236 */ } else { /* 237 */ scan_mutableStateArray_5[3].write(11, scan_value_11); /* 238 */ } /* 239 */ /* 240 */ if (scan_isNull_12) { /* 241 */ scan_mutableStateArray_5[3].setNullAt(12); /* 242 */ } else { /* 243 */ scan_mutableStateArray_5[3].write(12, scan_value_12); /* 244 */ } /* 245 */ /* 246 */ if (scan_isNull_13) { /* 247 */ scan_mutableStateArray_5[3].setNullAt(13); /* 248 */ } else { /* 249 */ scan_mutableStateArray_5[3].write(13, scan_value_13); /* 250 */ } /* 251 */ /* 252 */ if (scan_isNull_14) { /* 253 */ scan_mutableStateArray_5[3].setNullAt(14); /* 254 */ } else { /* 255 */ scan_mutableStateArray_5[3].write(14, scan_value_14); /* 256 */ } /* 257 */ /* 258 */ if (scan_isNull_15) { /* 259 */ scan_mutableStateArray_5[3].setNullAt(15); /* 260 */ } else { /* 261 */ scan_mutableStateArray_5[3].write(15, scan_value_15); /* 262 */ } /* 263 */ /* 264 */ if (scan_isNull_16) { /* 265 */ scan_mutableStateArray_5[3].setNullAt(16); /* 266 */ } else { /* 267 */ scan_mutableStateArray_5[3].write(16, scan_value_16); /* 268 */ } /* 269 */ /* 270 */ if (scan_isNull_17) { /* 271 */ scan_mutableStateArray_5[3].setNullAt(17); /* 272 */ } else { /* 273 */ scan_mutableStateArray_5[3].write(17, scan_value_17); /* 274 */ } /* 275 */ /* 276 */ if (scan_isNull_18) { /* 277 */ scan_mutableStateArray_5[3].setNullAt(18); /* 278 */ 
} else { /* 279 */ scan_mutableStateArray_5[3].write(18, scan_value_18); /* 280 */ } /* 281 */ /* 282 */ if (scan_isNull_19) { /* 283 */ scan_mutableStateArray_5[3].setNullAt(19); /* 284 */ } else { /* 285 */ scan_mutableStateArray_5[3].write(19, scan_value_19); /* 286 */ } /* 287 */ /* 288 */ if (scan_isNull_20) { /* 289 */ scan_mutableStateArray_5[3].setNullAt(20); /* 290 */ } else { /* 291 */ scan_mutableStateArray_5[3].write(20, scan_value_20); /* 292 */ } /* 293 */ /* 294 */ if (scan_isNull_21) { /* 295 */ scan_mutableStateArray_5[3].setNullAt(21); /* 296 */ } else { /* 297 */ scan_mutableStateArray_5[3].write(21, scan_value_21); /* 298 */ } /* 299 */ /* 300 */ if (scan_isNull_22) { /* 301 */ scan_mutableStateArray_5[3].setNullAt(22); /* 302 */ } else { /* 303 */ scan_mutableStateArray_5[3].write(22, scan_value_22); /* 304 */ } /* 305 */ /* 306 */ if (scan_isNull_23) { /* 307 */ scan_mutableStateArray_5[3].setNullAt(23); /* 308 */ } else { /* 309 */ scan_mutableStateArray_5[3].write(23, scan_value_23); /* 310 */ } /* 311 */ /* 312 */ if (scan_isNull_24) { /* 313 */ scan_mutableStateArray_5[3].setNullAt(24); /* 314 */ } else { /* 315 */ scan_mutableStateArray_5[3].write(24, scan_value_24); /* 316 */ } /* 317 */ /* 318 */ if (scan_isNull_25) { /* 319 */ scan_mutableStateArray_5[3].setNullAt(25); /* 320 */ } else { /* 321 */ scan_mutableStateArray_5[3].write(25, scan_value_25); /* 322 */ } /* 323 */ /* 324 */ if (scan_isNull_26) { /* 325 */ scan_mutableStateArray_5[3].setNullAt(26); /* 326 */ } else { /* 327 */ scan_mutableStateArray_5[3].write(26, scan_value_26); /* 328 */ } /* 329 */ /* 330 */ if (scan_isNull_27) { /* 331 */ scan_mutableStateArray_5[3].setNullAt(27); /* 332 */ } else { /* 333 */ scan_mutableStateArray_5[3].write(27, scan_value_27); /* 334 */ } /* 335 */ /* 336 */ if (scan_isNull_28) { /* 337 */ scan_mutableStateArray_5[3].setNullAt(28); /* 338 */ } else { /* 339 */ scan_mutableStateArray_5[3].write(28, scan_value_28); /* 340 */ } /* 341 */ /* 342 */ if (scan_isNull_29) { /* 343 */ scan_mutableStateArray_5[3].setNullAt(29); /* 344 */ } else { /* 345 */ scan_mutableStateArray_5[3].write(29, scan_value_29); /* 346 */ } /* 347 */ /* 348 */ if (scan_isNull_30) { /* 349 */ scan_mutableStateArray_5[3].setNullAt(30); /* 350 */ } else { /* 351 */ scan_mutableStateArray_5[3].write(30, scan_value_30); /* 352 */ } /* 353 */ /* 354 */ if (scan_isNull_31) { /* 355 */ scan_mutableStateArray_5[3].setNullAt(31); /* 356 */ } else { /* 357 */ scan_mutableStateArray_5[3].write(31, scan_value_31); /* 358 */ } /* 359 */ scan_mutableStateArray_3[3].setTotalSize(scan_mutableStateArray_4[3].totalSize()); /* 360 */ append(scan_mutableStateArray_3[3]); /* 361 */ /* 362 */ } else { /* 363 */ locallimit_stopEarly_0 = true; /* 364 */ } /* 365 */ /* 366 */ } while(false); /* 367 */ if (shouldStop()) { scan_batchIdx_0 = scan_rowIdx_0 + 1; return; } /* 368 */ } /* 369 */ scan_batchIdx_0 = scan_numRows_0; /* 370 */ scan_mutableStateArray_1[0] = null; /* 371 */ scan_nextBatch_0(); /* 372 */ } /* 373 */ ((org.apache.spark.sql.execution.metric.SQLMetric) references[1] /* scanTime */).add(scan_scanTime_0 / (1000 * 1000)); /* 374 */ scan_scanTime_0 = 0; /* 375 */ } /* 376 */ /* 377 */ @Override /* 378 */ protected boolean stopEarly() { /* 379 */ return locallimit_stopEarly_0; /* 380 */ } /* 381 */ /* 382 */ private void wholestagecodegen_init_0_0() { /* 383 */ scan_mutableStateArray_0[0] = inputs[0]; /* 384 */ /* 385 */ scan_mutableStateArray_3[0] = new UnsafeRow(32); /* 386 */ scan_mutableStateArray_4[0] = 
new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_mutableStateArray_3[0], 512); /* 387 */ scan_mutableStateArray_5[0] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_mutableStateArray_4[0], 32); /* 388 */ scan_mutableStateArray_3[1] = new UnsafeRow(32); /* 389 */ scan_mutableStateArray_4[1] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_mutableStateArray_3[1], 512); /* 390 */ scan_mutableStateArray_5[1] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_mutableStateArray_4[1], 32); /* 391 */ scan_mutableStateArray_3[2] = new UnsafeRow(32); /* 392 */ scan_mutableStateArray_4[2] = new org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder(scan_mutableStateArray_3[2], 512); /* 393 */ scan_mutableStateArray_5[2] = new org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter(scan_mutableStateArray_4[2], 32); /* 394 */ scan_mutableStateArray_3[3] = new UnsafeRow(32); /* 395 */ /* 396 */ } /* 397 */ /* 398 */ }

Why is a virtual call so expensive? The reasons are as follows:
Take the concrete SQL statement select a+b from table as an example; interpreting it proceeds like this:
1. Call the virtual function Add.eval(), which must determine the data types on both sides of Add.
2. Call the virtual function a.eval(), which must determine a's data type.
3. Confirm that a's data type is int, and box it.
4. Call the virtual function b.eval(), which must determine b's data type.
5. Confirm that b's data type is int, and box it.
6. Call the int version of add.
7. Return the boxed result.
As these steps show, evaluating one SQL expression involves many virtual function calls, and we know that virtual calls can badly hurt performance. But why exactly do they hurt?
A common answer is: a virtual call requires an extra level of indirect addressing. Does that single indirection really degrade performance noticeably? Clearly not. Breaking the CPU pipeline is the real cause of the slowdown.
A virtual call is runtime polymorphism: at compile time the concrete callee is unknown. If the function were not virtual, its relative address would be fixed at compile time and the compiler could emit a direct jmp/call instruction. With a virtual call, the extra vtable lookup is the minor cost; the key point is that the target address is dynamic. Suppose the address ends up in eax: after call eax, all the instructions that have already been prefetched into the pipeline are discarded. The longer the pipeline, the higher the cost of one branch misprediction, as in the following disassembly of pf->test:
001E146D mov eax, dword ptr [pf]
011E1470 mov edx, dword ptr [eax]
011E1472 mov esi, esp
011E1474 mov ecx, dword ptr [pf]
011E1477 mov eax, dword ptr [edx]
011E1479 call eax          <----------- branch prediction fails here
011E147B cmp esi, esp
011E147D call @ILT+355(__RTC_CheckEsp) (11E1168h)
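To make the contrast concrete, here is a small self-contained Scala sketch of my own (not Spark's actual classes) showing the interpreted, virtual-call-heavy evaluation of a + b next to the kind of fused loop that whole-stage code generation emits:

// Volcano / interpreted style: every row pays several virtual eval() calls plus boxing.
trait Expression { def eval(row: Array[Any]): Any }

class Attribute(ordinal: Int) extends Expression {
  override def eval(row: Array[Any]): Any = row(ordinal)   // virtual call, returns a boxed value
}

class Add(left: Expression, right: Expression) extends Expression {
  override def eval(row: Array[Any]): Any =
    left.eval(row).asInstanceOf[Int] + right.eval(row).asInstanceOf[Int]  // two more virtual calls
}

object VolcanoVsCodegen {
  def main(args: Array[String]): Unit = {
    val rows = Array(Array[Any](1, 2), Array[Any](3, 4))

    // Interpreted: Add.eval -> a.eval -> b.eval for every single row.
    val addExpr = new Add(new Attribute(0), new Attribute(1))
    val interpreted = rows.map(addExpr.eval)

    // "Generated" style: the whole expression collapsed into one tight loop over
    // primitive ints -- no virtual dispatch, no per-operator boxing.
    val fused = rows.map(r => r(0).asInstanceOf[Int] + r(1).asInstanceOf[Int])

    println(interpreted.mkString(","))  // 3,7
    println(fused.mkString(","))        // 3,7
  }
}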
Summary
A Spark SQL statement passes through four plans on the way to execution: the parsed (unresolved) logical plan, the analyzed (resolved) logical plan, the optimized logical plan, and the physical plan. In the physical plan, operators marked *(n) are fused by whole-stage code generation into a single generated function, which removes the virtual-call and boxing overhead of the Volcano iterator model for CPU-bound work, but does nothing for IO-bound steps such as the reads and writes of a Shuffle.