reduceByKey和groupByKey区别与用法
2019獨(dú)角獸企業(yè)重金招聘Python工程師標(biāo)準(zhǔn)>>>
在Spar看中,我們知道一切的操作都是基于RDD的。在使用中,RDD有一種非常特殊也是非常實(shí)用的format——pair RDD,即RDD的每一行是(key, value)的格式。這種格式很像Python的字典類型,便于針對key進(jìn)行一些處理。
針對pair RDD這樣的特殊形式,spark中定義了許多方便的操作,今天主要介紹一下reduceByKey和groupByKey,因?yàn)樵诮酉聛碇v解《在spark中如何實(shí)現(xiàn)SQL中的group_concat功能?》時(shí)會用到這兩個(gè)operations。
首先,看一看spark官網(wǎng)[1]是怎么解釋的:
reduceByKey(func,?numPartitions=None)
Merge the values for each key using an associative reduce function. This will also perform the merging?locally on each mapper?before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with?numPartitions?partitions, or the default parallelism level if?numPartitions?is not specified.
也就是,reduceByKey用于對每個(gè)key對應(yīng)的多個(gè)value進(jìn)行merge操作,最重要的是它能夠在本地先進(jìn)行merge操作,并且merge操作可以通過函數(shù)自定義。
groupByKey(numPartitions=None)
Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions.?Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using?reduceByKey?or aggregateByKey will provide much better performance.
也就是,groupByKey也是對每個(gè)key進(jìn)行操作,但只生成一個(gè)sequence。需要特別注意“Note”中的話,它告訴我們:如果需要對sequence進(jìn)行aggregation操作(注意,groupByKey本身不能自定義操作函數(shù)),那么,選擇reduceByKey/aggregateByKey更好。這是因?yàn)間roupByKey不能自定義函數(shù),我們需要先用groupByKey生成RDD,然后才能對此RDD通過map進(jìn)行自定義函數(shù)操作。
為了更好的理解上面這段話,下面我們使用兩種不同的方式去計(jì)算單詞的個(gè)數(shù)[2]:
val words = Array("one", "two", "two", "three", "three", "three") val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _) val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一樣的,但是,它們的內(nèi)部運(yùn)算過程是不同的。
(1)當(dāng)采用reduceByKeyt時(shí),Spark可以在每個(gè)分區(qū)移動(dòng)數(shù)據(jù)之前將待輸出數(shù)據(jù)與一個(gè)共用的key結(jié)合。借助下圖可以理解在reduceByKey里究竟發(fā)生了什么。 注意在數(shù)據(jù)對被搬移前同一機(jī)器上同樣的key是怎樣被組合的(reduceByKey中的lamdba函數(shù))。然后lamdba函數(shù)在每個(gè)區(qū)上被再次調(diào)用來將所有值reduce成一個(gè)最終結(jié)果。整個(gè)過程如下:
(2)當(dāng)采用groupByKey時(shí),由于它不接收函數(shù),spark只能先將所有的鍵值對(key-value pair)都移動(dòng),這樣的后果是集群節(jié)點(diǎn)之間的開銷很大,導(dǎo)致傳輸延時(shí)。整個(gè)過程如下:
因此,在對大數(shù)據(jù)進(jìn)行復(fù)雜計(jì)算時(shí),reduceByKey優(yōu)于groupByKey。
另外,如果僅僅是group處理,那么以下函數(shù)應(yīng)該優(yōu)先于 groupByKey?:
(1)、combineByKey?組合數(shù)據(jù),但是組合之后的數(shù)據(jù)類型與輸入時(shí)值的類型不一樣。
(2)、foldByKey合并每一個(gè) key 的所有值,在級聯(lián)函數(shù)和“零值”中使用。
最后,對reduceByKey中的func做一些介紹:
如果是用python寫的spark,那么有一個(gè)庫非常實(shí)用:operator[3],其中可以用的函數(shù)包括:大小比較函數(shù),邏輯操作函數(shù),數(shù)學(xué)運(yùn)算函數(shù),序列操作函數(shù)等等。這些函數(shù)可以直接通過“from operator import *”進(jìn)行調(diào)用,直接把函數(shù)名作為參數(shù)傳遞給reduceByKey即可。如下:
<span style="font-size:14px;">from operator import add rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)]) sorted(rdd.reduceByKey(add).collect()) [('a', 2), ('b', 1)]</span>?
轉(zhuǎn)載于:https://my.oschina.net/u/2935389/blog/1359396
總結(jié)
以上是生活随笔為你收集整理的reduceByKey和groupByKey区别与用法的全部內(nèi)容,希望文章能夠幫你解決所遇到的問題。
- 上一篇: 干货:完全基于情感词典的文本情感分析
- 下一篇: Linux下C语言实现LCD屏幕截图