数据倾斜原理及解决方案
導讀
相信很多接觸MapReduce的朋友對'數據傾斜'這四個字并不陌生,那么究竟什么是數據傾斜?又該怎樣解決這種該死的情況呢?
何為數據傾斜?
在弄清什么是數據傾斜之前,我想讓大家看看數據分布的概念:
正常的數據分布理論上都是傾斜的,就是我們所說的20-80原理:80%的財富集中在20%的人手中, 80%的用戶只使用20%的功能 , 20%的用戶貢獻了80%的訪問量 , 不同的數據字段可能的數據傾斜一般有兩種情況:
一種是唯一值非常少,極少數值有非常多的記錄值(唯一值少于幾千)
一種是唯一值比較多,這個字段的某些值有遠遠多于其他值的記錄數,但是它的占比也小于百分之一或千分之一
數據傾斜:
數據傾斜在MapReduce編程模型中十分常見,用最通俗易懂的話來說,數據傾斜無非就是大量的相同key被partition分配到一個分區里,造成了'一個人累死,其他人閑死'的情況,這種情況是我們不能接受的,這也違背了并行計算的初衷,首先一個節點要承受著巨大的壓力,而其他節點計算完畢后要一直等待這個忙碌的節點,也拖累了整體的計算時間,可以說效率是十分低下的。
數據傾斜發生時的現象:?
1、絕大多數task執行得都非常快,但個別task執行的極慢。?
2、原本能正常執行的Spark作業,某天突然爆出OOM(內存溢出)異常。觀察異常棧,是我們寫的業務代碼造成的
數據傾斜發生的原理 :
在進行shuffle的時候,必須將各個節點上相同的Key拉取到某個節點上的一個task來進行處理,比如按照key進行聚合或者join操作。如果某個key對應的數據量特別大的話,會發生數據傾斜。比如大部分key對應的10條數據,但個別key卻對應了100萬條數據,那么大部分task會只分配到10條數據,而個別task可能會分配了100萬數據。整個spark作業的運行進度是由運行時間最長的那個task決定的。?
因此出現數據傾斜的時候,spark作業看起來會運行得非常緩慢,甚至可能因為某個task處理的數據量過大導致OOM。
解決方案
1、增加jvm內存,這適用于第一種情況(唯一值非常少,極少數值有非常多的記錄值(唯一值少于幾千)),這種情況下,往往只能通過硬件的手段來進行調優,增加jvm內存可以顯著的提高運行效率。
2、增加reduce的個數,這適用于第二種情況(唯一值比較多,這個字段的某些值有遠遠多于其他值的記錄數,但是它的占比也小于百分之一或千分之一),我們知道,這種情況下,最容易造成的結果就是大量相同key被partition到一個分區,從而一個reduce執行了大量的工作,而如果我們增加了reduce的個數,這種情況相對來說會減輕很多,畢竟計算的節點多了,就算工作量還是不均勻的,那也要小很多。
3、自定義分區,這需要用戶自己繼承partition類,指定分區策略,這種方式效果比較顯著。
4、重新設計key,有一種方案是在map階段時給key加上一個隨機數,有了隨機數的key就不會被大量的分配到同一節點(小幾率),待到reduce后再把隨機數去掉即可。
5、使用combinner合并,combinner是在map階段,reduce之前的一個中間階段,在這個階段可以選擇性的把大量的相同key數據先進行一個合并,可以看做是local reduce,然后再交給reduce來處理,這樣做的好處很多,即減輕了map端向reduce端發送的數據量(減輕了網絡帶寬),也減輕了map端和reduce端中間的shuffle階段的數據拉取數量(本地化磁盤IO速率),推薦使用這種方法。
如何定位發生數據傾斜的代碼?
1、數據傾斜只會發生在shuffle中,下面是常用的可能會觸發shuffle操作的算子:distinct、groupByKey、reduceByKey、aggregateByKey、join、cogroup、repartition等。出現數據傾斜時,可能就是代碼中使用了這些算子的原因?
2、通過觀察spark UI的節目定位數據傾斜發生在第幾個stage中,如果是用yarn-client模式提交,那么本地是可以直接看到log的,可以在log中找到當前運行到了第幾個stage;如果用yarn-cluster模式提交,可以通過Spark Web UI 來查看當前運行到了第幾個stage。此外,無論是使用了yarn-client模式還是yarn-cluster模式,我們都可以在Spark Web UI 上深入看一下當前這個stage各個task分配的數據量,從而進一步確定是不是task分配的數據不均勻導致了數據傾斜。?
3、根據之前學的stage的劃分算法定位到極有可能發生數據傾斜的代碼
這是沒有發生傾斜的例子,若41ms為1h即表示發生傾斜。?
也可查看屬于第幾個stage。
查看導致數據傾斜的key的分布情況?
1. 如果是Spark SQL中的group by、join語句導致的數據傾斜,那么就查詢一下SQL中使用的表的key分布情況。?
2. 如果是對Spark RDD執行shuffle算子導致的數據傾斜,那么可以在Spark作業中加入查看key分布的代碼,比如RDD.countByKey()。然后對統計出來的各個key出現的次數,collect/take到客戶端打印一下,就可以看到key的分布情況。
數據傾斜詳細解決方案
一、使用Hive ETL(提取、轉換和加載) 預處理數據?
方案使用場景:?
導致數據傾斜的是Hive表。如果該Hive表中的數據本身很不均勻,而且業務場景需要頻繁的使用Spark對Hive表執行某個分析操作,那么比較適合使用這種技術方案。?
思路:
此時可以評估,是否可以通過Hive來進行數據預處理。即通過Hive ETL 預先對數據按照Key進行聚合,或者是預先和其他表進行join,然后再Spark作業中針對的數據源就是預處理后的Hive表。此時由于數據已經預先進行過聚合或者join操作了,那么在Spark作業中也就不需要使用原先的shuffle類算子執行這類操作了。?
原理:
從根源上解決了數據傾斜,因為徹底避免了在Spark中執行shuffle類算子。?
但是因為畢竟數據本身就存在分布不均勻的問題,所以在Hive ETL中進行groubBy或者join等shuffle操作時,還是會發生數據傾斜,導致Hive ETL速度很慢。只是避免了Spark程序發生數據傾斜。?
經驗:
在一些Java系統與Spark結合使用的項目中,會出現Java代碼頻繁調用Spark作業的場景,而且對Spark作業的執行性能要求很高,就比較適合使用這種方案。將數據傾斜提前到上游的Hive ETL,每天僅執行一次,只有那一次是比較慢的,而之后每次Java調用Spark作業時,執行速度都會很快,能夠提供更好的用戶體驗。
二、過濾少數導致傾斜的key?
方案使用場景:
若發現導致傾斜的key就少數幾個,并且對計算本身的影響并不大。比如99%的key對應10條數據,但只有一個key對應100萬數據。?
思路:
若判斷少數幾個數據量特別多的key對作業的執行和計算結果不是那么特別重要,可以直接過濾掉那幾個key。如在Spark SQL中就可以使用where子句過濾掉這些key,或者在Spark Core 中對RDD執行filter算子過濾掉這些key。如果需要每次作業執行時,動態判定哪些key的數據量最多然后過濾,可以使用sample算子對RDD進行采樣,然后計算每個key的數量,取數據量最多的key過濾即可。?
缺點:
適用場景不多,大多數情況下,導致傾斜的key還是很多的,并不是只有少數幾個。
三、提高shuffle操作的并行度?
方案使用場景:
若我們必須要面對數據傾斜問題,要這么使用。?
思路:
在對RDD執行shuffle算子時,給shuffle算子傳入一個參數,如reduceByKey(1000),該參數設置了這個shuffle算子執行時shuffle read task 的數量。對于Spark SQL中的shuffle類語句,如 groupBy 、join 等需要設置一個參數,即spark.sql.shuffle.partitions。該參數代表了shuffle read task 的并行度,默認值是200。?
原理:
增加shuffle read task 的數量,可以讓原本分配給一個task的多個key分配給多個task,從而讓每個task處理比原來更少的數據。舉例來說,如果原本有5個key,每個key對應10條數據,這5個key都是分配給一個task的,那么這個task就要處理50條數據。而增加了shuffle read task以后,每個task就分配到一個key,即每個task就處理10條數據,那么自然每個task的執行時間都會變短了。?
實現起來比較簡單,可以有效緩解和減輕數據傾斜的影響。?
只是緩解了數據傾斜而已,沒有徹底根除問題,根據實踐經驗來看,其效果有限。
四、兩階段聚合(局部聚合+全局聚合)?
方案使用場景:
對RDD執行reduceByKey等聚合類shuffle算子或者在Spark SQL中使用group by語句進行分組聚合時,比較適用這種方案。?
思路:?
這個方案的核心實現思路就是進行兩階段聚合。第一次是局部聚合,先給每個key都打上一個隨機數,比如10以內的隨機數,此時原先一樣的key就變成不一樣的了,比如(hello, 1) (hello, 1) (hello, 1) (hello, 1),就會變成(1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1)。接著對打上隨機數后的數據,執行reduceByKey等聚合操作,進行局部聚合,那么局部聚合結果,就會變成了(1_hello, 2) (2_hello, 2)。然后將各個key的前綴給去掉,就會變成(hello,2)(hello,2),再次進行全局聚合操作,就可以得到最終結果了,比如(hello, 4)。?
?
方案優點:?
對于聚合類的shuffle操作導致的數據傾斜,效果是非常不錯的。通常都可以解決掉數據傾斜,或者至少是大幅度緩解數據傾斜,將Spark作業的性能提升數倍以上。?
方案缺點:?
僅僅適用于聚合類的shuffle操作,適用范圍相對較窄。如果是join類的shuffle操作,還得用其他的解決方案。
五、將reduce join 轉為map join?
方案使用場景:
在對RDD使用join類操作,或者是在Spark SQL中使用join語句時,而且join操作中的一個RDD或表的數據量比較小(幾百M或者一兩G)。?
實現思路:
不使用join算子進行連接操作,而使用Broadcast變量與map類算子實現join操作,進而完全規避掉shuffle類操作,徹底避免數據傾斜的發生和出現。將較小RDD中的數據直接通過collect算子拉取到Driver端的內存中來,然后對其創建一個Broadcast變量;接著對另外RDD執行map類算子,在算子函數內,從Broadcast變量中獲取較小RDD 的全量數據,與當前RDD的每一條數據按照連接key進行比對,如果連接key相同的話,那么就將兩個RDD的數據用你需要的方式連接起來。?
實現原理:?
普通的join是會走shuffle過程的,而一旦shuffle,就相當于會將相同key的數據拉取到一個shuffle read task中再進行join,此時就是reduce join。但是如果一個RDD是比較小的,則可以采用廣播小RDD全量數據+map算子來實現與join同樣的效果,也就是mao join ,而此時不會發生shuffle操作,也就不會發生數據傾斜。?
方案優點:?
對join操作導致的數據傾斜,效果非常好,因為根本就不會發生shuffle,也就根本不會發生數據傾斜。?
方案缺點:?
適用場景較少,因為這個方案只適用于一個大表和一個小表的情況。畢竟我們需要將小表進行廣播,此時會比較消耗內存資源,driver和每個Executor內存中都會駐留一份小RDD的全量數據。如果我們廣播出去的RDD數據比較大,比如10G以上,那么就可能發生內存溢出了。因此并不適合兩個都是大表的情況。
六、采樣傾斜key并分拆join操作?
方案使用場景:
兩個RDD/Hive表進行join的時候,如果數據量都比較大,無法采用上第五點解決方案,那么此時可以看一下兩個RDD/Hive表中key的分布情況,若出現數據傾斜,是因為其中某一個RDD/Hive表中的少數幾個key的數據量過大,而另一個中的所有key都分布比較均勻,那么采用這個解決方案是比較合適的。?
實現思路:
對包含少數幾個數據量過大的key的那個RDD,通過sample算子采樣出一份樣本來,然后統計一下每個key的數據量,計算出數據量最大的是哪幾個key。?
然后將這幾個key對應數據從原來的RDD中拆分出來,形成一個單獨的RDD,并給每個key打上n以內的隨機數作為前綴,而不會導致傾斜的大部分key形成另外一個RDD。
接著將需要join的另一個RDD,也就是過濾出來的那幾個傾斜key對應的數據并形成一個單獨的RDD,將每條數據膨脹成n條數據,這n條數據都按順序附加一個0~n的前綴,不會導致傾斜的大部分key也形成另外一個RDD?
(此時一共生存了四個RDD:兩個key有傾斜的RDD,兩個正常RDD)?
再將附加了隨機前綴的獨立RDD與另一個膨脹n倍的獨立RDD進行join,此時就可以將原先相同的key打散成n份,分散到多個task中去進行join。?
而另外兩普通的RDD就照常join即可。?
最后將兩次join的結果使用union算子合并起來即可。
原理:?
對于join導致的數據傾斜,如果只是某幾個key導致了傾斜,可以將少數幾個key拆分為獨立RDD,并附加隨機前綴打散成n份去進行join,此時這幾個key對于的數據就不會集中在少數幾個task上,而是分散到多個task進行join。?
優點:
對于join導致的數據傾斜,如果只是某幾個key導致了傾斜,此方法可以用最有效的方式打散key進行join,且只需要針對少數傾斜的key對應的數據進行擴容n倍,不需要對全量數據進行擴容,避免占用過多內存。?
缺點:?
若key特別多,則不合適。
七、使用隨機前綴和擴容RDD進行join?
方案使用場景:
若在進行join操作時,RDD中有大量的key導致數據傾斜的時候。?
思路:
首先查看RDD/Hive表中的數據分布情況,找到造成數據傾斜的RDD/Hive表,比如有多個key都對應了炒股哦萬條數據。?
然后將該RDD 的每條數據都打上一個n以內的隨即前綴。?
同時對另外一個正常的RDD進行擴容,將每條數據都擴容成n條數據,擴容出來的每條數據都依次打上一個0~n的前綴?
最后將兩個處理后的RDD進行join即可。?
原理:?
將原先一樣的key通過附加前綴變成不一樣的key,然后就看可以將這些處理后的“不同的key”分散到多個task中那個去處理,而不是讓一個task去處理大量相同的key。此方法與方法六的區別在于,有大量傾斜key的情況,沒法將部分key拆分出來單獨處理,因此只能對整個RDD 進行數據擴容,對資源要求很高。?
缺點:
更多的是緩解數據傾斜,而不是徹底避免,而且需要對整個RDD進行擴容,對內存資源要求較高。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.PairFlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
public class ExtendRDDTest {
public static void main(String[] args) {
SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("ExtendRDDTest");
JavaSparkContext sc = new JavaSparkContext(conf);
List<Tuple2<String, String>> list1 = Arrays.asList(
new Tuple2<String, String>("001", "令狐沖"),
new Tuple2<String, String>("002", "任盈盈")
);
List<Tuple2<String, String>> list2 = Arrays.asList(
new Tuple2<String, String>("001", "一班"),
new Tuple2<String, String>("002", "二班")
);
JavaRDD<Tuple2<String, String>> list1RDD = sc.parallelize(list1);
JavaRDD<Tuple2<String, String>> list2RDD = sc.parallelize(list2);
//首先將其中key分布比較均勻的RDD擴容100倍。
JavaPairRDD<String, String> extendRDD = list1RDD.flatMapToPair(new PairFlatMapFunction<Tuple2<String,String>, String, String>() {
public Iterable<Tuple2<String, String>> call(Tuple2<String, String> arg0) throws Exception {
ArrayList<Tuple2<String,String>> list = new ArrayList<Tuple2<String,String>>();
for (int i =0;i<100;i++){
list.add(new Tuple2<String,String>(i+"_"+arg0._1,arg0._2));
}
return list;
}
});
//將另外一個key分布不均勻的RDD加上0~99的隨機數
JavaPairRDD<String, String> mappedRDD = list2RDD.mapToPair(new PairFunction<Tuple2<String,String>, String, String>() {
public Tuple2<String, String> call(Tuple2<String, String> arg0) throws Exception {
Random random = new Random();
int prefix = random.nextInt(100);
return new Tuple2<String, String>(prefix+"_"+arg0._1,arg0._2);
}
});
mappedRDD.join(extendRDD)
.foreach(new VoidFunction<Tuple2<String,Tuple2<String,String>>>() {
public void call(Tuple2<String, Tuple2<String, String>> arg0) throws Exception {
System.out.println(arg0._1.split("_")[1]+" "+arg0._2._1 +"_"+arg0._2._2);
}
});
}
}
?
總結
以上是生活随笔為你收集整理的数据倾斜原理及解决方案的全部內容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: zcmu2138
- 下一篇: C4996 'fopen': Th