Spark 小文件合并优化实践
文章目錄
- 背景
- 一些嘗試
- MergeTable
- 性能優化
- 后記
對 spark 任務數據落地(HDFS) 碎片文件過多的問題的優化實踐及思考。
背景
此文是關于公司在 Delta Lake 上線之前對Spark任務寫入數據產生碎片文件優化的一些實踐。
-
形成原因
數據在流轉過程中經歷 filter/shuffle 等過程后,開發人員難以評估作業寫出的數據量。即使使用了 Spark 提供的AE功能,目前也只能控制 shuffle read 階段的數據量,寫出數據的大小實際還會受壓縮算法及格式的影響,因此在任務運行時,對分區的數據評估非常困難。- shuffle 分區過多過碎,寫入性能會較差且生成的小文件會非常多。
- shuffle 分區過少過大,則寫入并發度可能會不夠,影響任務運行時間。
-
不利影響
在產生大量碎片文件后,任務數據讀取的速度會變慢(需要尋找讀入大量的文件,如果是機械盤更是需要大量的尋址操作),同時會對 hdfs namenode 內存造成很大的壓力。
在這種情況下,只能讓業務/開發人員主動的合并下數據或者控制分區數量,提高了用戶的學習及使用成本,往往效果還非常不理想。
既然在運行過程中對最終落地數據的評估如此困難,是否能將該操作放在數據落地后進行?對此我們進行了一些嘗試,希望能自動化的解決/緩解此類問題。
一些嘗試
大致做了這么一些工作:
第1和第2點主要是平臺化的一些工作,包括監測數據落盤,根據采集的 metrics 信息再判斷是否需要進行 MergeTable 操作,下文是關于 MergeTable 的一些細節實現。
MergeTable
功能:
語法:
merge table [表名] [options (fileCount=合并后文件數量)] --非分區表 merge table [表名] PARTITION (分區信息) [options (fileCount=合并后文件數量)] --分區表碎片文件校驗及合并流程圖?:
性能優化
對合并操作的性能優化
只合并碎片文件
如果設置的碎片閾值是128M,那么只會將該表/分區內小于該閾值的文件進行合并,同時如果碎片文件數量小于一定閾值,將不會觸發合并,這里主要考慮的是合并任務存在一定性能開銷,因此允許系統中存在一定量的小文件?。
分區數量及合并方式
定義了一些規則用于計算輸出文件數量及合并方式的選擇,獲取任務的最大并發度 maxConcurrency 用于計算數據的分塊大小,再根據數據碎片文件的總大小選擇合并(coalesce/repartition)方式。
- 開啟 dynamicAllocation
maxConcurrency = spark.dynamicAllocation.maxExecutors * spark.executor.cores - 未開啟 dynamicAllocation
maxConcurrency = spark.executor.instances * spark.executor.cores
以幾個場景為例對比優化前后?的性能:
? 場景1:最大并發度100,碎片文件數據100,碎片文件總大小100M,如果使用 coalesce(1),將會只會有1個線程去讀/寫數據,改為 repartition(1),則會有100個并發讀,一個線程順序寫。性能相差100X。
? 場景2:最大并發度100,碎片文件數量10000,碎片文件總大小100G,如果使用 repartition(200),將會導致100G的數據發生 shuffle,改為 coalesce(200),則能在保持相同并發的情況下避免 200G數據的IO。
? 場景3:最大并發度200,碎片文件數量10000,碎片文件總大小50G,如果使用 coalesce(100),會保存出100個500M文件,但是會浪費一半的計算性能,改為 coalesce(200),合并耗時會下降為原來的50%。
上述例子的核心都是在充分計算資源的同時避免不必要的IO。
修復元數據
因為 merge 操作會修改數據的創建及訪問時間,所以在目錄替換時需要將元數據信息修改到 merge 前的一個狀態,該操作還能避免冷數據掃描的誤判。最后還要調用 refresh table 更新表在 spark 中的狀態緩存。?
commit 前進行校驗
在最終提交前對數據進行校驗,判斷合并前后數據量是否發生變化(從數據塊元數據中直接獲取數量,避免發生IO),存在異常則會進行回滾,放棄合并操作。?
數據寫入后,自動合并效果圖:
后記
收益
該同步合并的方式已經在我們的線上穩定運行了1年多,成功的將平均文件大小從150M提升到了270M左右,提高了數據讀取速度,與此同時 Namenode 的內存壓力也得到了極大緩解。
?對 MergeTable 操作做了上述的相關優化后,根據不同的數據場景下,能帶來數倍至數十倍的性能提升。
缺陷
因為采用的是同步合并的方式,由于沒有事務控制,所以在合并過程中數據不可用,這也是我們后來開始引入 D?elta Lake 的一個原因。
總結
以上是生活随笔為你收集整理的Spark 小文件合并优化实践的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: python能开发智能家居吗_厉害了!P
- 下一篇: 过年也要跑跑跑,网约车的春节数据