此文已由作者葉林寶授權網易云社區發布。
歡迎訪問網易云社區,了解更多網易技術產品運營經驗。
方案四:Sort on Cell Values
簡述:
上述方案三, 當數據行數較多, 情況下, 在二次排序還是可能出現oom情況, 而且, 不同的field_index的數據可能shuffle到同一個分區,這樣就加大了oom的概率。當field_index本身取值較多 情況下, 增加分區數是其中一種解決方法。但是field_index取值本身就少于分區數的情況下, 增加分區數對緩解oom就沒任何作用了。 如果 當field_value相比field_index較為分散, 且值較多的情況下, 不妨換個思維, 按field_value分區。 具體算法如下:
算法:
(1)將df 轉換為(field_value, field_index)
(2)對分區內的數據, 用sortByKey根據 field_value排序 (rangPartition排序)
(3)利用mapPartitions確定每個分區內的每個field_index共有多少數據(不同分區中的filed_value相對有序, 例如partiiton1 中的filed_value比partition2中的field_value小)
(4)利用第(3)步數據, 確定每個field_index中所需要的排名的數據在哪個分區以及分區內第幾條數據。例如要輸出field_index_6的13th位數據,假設第一個分區已經包含10條數據, 則目標數據在第二個分區的第3條數據
(5)轉換(4)計算結果為標準輸出格式
代碼:
(1)
/***?將數據源df轉換為(field_value,?field_index)格式的rdd*?@param?dataFrame*?@return*/def?getValueColumnPairs(dataFrame?:?DataFrame):?RDD[(Double,?Int)]?={dataFrame.rdd.flatMap{row:?Row?=>?row.toSeq.zipWithIndex.map{case?(v,?index)?=>?(v.toString.toDouble,?index)}}}
(3)
/***?對按照field_value排序后的sortedValueColumnPairs,?計算出每個分區上,?每個field_index分別有多少數據*?@param?sortedValueColumnPairs*?@param?numOfColumns*?@return*/def?getColumnsFreqPerPartition(sortedValueColumnPairs:?RDD[(Double,?Int)],numOfColumns?:?Int):?Array[(Int,?Array[Long])]?=?{val?zero?=?Array.fill[Long](numOfColumns)(0)????def?aggregateColumnFrequencies?(partitionIndex?:?Int,?valueColumnPairs?:?Iterator[(Double,?Int)])?=?{val?columnsFreq?:?Array[Long]?=?valueColumnPairs.aggregate(zero)((a?:?Array[Long],?v?:?(Double,?Int))?=>?{val?(value,?colIndex)?=?v??????????//increment?the?cell?in?the?zero?array?corresponding?to?this?columna(colIndex)?=?a(colIndex)?+?1La},(a?:?Array[Long],?b?:?Array[Long])?=>?{a.zip(b).map{?case(aVal,?bVal)?=>?aVal?+?bVal}})Iterator((partitionIndex,?columnsFreq))}sortedValueColumnPairs.mapPartitionsWithIndex(aggregateColumnFrequencies).collect()}
舉例說明:
假設對(1)中轉換后的數據, 按照field_value排序后, 各個分區的數據如下所示
Partition 1: (1.5, 0) (1.75, 1) (2.0, 2) (5.25, 0)
Partition 2: (7.5, 1) (9.5, 2)
則(2)的輸出結果為:
[(0, [2, 1, 1]), (1, [0, 1, 1])]
(4)
/***?計算每個field_index所需排位數據在第幾個分區的第幾條數據*?@param?targetRanks?排位數組*?@param?partitionColumnsFreq?每個分區的每個field_index包含多少數據*?@param?numOfColumns?field個數*?@return*/def?getRanksLocationsWithinEachPart(targetRanks?:?List[Long],partitionColumnsFreq?:?Array[(Int,?Array[Long])],numOfColumns?:?Int)?:?Array[(Int,?List[(Int,?Long)])]?=?{????//?二維數組,?存儲當前每個field_index,?遍歷到到第幾條數據val?runningTotal?=?Array.fill[Long](numOfColumns)(0)????//?The?partition?indices?are?not?necessarily?in?sorted?order,?so?we?need//?to?sort?the?partitionsColumnsFreq?array?by?the?partition?index?(the//?first?value?in?the?tuple).partitionColumnsFreq.sortBy(_._1).map?{??????//?relevantIndexList?存儲分區上,?滿足排位數組的field_index在該分區的第幾條數據case?(partitionIndex,?columnsFreq)?=>?val?relevantIndexList?=?new?mutable.MutableList[(Int,?Long)]()columnsFreq.zipWithIndex.foreach{?case?(colCount,?colIndex)?=>??????????//?當天field_index(即colIndex),?遍歷到第幾條數據val?runningTotalCol?=?runningTotal(colIndex)??????????//??當前field_index(即colIndex),排位數組中哪些排位位于當前分區val?ranksHere:?List[Long]?=?targetRanks.filter(rank?=>runningTotalCol?<?rank?&&?runningTotalCol?+?colCount?>=?rank)??????????//?計算出當前分區,當前field_index(即colIndex),?滿足排位數組的field_value在當前分區的位置relevantIndexList?++=?ranksHere.map(rank?=>?(colIndex,?rank?-?runningTotalCol))runningTotal(colIndex)?+=?colCount}(partitionIndex,?relevantIndexList.toList)}}
舉個例子:
假如目標排位:targetRanks: [5]
各分區各feild_index數據量:partitionColumnsFreq: [(0, [2, 3]), (1, [4, 1]), (2, [5, 2])]
字段個數:numOfColumns: 2
輸出結果: [(0, []), (1, [(0, 3)]), (2, [(1, 1)])]
(5)
/***?過濾出每個field_index?所需排位的數值*?@param?sortedValueColumnPairs*?@param?ranksLocations?(4)中計算出的滿足排位數組要求的每個分區上,每個field_index在該分區的第幾條數據*?@return*/def?findTargetRanksIteratively(?sortedValueColumnPairs?:?RDD[(Double,?Int)],?ranksLocations?:?Array[(Int,?List[(Int,?Long)])]):RDD[(Int,?Double)]?=?{sortedValueColumnPairs.mapPartitionsWithIndex((partitionIndex?:?Int,?valueColumnPairs?:?Iterator[(Double,?Int)])?=>?{????????//?當前分區上,?滿足排位數組的feild_index及其在該分區上的位置val?targetsInThisPart:?List[(Int,?Long)]?=?ranksLocations(partitionIndex)._2????????if?(targetsInThisPart.nonEmpty)?{??????????//?map中的key為field_index,?value為該feild_index在當前分區中的哪些位置上的數據滿足排位數組要求val?columnsRelativeIndex:?Map[Int,?List[Long]]?=?targetsInThisPart.groupBy(_._1).mapValues(_.map(_._2))val?columnsInThisPart?=?targetsInThisPart.map(_._1).distinct??????????//?存儲各個field_index,?在分區遍歷了多少條數據val?runningTotals?:?mutable.HashMap[Int,?Long]=?new?mutable.HashMap()runningTotals?++=?columnsInThisPart.map(columnIndex?=>?(columnIndex,?0L)).toMap??????????//?遍歷當前分區的數據源,?格式為(field_value,?field_index),?過濾出滿足排位數據要求的數據valueColumnPairs.filter{????????????case(value,?colIndex)?=>lazy?val?thisPairIsTheRankStatistic:?Boolean?=?{????????????????//?每遍歷一條數據,?runningTotals上對應的field_index?當前已遍歷數據量+1val?total?=?runningTotals(colIndex)?+?1LrunningTotals.update(colIndex,?total)columnsRelativeIndex(colIndex).contains(total)}(runningTotals?contains?colIndex)?&&?thisPairIsTheRankStatistic}.map(_.swap)}?else?{Iterator.empty}})}
分析:
(1)這種方法代碼可讀性較差
(2)需要遍歷兩遍原始數據
(3)相比于方案三, 更加有效避免executor內oom
(4)當field_value分布較離散的情況下, 這種方案相比于前三種, 效率更高
(5)上述算法中, 有兩個潛在的問題, 當field_value傾斜情況下(即某個范圍的值特別多),算法效率嚴重依賴于算法描述中的步驟(2)是否能將所有的field_value均勻的分配到各個partition;另一個問題是,當某些field_value重復現象比較多時, 是否可以合并對這些field_value的計數,而不是在一個partition中的iterator中挨個遍歷這些重復數據。
備注:上述內容(問題背景、解決算法)取自《High Performance Spark Best Practices for Scaling and Optimizing Apache Spark》(作者: Holden Karau and Rachel Warren)
免費體驗云安全(易盾)內容安全、驗證碼等服務
更多網易技術、產品、運營經驗分享請點擊。
相關文章:
【推薦】?[翻譯]pytest測試框架(一)
【推薦】?淺談js拖拽
【推薦】?HBase最佳實踐-集群規劃
轉載于:https://www.cnblogs.com/163yun/p/9881058.html
總結
以上是生活随笔為你收集整理的大数据算法:排位问题(2)的全部內容,希望文章能夠幫你解決所遇到的問題。
如果覺得生活随笔網站內容還不錯,歡迎將生活随笔推薦給好友。