spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案
Spark SQL小文件是指文件大小顯著小于hdfs block塊大小的的文件。過于繁多的小文件會給HDFS帶來很嚴重的性能瓶頸,對任務的穩定和集群的維護會帶來極大的挑戰。
一般來說,通過Hive調度的MR任務都可以簡單設置如下幾個小文件合并的參數來解決任務產生的小文件問題:
set?hive.merge.mapfiles=true;set?hive.merge.mapredfiles=true;
set?hive.merge.size.per.task=xxxx;
set?hive.merge.smallfiles.avgsize=xxx;
然而在我們將離線調度任務逐步從Hive遷移到Spark的過程中,由于Spark本身并不支持小文件合并功能,小文件問題日益突出,對集群穩定性造成很大影響,一度阻礙了我們的遷移工作。
為了解決小文件問題,我們經歷了從開始的不斷調整參數到后期的代碼開發等不同階段,這里給大家做一個簡單的分享。
1. Spark為什么會產生小文件
Spark生成的文件數量直接取決于RDD里partition的數量和表分區數量。注意這里的兩個分區概念并不相同,RDD的分區與任務并行度相關,而表分區則是Hive的分區數目。生成的文件數目一般是RDD分區數和表分區的乘積。因此,當任務并行度過高或者分區數目很大時,很容易產生很多的小文件。
圖1:Spark RDD分區數
因此,如果需要從參數調整來減少生成的文件數目,就只能通過減少最后一個階段RDD的分區數來達到了(減少分區數目限制于歷史數據和上下游關系,難以修改)
2. 基于社區版本的參數進行調整的方案
2.1?不含有Shuffle算子的簡單靜態分區SQL?
這樣的SQL比較簡單,主要是filter上游表一部分數據寫入到下游表,或者是兩張表簡單UNION起來的任務,這種任務的分區數目主要是由讀取文件時Partition數目決定的。
?因為從Spark 2.4以來,對Hive orc表和parquet支持已經很不錯了,為了加快運行速率,我們開啟了將Hive orc/parquet表自動轉為DataSource的參數。對于這種DataSource表的類型,partition數目主要是由如下三個參數控制其關系。
spark.sql.files.opencostinbytes;
spark.default.parallelism;
其關系如下圖所示,因此可以通過調整這三個參數來輸入數據的分片進行調整:
? ? ? ?
而非DataSource表,使用CombineInputFormat來讀取數據,因此主要是通過MR參數來進行分片調整:
mapreduce.input.fileinputformat.split.minsize
雖然我們可以通過調整輸入數據的分片來對最終文件數量進行調整,但是這樣的調整是不穩定的,上游數據大小發生一些輕微的變化,就可能帶來參數的重新適配。
為了簡單粗暴的解決這個問題,我們對這樣的SQL加了repartition的hint,引入了新的shuffle,保證文件數量是一個固定值。
2.2?帶有Shuffle算子的靜態分區任務?
在ISSUE SPARK-9858中,引入了一個新的參數:
spark.sql.adaptive.shuffle.targetPostShuffleInputSize,
后期基于spark adaptive又對這個參數做了進一步增強,可以動態的調整partition數量,盡可能保證每個task處理targetPostShuffleInputSize大小的數據,因此這個參數我們也可以用來在一定程度上控制生成的文件數量。
2.3?動態分區任務??
動態分區任務因為存在著分區這一變量,單純調整rdd這邊的partition數目很難把控整體的文件數量。
在hive里,我們可以通過設置hive.optimize.sort.dynamic.partition來緩解動態分區產生文件過多導致任務執行時task節點經常oom的狀況。這樣的參數會引入新的的shuffle,來對數據進行重排序,將相同的partition分給同一個task處理,從而避免了一個task同時持有多個文件句柄。
因此,我們可以借助這樣的思想,使用distribute by語句來修改sql,從而控制文件數量。一般而言,假設我們想對于每個分區生成不超過N個文件,則可以在SQL末尾增加DISTRIBUTE BY [動態分區列],ceil(rand() * N)。
3. 自研可合并文件的commitProtocol方案
綜上種種,每個方法都存在一定的弊端,眾多規則也在實際使用過程中對業務方造成很大困擾。
因此我們產生了想在spark這邊實現和hive類似的小文件合并機制。在幾個可能的方案選型中,我們最終選擇了:重寫spark.sql.sources.commitProtocolClass方法。
一方面,該方案對Spark代碼無侵入,便于Spark源碼的維護,另一方面,該方案對業務方使用友好,可以動態通過set命令設置,如果出現問題回滾也十分方便。業務方在使用過程中,只需要簡單設置:
spark.sql.sources.commitProtocolClass,即可控制是否開啟小文件合并。
在開啟小文件合并參數后,我們會在commit階段拿到生成的所有文件,引入兩個新的job來對這些文件進行處理。首先我們在第一個job獲取到所有大小小于spark.compact.smallfile.size的文件,在查找完成后按照spark.compact.size參數值對組合文件,并在第二個job中對這些文件進行合并。
☆?END?☆
招聘信息OPPO互聯網技術團隊招聘一大波崗位,涵蓋C++、Go、OpenJDK、Java、DevOps、容器、Linux內核開發、產品經理、項目經理等多個方向,請在公眾號后臺回復關鍵詞“招聘”查看查詳細信息。
你可能還喜歡OPPO自研ESA DataFlow架構與實踐
OPPO 實時數倉揭秘:從頂層設計實現離線與實時的平滑遷移
OPPO異地多活實踐——緩存篇
OPPO百萬級高并發MongoDB集群性能數十倍提升優化實踐(上)
OPPO百萬級高并發MongoDB集群性能數十倍提升優化實踐(下)
更多技術干貨
掃碼關注
OPPO互聯網技術
?我就知道你“在看”總結
以上是生活随笔為你收集整理的spark sql合并小文件_Spark SQL小文件问题在OPPO的解决方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: qt显示rgba8888 如何改 fra
- 下一篇: teamviewer设备数量上限怎么解决