Hama学习总结
Hama學(xué)習(xí)筆記
1. Hama定義
Hama是基于HDFS上的BSP模型實現(xiàn),其執(zhí)行不須要MapReduce。
例證例如以下: 在單點調(diào)試的Hama系統(tǒng)上,僅僅執(zhí)行NameNode、DataNode、BSPMasterRunner、GroomServerRunner和 ZooKeeperRunner進程。就可以執(zhí)行PageRank程序。
2.MapReduce與BSP差別
運行機制:MapReduce是一個數(shù)據(jù)流模型,每一個任務(wù)僅僅是對輸入數(shù)據(jù)進行處理,產(chǎn)生的輸出數(shù)據(jù)作為還有一個任務(wù)的輸入數(shù)據(jù)。并行任務(wù)之間獨立地進行,串行任務(wù)之間以磁盤和數(shù)據(jù)復(fù)制作為交換介質(zhì)和接口。
BSP是一個狀態(tài)模型,各個子任務(wù)在本地的子圖數(shù)據(jù)上進行計算、通信、改動圖的狀態(tài)等操作。并行任務(wù)之間通過消息通信交流中間計算結(jié)果,不須要像MapReduce那樣對全體數(shù)據(jù)進行復(fù)制。
迭代處理:MapReduce模型理論上須要連續(xù)啟動若干作業(yè)才干夠完畢圖的迭代處理,相鄰作業(yè)之間通過分布式文件系統(tǒng)交換所有數(shù)據(jù)。BSP模型僅啟動一個作業(yè)。利用多個超步就能夠完畢迭代處理。兩次迭代之間通過消息傳遞中間計算結(jié)果。
因為降低了作業(yè)啟動、調(diào)度開銷和磁盤存取開銷,BSP模型的迭代運行效率較高。
數(shù)據(jù)切割:基于BSP的圖處理模型,須要對載入后的圖數(shù)據(jù)進行一次再分布的過程,以確定消息通信時的路由地址。比如,各任務(wù)并行載入數(shù)據(jù)過程中。依據(jù)一定的映射策略。將讀入的數(shù)據(jù)又一次分發(fā)到相應(yīng)的計算任務(wù)上(一般是放在內(nèi)存中),既有磁盤I/O又有網(wǎng)絡(luò)通信,開銷非常大。可是一個BSP作業(yè)僅需一次數(shù)據(jù)切割,在之后的迭代計算過程中除了消息通信之外。不再須要進行數(shù)據(jù)的遷移。而基于MapReduce的圖處理模型。普通情況下,不須要專門的數(shù)據(jù)切割處理。可是Map階段和Reduce階段次年在中間結(jié)果的Shuffle過程。添加了磁盤I/O和網(wǎng)絡(luò)通信開銷。
總結(jié):MapReduce發(fā)送數(shù)據(jù)+消息。而Hama僅僅發(fā)送消息。在Hama的超步迭代過程中,當(dāng)某個BSPPeer收到其它BSPPeer發(fā)送過來的某頂點的消息。進行消息處理,而后要把處理結(jié)果發(fā)送到該節(jié)點的鄰接節(jié)點,因此該節(jié)點的數(shù)據(jù)信息也必須存在該BSPPeer中,故必須在對數(shù)據(jù)載入到內(nèi)存時進行一次Hash再分布。
以下分析Hama中數(shù)據(jù)再分布的機制,源代碼位于GraphJobRunner.loadVertices()方法中。首先獲取每一個BSPPeer的數(shù)據(jù)分片大小splitSize。舉比例如以下表1所看到的:
表 1 BSPPeer數(shù)據(jù)量信息
|
Peer序號 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
|
數(shù)據(jù)量 |
62M |
64M |
54M |
在GraphJobRunner.partitionMultiSteps(BSPPeer,splitSize)方法中,每一個BSPPeer把自己的splitSize發(fā)送給MasterPeer。
進行同步后,在MasterPeer上找到最大全部BSPPeer上最大的splitSize賦值給maxSplitSize,即maxSplitSize等于BSPPeer2上的64M。
然后依照例如以下公式計算計算數(shù)據(jù)載入后Hash再分布的同步次數(shù)steps:
maxSplitSize/conf.getLong("hama.graph.multi.step.partitioning.interval",20000000) +1
由此公式可知。用戶可配置hama.graph.multi.step.partitioning.interval的大小。但在hama-default.xml未找到此項。
hama.graph.multi.step.partitioning.interval含義:表示Hash再分布時進行同步的最大塊單元,默認(rèn)是20M。
steps = 64M / 20M + 1 = 4 (進行4次同步)
然后MasterPeer把該steps值發(fā)送給全部的BSPPeer。并在每一個BSPPeer中賦值給GraphJobRunner. partitioningSteps變量(值為4)。
在每一個BSPPeer計算各自的Hash再分布時的塊同步單元:interval = splitSize / partitioningSteps。計算結(jié)果例如以下表 2所看到的:
表 2 每一個BSPPeer進行Hash再分布的塊信息
|
Peer序號 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
|
數(shù)據(jù)量 |
62M |
64M |
54M |
|
partitioningSteps值 |
4 |
4 |
4 |
|
Interval值 |
15M |
16M |
13M |
|
每次同步塊大小(M) |
15、15、15、17 |
16、16、16、16 |
13、13、13、15 |
每一個BSPPeer依次從HDFS上讀取數(shù)據(jù),并依據(jù)Hash進行發(fā)送(每讀入一個頂點就發(fā)送一次),當(dāng)發(fā)送量達(dá)到自己的塊同步單元后(BSPPeer1:15M,BSPPeer2:16M,BSPPeer3:13M)。進行一次同步。各BSPPeer把接受到的數(shù)據(jù)載入的內(nèi)存中,即存儲于GraphJobRunner.Vertices變量中。按此進行3(partitioningSteps-1)次。
最后一次中,BSPPeer1發(fā)送17M數(shù)據(jù),BSPPeer2發(fā)送16M數(shù)據(jù)。BSPPeer3發(fā)送15M數(shù)據(jù),再進行同步,而后載入到GraphJobRunner.Vertices中。
數(shù)據(jù)Hash重分布之后,每一個BSPPeer上的頂點vertices大小分布可能例如以下表3所看到的,當(dāng)中如果每一個頂點的大小40byte(實際每一個頂點大小會不同,如PageRank。
此處僅僅是為了舉例說明算法)。再補充GraphJobRunner中vertices的定義:
List<Vertex<V, E, M>> vertices =new ArrayList<Vertex<V, E, M>>()
表 3 BSPPeer進行Hash重分布后Vertices.size信息
|
Peer序號 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
|
數(shù)據(jù)量 |
80.2 M |
40.16 M |
59.64 M |
|
Vertices.size |
2005 K |
1004 K |
1491 K |
以下闡述Hama的數(shù)據(jù)修復(fù)(Repair)機制。源代碼位于GraphJobRunner. repair()方法中,此方法在loadVertices()方法的最后調(diào)用。
先用單個BSPPeer上的樣例介紹數(shù)據(jù)修復(fù)的概念。
如對于PageRank,眼下實際有四個頂點。例如以下圖1所看到的。而用戶輸入的數(shù)據(jù)例如以下:
1 2 3
2 3
3 1 4
圖 1 PageRank圖
但用戶沒有寫4頂點的信息。應(yīng)該寫為: 4 鄰接頂點,當(dāng)其鄰接邊為空的時候,也應(yīng)該寫為:4 空(實際不寫“空”,為了文檔描寫敘述方便)。數(shù)據(jù)修復(fù)的目的就是添加:“4 邊空”這條信息。事實上是把4頂點作為懸掛頂點來處理。
在超步(S-1)中3頂點會把其PR值的1/2發(fā)送給頂點4應(yīng)該所在的BSPPeer(實際沒有4頂點的信息)。在超步S中,若數(shù)據(jù)載入時沒有進行過數(shù)據(jù)修復(fù),則BSPPeer沒有4頂點的信息,不如直接把其臨邊作為空處理即可。這和數(shù)據(jù)修復(fù)效果一樣。這樣做不是更加簡單嗎?為什么要花那么大的代價進行數(shù)據(jù)修復(fù)呢?
解釋:上述在計算過程中直接把其鄰接邊作為空的方案是不對的。由于在計算頂點總數(shù)(等于每一個BSPPeer上的Vertices.size之和)時就會出錯,導(dǎo)致給每一個頂點的初始值就會出錯。然后再導(dǎo)致aggregator出錯。
每一個BSPPeer獲取其上Vertices的大小,都發(fā)送給MasterPeer。
在MasterPeer上找到最小的minVerticesSize,再計算數(shù)據(jù)修復(fù)時的同步次數(shù)multiSteps。公式例如以下:
multiSteps = min { minVerticesSize , ( partitioningSteps * 2 ) }
分析:一般minVerticesSize都大于( partitioningSteps* 2 )。如對上例minVerticesSize的大小為1000k,而( partitioningSteps *2 ) = 4*2 = 8,故multiSteps的值為8。
然后MasterPeer把此值發(fā)送給全部的BSPPeer,每一個BSPPeer存儲于自己的變量multiSteps中。在每一個BSPPeer計算各自數(shù)據(jù)修復(fù)時的塊同步單元:vertices.size/
multiSteps。注意:此時進行同步的單元不是數(shù)據(jù)量大小,而是頂點的數(shù)目。計算結(jié)果例如以下所看到的。
表 4 每一個BSPPeer進行數(shù)據(jù)修復(fù)時的同步信息
|
Peer序號 |
BSPPeer1 |
BSPPeer2 |
BSPPeer3 |
|
數(shù)據(jù)量 |
80.2 M |
40.16 M |
59.64 M |
|
Vertices.size |
2005 K |
1004 K |
1491 K |
|
頂點同步單元 |
250 K |
125 K |
186 K |
|
multiSteps次同步后剩余 |
5 k |
4 k |
3 k |
每一個BSPPeer依次從內(nèi)存(vertices變量)上讀取每一個頂點,獲取其鄰接頂點后,再依據(jù)其Hash值把鄰接頂點的id發(fā)送到對應(yīng)的BSPPeer上。當(dāng)發(fā)送頂點的數(shù)目達(dá)到各自的同步單元后(BSPPeer1:250 K,BSPPeer2:125 K。BSPPeer3:186 K),進行一次同步。各BSPPeer把接收到的數(shù)據(jù)存儲于暫時變量tmp(其定義為:new HashMap<V, Vertex<V, E, M>>()。V用來存儲鄰接頂點的id,Vertex是以鄰接頂點id為VertexID且Edges為空的頂點)中。
按此進行multiSteps次(8)。注意:與數(shù)據(jù)載入后Hash再分布時的(partitioningSteps-1)次不同。
進行multiSteps后,三個BSPPeer節(jié)點依舊剩余5 K、4 K 、3 K。再進行最后一次同步。各BSPPeer依舊后收到的數(shù)據(jù)載入到tmp變量中。
然后每一個BSPPeer掃描自己的vertices。把VertexID屬于tmp的從tmp中刪除。
最后把tmp中剩余的頂點相應(yīng)的Vertex(以鄰接頂點id為VertexID且Edges為空)增加到GraphJobRunner.Vertices中,至此數(shù)據(jù)修復(fù)完畢。
總結(jié)
- 上一篇: 优美的段落摘抄大全250个
- 下一篇: 小清新的网名92个