SparkHiveSQL中Join操作的谓词下推?
前言:
SparkSQL和HiveSQL的Join操作中也有謂詞下推?今天就通過大神的文章來了解下。同樣,如有冒犯,請聯(lián)系。
正文
上文簡要介紹了Join在大數(shù)據(jù)領(lǐng)域中的使用背景以及常用的幾種算法-broadcast hash join 、shuffle hash join以及sort merge join等,對每一種算法的核心應(yīng)用場景也做了相關(guān)介紹,這里再重點說明一番:大表與小表進(jìn)行join會使用broadcast hash join,一旦小表稍微大點不再適合廣播分發(fā)就會選擇shuffle hash join,最后,兩張大表的話無疑選擇sort merge join。
好了,問題來了,說是這么一說,但到底選擇哪種算法歸根結(jié)底是SQL執(zhí)行引擎干的事情,按照上文邏輯,SQL執(zhí)行引擎肯定要知道參與Join的兩表大小,才能選擇最優(yōu)的算法嘍!那么斗膽問一句,怎么知道兩表大小?衡量兩表大小的是物理大小還是紀(jì)錄多少抑或兩者都有?其實,這是另一門學(xué)問-基于代價優(yōu)化(Cost Based Optimization,簡稱CBO),它不僅能夠解釋Join算法的選擇問題,更重要的,它還能確定多表聯(lián)合Join場景下的Join順序問題。
是不是對CBO很期待呢?好吧,這里先刨個坑,下一個話題我們再聊。那今天要聊點什么呢?Join算法選擇、Join順序選擇確實對Join性能影響極大,但,還有一個很重要的因素對Join的性能至關(guān)重要,那就是Join算法優(yōu)化!無論是broadcast hash join、shuffle hash join還是sort merge join,都是最基礎(chǔ)的join算法,有沒有什么優(yōu)化方案呢?還真有,這就是今天要聊的主角-Runtime Filter(下文簡稱RF)
RF預(yù)備知識:bloom filter
RF說白了是使用bloomfilter對參與join的表進(jìn)行過濾,減少實際參與join的數(shù)據(jù)量。為了下文詳細(xì)解釋整個流程,有必要先解釋一下bloomfilter這個數(shù)據(jù)結(jié)構(gòu)(對之熟悉的看官可以繞道)。Bloom Filter使用位數(shù)組來實現(xiàn)過濾,初始狀態(tài)下位數(shù)組每一位都為0,如下圖所示:
假如此時有一個集合S = {x1, x2, … xn},Bloom Filter使用k個獨立的hash函數(shù),分別將集合中的每一個元素映射到{1,…,m}的范圍。對于任何一個元素,被映射到的數(shù)字作為對應(yīng)的位數(shù)組的索引,該位會被置為1。比如元素x1被hash函數(shù)映射到數(shù)字8,那么位數(shù)組的第8位就會被置為1。下圖中集合S只有兩個元素x和y,分別被3個hash函數(shù)進(jìn)行映射,映射到的位置分別為(0,3,6)和(4,7,10),對應(yīng)的位會被置為1:
現(xiàn)在假如要判斷另一個元素是否是在此集合中,只需要被這3個hash函數(shù)進(jìn)行映射,查看對應(yīng)的位置是否有0存在,如果有的話,表示此元素肯定不存在于這個集合,否則有可能存在。下圖所示就表示z肯定不在集合{x,y}中:
RF算法理論
為了更好地說明整個過程,這里使用一個SQL示例對RF算法進(jìn)行完整講解,SQL:
select item.name, order.* from order , item where order.item_id = item.id and item.category = ‘book’,其中order為訂單表,item為商品表,兩張表根據(jù)商品id字段進(jìn)行join,該SQL意為取出商品類別為書籍的所有訂單詳情。假設(shè)商品類型為書籍的商品并不多,join算法因此確定為broadcast hash join。整個流程如下圖所示:
Step 1:將item表的join字段(item.id)經(jīng)過多個hash函數(shù)映射處理為一個bloomfilter(如果對bloomfilter不了解,自行g(shù)oogle)
Step 2:將映射好的bloomfilter分別廣播到order表的所有partition上,準(zhǔn)備進(jìn)行過濾
Step 3:以Partition2為例,存儲進(jìn)程(比如DataNode進(jìn)程)將order表中join列(order.item_id)數(shù)據(jù)一條一條讀出來,使用bloomfilter進(jìn)行過濾。淘汰該訂單數(shù)據(jù)不是書籍相關(guān)商品的訂單,這條數(shù)據(jù)直接跳過;否則該條訂單數(shù)據(jù)有可能是待檢索訂單,將該行數(shù)據(jù)全部掃描出來。
Step 4:將所有未被bloomfilter過濾掉的訂單數(shù)據(jù),通過本地socket通信發(fā)送到計算進(jìn)程(impalad)。
Step 5:再將所有書籍商品數(shù)據(jù)廣播到所有Partition節(jié)點與step4所得訂單數(shù)據(jù)進(jìn)行真正的hashjoin操作,得到最終的選擇結(jié)果。
RF算法分析
上面通過一個SQL示例簡單演示了整個RF算法在broadcast hash join中的操作流程,根據(jù)流程對該算法進(jìn)行一下理論層次分析:
- RF本質(zhì):通過謂詞(
bloomfilter)下推,在存儲層通過bloomfilter對數(shù)據(jù)進(jìn)行過濾,可以從三個方面實現(xiàn)對Join的優(yōu)化。其一,如果可以跳過很多記錄,就可以減少了數(shù)據(jù)IO掃描次數(shù)。這點需要重點解釋一下,許多朋友會有這樣的疑問:既然需要把數(shù)據(jù)掃描出來使用BloomFilter進(jìn)行過濾,為什么還會減少IO掃描次數(shù)呢?這里需要關(guān)注一個事實:大多數(shù)表存儲行為都是列存,列之間獨立存儲,掃描過濾只需要掃描join列數(shù)據(jù)(而不是所有列),如果某一列被過濾掉了,其他對應(yīng)的同一行的列就不需要掃描了,這樣減少IO掃描次數(shù)。其二,減少了數(shù)據(jù)從存儲層通過socket(甚至TPC)發(fā)送到計算層的開銷,其三,減少了最終hash
join執(zhí)行的開銷。 - RF代價:對照未使用RF的Broadcast Hash
Join來看,前者主要增加了bloomfilter的生成、廣播以及大表根據(jù)bloomfilter進(jìn)行過濾這三個開銷。通常情況下,這幾個步驟在小表較小的情況下代價并不大,基本可以忽略。 - RF優(yōu)化效果:基本取決于bloomfilter的過濾效果,如果大量數(shù)據(jù)被過濾掉了,那么join的性能就會得到極大提升;否則性能提升就會有限。
- RF實現(xiàn):和常見的謂詞下推(’=‘,’>’,’<‘等)一樣,RF實現(xiàn)需要在計算層以及存儲層分別進(jìn)行相關(guān)邏輯實現(xiàn),計算層要構(gòu)造bloomfilter并將bloomfilter下傳到存儲層,存儲層要實現(xiàn)使用該bloomfilter對指定數(shù)據(jù)進(jìn)行過濾。
RF效果驗證
事實上,RF這個東東的優(yōu)化效果是在組內(nèi)同事何大神做impala on parquet以及impala on kudu的基準(zhǔn)對比測試的時候分析發(fā)現(xiàn)的。實際測試中,impala on parquet 比之impala on kudu性能有明顯優(yōu)勢,目測至少10倍性能提升。同一SQL解析引擎,不同存儲引擎,性能竟然天壤之別!為了分析具體原因,同事就使用impala的執(zhí)行計劃分析工具對兩者的執(zhí)行計劃分別進(jìn)行了分析,才透過蛛絲馬跡發(fā)現(xiàn)前者使用了RF,而后者并沒有(當(dāng)然可能還有其他因素,但RF肯定是原因之一)。
簡單復(fù)盤一下這次測試吧,基準(zhǔn)測試使用TPCDS測試,數(shù)據(jù)規(guī)模為1T,本文使用測試過程中的一個典型SQL(Q40)作為示例對RF的神奇功效進(jìn)行回放演示。下圖是Q40的對比性能,直觀上來看RF可以直接帶來40x的性能提升,40倍哎,這到底是怎么做到的?
先來簡單看看Q40的SQL語句,如下所示,看起來比較復(fù)雜,核心涉及到3個表(catalog_sales join date_dim 、catalog_sales join warehouse 、catalog_sales join item)的join操作:
典型的星型結(jié)構(gòu),其中catalog_sales是事實表,其他表為緯度表。本次分析選擇其中catalog_sales join item這個緯度的join。因為對比測試中兩者的SQL解析引擎都是使用impala,所以SQL執(zhí)行計劃基本都相同。在此基礎(chǔ)上,來看看執(zhí)行計劃中單個執(zhí)行節(jié)點在執(zhí)行catalog_sales join item操作時由先到后的主要階段耗時,其中只貼出來重要耗時階段(Q40中Join算法為shuffle hash join,與上文所舉broadcast hash join示例略有不同,不過不影響結(jié)論):
| total time | 43s996ms | 2s385ms |
| bloomfilter生成 | Filter 0 arrival: 857ms Filter 1 arrival: 879ms Filter 2 arrival: 939ms | |
| 大表scan掃描 | HDFS_SCAN_NODE (id=0):(Total: 3s479ms) – RowsRead: 72.01M –RowsReturned: 72.01M – RowsReturnedRate: 20.69 M/s | HDFS_SCAN_NODE (id=0):(Total: 2s011ms) – RowsRead: 72.01M – RowsReturned: 35.92K – RowsReturnedRate: 17.86 K/sec Filter 0 (1.00 MB): – Rows processed: 72.01M – Rows rejected: 71.43M – Rows total: 72.01M Filter 1 (1.00 MB): – Rows processed: 49.15K – Rows rejected: 126 – Rows total: 49.15K Filter 2 (1.00 MB): – Rows processed: 584.38K – Rows rejected: 548.46K – Rows total: 584.38K |
| 數(shù)據(jù)加載計算進(jìn)程內(nèi)存 | DataStreamSender (dst_id=11):(Total: 15s984ms) – NetworkThroughput(*): 298.78 MB/sec – OverallThroughput: 100.85 MB/sec – RowsReturned: 72.01M– SerializeBatchTime: 10s567ms – TransmitDataRPCTime: 5s395ms | DataStreamSender (dst_id=11):(Total: 10.725ms) – NetworkThroughput(*): 244.06 MB/sec – OverallThroughput: 71.23 MB/sec – RowsReturned: 35.92K – SerializeBatchTime: 7.544ms – TransmitDataRPCTime: 3.130ms |
| Hash Join | HASH_JOIN_NODE (id=5): (Total: 19s104ms) – BuildPartitionTime: 862.560ms – BuildRows: 8.99M – BuildRowsPartitioned: 8.99M – BuildTime: 373.855ms – …… – ProbeRows: 90.00M – ProbeRowsPartitioned: 0 (0) – ProbeTime: 17s628ms – RowsReturned: 90.00M – RowsReturnedRate: 985.85 K/s – SpilledPartitions: 0 (0) – UnpinTime: 960.000ns | HASH_JOIN_NODE (id=6): (Total: 21.707ms) – BuildPartitionTime: 3.487ms – BuildRows: 18.81K (18814) – BuildRowsPartitioned: 18.81K – BuildTime: 646.817us – …… – ProbeRows: 85.28K (85278) – ProbeRowsPartitioned: 0 (0) – ProbeTime: 6.396ms – RowsReturned: 85.27K – RowsReturnedRate: 38.88 K/s – SpilledPartitions: 0 (0) – UnpinTime: 915.000ns |
經(jīng)過對兩種場景執(zhí)行計劃的解析,可以基本驗證上文所做的基本理論結(jié)果:
1. 確認(rèn)經(jīng)過RF之后大表的數(shù)據(jù)量得到大量濾除,只剩下少量數(shù)據(jù)參與最終的HashJoin。參見第二行大表scan掃描結(jié)果,未使用rf的返回結(jié)果有7千萬行+紀(jì)錄,而經(jīng)過RF過濾之后滿足條件的只有3w+紀(jì)錄。3萬相比7千萬,性能優(yōu)化效果自然不言而喻。
2. 經(jīng)過RF濾除之后,少量數(shù)據(jù)經(jīng)過網(wǎng)絡(luò)從存儲進(jìn)程加載到計算進(jìn)程內(nèi)存的網(wǎng)絡(luò)耗時大量減少。參見第三行“數(shù)據(jù)加載到計算進(jìn)程內(nèi)存”,前者耗時15s,后者耗時僅僅11ms。主要耗時分為兩部分,其中數(shù)據(jù)序列化時間占到2/3-10s左右,數(shù)據(jù)經(jīng)過RPC傳輸時間占另外1/3 -5s左右。
3. 最后,經(jīng)過RF濾除之后,參與到最終Hash Join的數(shù)據(jù)量大幅減少,Hash Join耗時前者是19s,后者是21ms左右。主要耗時在于大表Probe Time,前者消耗了17s左右,而后者僅需6ms。
說好的謂詞下推呢?
講真,剛開始接觸RF的時候覺得這簡直是一個實實在在的神器,崇拜之情溢于言表。然而,經(jīng)過一段時間的探索消化,直至把這篇文章寫完,也就是此時此刻,忽然覺得它并不高深莫測,說白了就是一個謂詞下推,不同的是這里的謂詞稍微奇怪一點,是一個bloomfilter而已。
提到謂詞下推,這里再引申一下下。以前經(jīng)常滿大街聽到謂詞下推,然而對謂詞下推卻總感覺懵懵懂懂,并不明白的很真切。經(jīng)過RF的洗禮,現(xiàn)在確信有了更進(jìn)一步的理解。這里拿出來和大家交流交流。個人認(rèn)為謂詞下推有兩個層面的理解:
-
其一是邏輯執(zhí)行計劃優(yōu)化層面的說法,比如SQL語句:select * from order ,item where item.id =order.item_id and item.category =‘book’,正常情況語法解析之后應(yīng)該是先執(zhí)行Join操作,再執(zhí)行Filter操作。通過謂詞下推,可以將Filter操作下推到Join操作之前執(zhí)行。即將where item.category = ‘book’下推到 item.id = order.item_id之前先行執(zhí)行。
-
其二是真正實現(xiàn)層面的說法,謂詞下推是將過濾條件從計算進(jìn)程下推到存儲進(jìn)程先行執(zhí)行,注意這里有兩種類型進(jìn)程:計算進(jìn)程以及存儲進(jìn)程。計算與存儲分離思想,這在大數(shù)據(jù)領(lǐng)域相當(dāng)常見,比如最常見的計算進(jìn)程有SparkSQL、Hive、impala等,負(fù)責(zé)SQL解析優(yōu)化、數(shù)據(jù)計算聚合等,存儲進(jìn)程有HDFS(DataNode)、Kudu、HBase,負(fù)責(zé)數(shù)據(jù)存儲。正常情況下應(yīng)該是將所有數(shù)據(jù)從存儲進(jìn)程加載到計算進(jìn)程,再進(jìn)行過濾計算。謂詞下推是說將一些過濾條件下推到存儲進(jìn)程,直接讓存儲進(jìn)程將數(shù)據(jù)過濾掉。這樣的好處顯而易見,過濾的越早,數(shù)據(jù)量越少,序列化開銷、網(wǎng)絡(luò)開銷、計算開銷這一系列都會減少,性能自然會提高。
寫到這里,忽然意識到筆者在上文出現(xiàn)了一個很嚴(yán)重的認(rèn)知錯誤:RF機(jī)制并不僅僅是一個簡單的謂詞下推,它的精髓在于提出了一個重要的謂詞-bloomfilter。當(dāng)前對RF支持的系統(tǒng)并不多,筆者只知道目前唯有Impala on Parquet進(jìn)行了支持。Impala on Kudu雖說Impala支持,但Kudu并不支持。SparkSQL on Parqeut中雖有存儲系統(tǒng)支持,無奈計算引擎-SparkSQL目前還不支持。
轉(zhuǎn)自:http://hbasefly.com/2017/04/10/bigdata-join-2/
總結(jié)
以上是生活随笔為你收集整理的SparkHiveSQL中Join操作的谓词下推?的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 位运算中的左移和右移的计算详解
- 下一篇: matlab2016安装报错(附资源)