MaxCompute full outer join改写left anti join实践
簡介:?ods層數據同步時經常會遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。可以通過full outer join腳本來完成合并,但是數據量很大時非常消耗資源。本文將為您介紹在做增量數據的增加、更新時如何通過full outer join改寫left anti join來實現的最佳實踐。
背景
ods層數據同步時經常會遇到增全量合并的模型,即T-1天增量表 + T-2全量表 = T-1全量表。可以通過full outer join腳本來完成合并,但是數據量很大時非常消耗資源。
insert overwrite table tb_test partition(ds='${bizdate}') select case when a.id is not null then a.id esle b.id end as id ,if(a.name is not null, a.name, b.name) as name,coalesce(a.age, b.age) as age --這3種寫法一樣,都是優先取delta表的字段from (select * from tb_test_delta where ds='${bizdate}' ) a full outer join (select * from tb_test where ds='${bizdate-1}' ) b on a.id =b.id;這種寫法可實現新增和更新操作:
- 新增是指增量表中新出現的數據,而全量表中沒有;
- 更新是指增量表和全量表中都有的數據,但優先取增量表的數據,覆蓋歷史表的數據。
如下圖所示,R2_1是增量表當天去重后增量數據,M3是全量表前一天的數據,而J4_2_3則是full outer join的執行圖。
將J4_2_3展開會發現里面將增量和全量進行了merge join,當數據量很大(1288億條)時會產生很大的shuffle開銷。此時優化方案就是將full outer join改成 union all,從而避免join shuffle。
優化模型
結論:full outer join改成hash cluster + left join +union all可以有效地降低計算成本,且有兩種應用場景。先將模型進行抽象,假設有a和b兩個表,a是增量表,b是全量表:
with a as ( select * from values (1,'111'),(2,'two'),(7,'777') as (id,name) ) --增量,b as ( select * from values (1,''),(2,'222'),(3,'333'),(4,'444') as (id,name) ) --全量場景1:只合并新增數據到全量表
left anti join相當于not in,增量not in全量,過濾后只剩下完全新增的id,對全量中已有的id不修改:
--查詢完全新增的id select * from a left anti join b on a.id=b.id ; --結果如下 +------------+------+ | id | name | +------------+------+ | 7 | 777 | +------------+------+ --完全新增的合并全量表 select * from a --增量表 left anti join b on a.id=b.id union all select * from b --全量表 --結果如下 +------------+------+ | id | name | +------------+------+ | 1 | | | 2 | 222 | | 3 | 333 | | 4 | 444 | | 7 | 777 | +------------+------+場景2:合并新增數據到全量表,且更新歷史數據
全量not in增量,過濾后只剩下歷史的id,然后union all增量,既新增也修改
--查詢歷史全量數據 select * from b left anti join a on a.id=b.id; --結果如下 +------------+------+ | id | name | +------------+------+ | 3 | 333 | | 4 | 444 | +------------+------+ --合并新增數據到全量表,且更新歷史數據 select * from b --全量表 left anti join a on a.id=b.id union all select * from a ; --增量表 --結果如下 +------------+------+ | id | name | +------------+------+ | 1 | 111 | | 2 | two | | 7 | 777 | | 3 | 333 | | 4 | 444 | +------------+------+優化實踐
步驟1:表屬性修改
表、作業屬性修改,對原來的表、作業進行屬性優化,可以提升優化效果。
set odps.sql.reducer.instances=3072; --可選。默認最大1111個reducer,1111哈希桶。 alter table table_name clustered by(contact_id) sorted by(contact_id) into 3072 buckets;--必選步驟2:按照上述模型的場景1 或者 場景2進行代碼改造。
這里先給出代碼改造后的資源消耗對比:
| 時間消耗 | 8h30min38s | 1h4min48s | 7h32min30s | 32min30s |
| cpu消耗 | 29666.02 Core * Min | 65705.30 Core * Min | 31126.86 Core * Min | 30589.29 Core * Min |
| mem消耗 | 109640.80 GB * Min | 133922.25 GB * Min | 114764.80 GB * Min | 65509.28 GB * Min |
可以發現hash cluster分桶操作在初始化有額外的開銷,主要是按主鍵進行散列和排序,但是這是值得的,可一勞永逸,后續的讀取速度非常快。以前每天跑需要8小時,現在除了分桶初始化需要1小時,以后每天實際只需要30分鐘。
初始化執行圖
圖1:
- M2是讀全量表。
-
M4是讀取增量表,在場景2的模型中增量表被讀取了兩次,其中:
- R5_4是對主鍵去重(row_number)后用于后面的union all,里面包含了所有的增量數據;
- R1_4是對主鍵去重(row_number)后用于left anti join,里面只包含了主鍵。
- J3_1_2是left anti join,將它展開后看到這里還是有mergJoin,但是這只是初始化的操作,后面每天就不會有了。展開后如圖2。
- R6_3_5是將增量和全量進行union all,展開后如圖3。
- R7_6則是將索引信息寫入元數據,如圖3的MetaCollector1會在R7_6中sink。
因此:圖1中除了R5_4和R1_4是去重必須的,有shuffle。還有J3_1_2和R6_3_5這兩個地方有shuffle。
圖2:
圖3:
第二天以后的執行圖
圖1:
同上,圖1中的R3_2和R1_2是對增量去重必要對操作,有shuffle,這里忽略。
初始化執行圖的J3_1_2和R6_3_5已經被合并到了M4_1_3,將其展開后如圖2。即left anti join 和 union all這兩步操作在一個階段完成了,且這個階段是Map 任務(M4_1_3),而不是Join任務或Reduce任務。而且全量表不在單獨占用一個Map任務,也被合并到了M4_1_3,因此整個過程下來沒有shuffle操作,速度提升非常明顯。也就是說只需要一個M4_1_3就能完成所有到操作,直接sink到表。
R5_4則是將索引信息寫入元數據,如圖2的MetaCollector1會在R5_4中sink。
圖2:
?
?
原文鏈接
本文為阿里云原創內容,未經允許不得轉載。
總結
以上是生活随笔為你收集整理的MaxCompute full outer join改写left anti join实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 从前端智能化看“低代码/无代码”
- 下一篇: SpringCloud应用在Kubern