Spark GraphX graph operations: pregel (an enhanced aggregateMessages)
Contents
1. Pregel API
2. Code implementation
Using Pregel to find the minimum cost from the source vertex to every vertex
Using Pregel to find the maximum depth from the source vertex to every vertex
1. Pregel API:
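A graph is an inherently recursive data structure: a vertex's properties may depend on the properties of its neighbors, which in turn depend on the properties of their own neighbors. As a result, many important graph algorithms iteratively recompute the properties of every vertex until a stable state (a fixed point) is reached.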
GraphX中的Pregel操作符是一個(gè)批量同步并行(bulk-synchronous parallel message abstraction)的messaging abstraction,用于圖的拓?fù)浣Y(jié)構(gòu)(topology of the graph)。The Pregel operator executes in a series of super steps in whichvertices receive the sum of their inbound messagesfrom the previous super step,compute a new valuefor the vertex property, and thensend messages to neighboring verticesin the next super step. Message是作為edge triplet的一個(gè)函數(shù)并行計(jì)算的,message的計(jì)算可以使用source和dest頂點(diǎn)的屬性。沒(méi)有收到message的頂點(diǎn)在super step中被跳過(guò)。迭代會(huì)在么有剩余的信息之后停止,并返回最終的圖。
Definition of pregel:
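def pregel[A]
    (initialMsg: A,                               // the initial message every vertex receives in the first iteration
    maxIter: Int = Int.MaxValue,                  // maximum number of iterations (named maxIterations in the GraphOps source)
    activeDir: EdgeDirection = EdgeDirection.Out  // which edges of vertices that received a message run sendMsg next (named activeDirection in the GraphOps source)
)(
    vprog: (VertexId, VD, A) => VD,  // vertex program, run on each vertex: computes the vertex's new attribute from its ID, its current attribute and the inbound message. In the first iteration the inbound message is initialMsg and every vertex runs this function; afterwards only vertices that received a message in the previous iteration run it.
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],  // applied to the edges (in direction activeDir) of vertices that received a message in the previous iteration; returns the messages to send as (target vertex ID, message) pairs
    mergeMsg: (A, A) => A  // combines two messages addressed to the same vertex into one; must be commutative and associative
)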
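Because GraphX may combine the messages addressed to one vertex in any order and grouping, mergeMsg should give the same result regardless of order. The small plain-Scala sketch below (no Spark involved; the object name MergeMsgOrder and the message values are hypothetical) merges the same three messages in two different orders: math.min is order-independent, while subtraction is not.

```scala
object MergeMsgOrder {
  // Merge a batch of messages for one vertex, left to right, with the given mergeMsg.
  def mergeAll(msgs: Seq[Double], mergeMsg: (Double, Double) => Double): Double =
    msgs.reduce(mergeMsg)

  def main(args: Array[String]): Unit = {
    val msgs = Seq(6.1, 11.3, 10.7) // hypothetical messages addressed to one vertex
    val reordered = msgs.reverse    // the same messages arriving in a different order

    val minMerge: (Double, Double) => Double = (a, b) => math.min(a, b)
    val subMerge: (Double, Double) => Double = (a, b) => a - b

    // min is commutative and associative: both orders agree
    println(mergeAll(msgs, minMerge) == mergeAll(reordered, minMerge)) // true
    // subtraction is not: the result depends on arrival order
    println(mergeAll(msgs, subMerge) == mergeAll(reordered, subMerge)) // false
  }
}
```

The same reasoning applies to any mergeMsg: sum, min and max are safe choices, while anything order-sensitive can make results vary between runs.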
How the algorithm works, roughly:
var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() // Step 1: run vprog once on every vertex with initialMsg, so every vertex's attribute goes through one update.
var messages = g.mapReduceTriplets(sendMsg, mergeMsg) // first round of messages (mapReduceTriplets is an older GraphX method, deprecated in favour of aggregateMessages)
var activeMessages = messages.count()
var i = 0
while (activeMessages > 0 && i < maxIter) {
    g = g.joinVertices(messages)(vprog).cache() // only vertices that received a message run vprog
    val oldMessages = messages
    messages = g.mapReduceTriplets(
        sendMsg,
        mergeMsg,
        Some((oldMessages, activeDir)) // run sendMsg only on edges of vertices that just received a message
    ).cache()
    activeMessages = messages.count()
    i += 1
}
g
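The loop above can be modelled in plain Scala on in-memory maps, which makes the super-step semantics easy to experiment with without a cluster. This is a hypothetical sketch, not Spark's implementation: LocalPregel, run and the Edge stand-in are invented names, sendMsg receives the triplet fields as separate arguments instead of an EdgeTriplet, and only the activeDir = EdgeDirection.Out behaviour is modelled.

```scala
object LocalPregel {
  // Stand-in for GraphX's Edge[ED]: a directed edge with an attribute.
  final case class Edge[ED](src: Long, dst: Long, attr: ED)

  // In-memory model of the Pregel loop (hypothetical helper, not Spark code).
  // sendMsg gets (srcId, srcAttr, edgeAttr, dstId, dstAttr) instead of an EdgeTriplet.
  def run[VD, ED, A](
      vertices: Map[Long, VD],
      edges: Seq[Edge[ED]],
      initialMsg: A,
      maxIter: Int = Int.MaxValue
  )(
      vprog: (Long, VD, A) => VD,
      sendMsg: (Long, VD, ED, Long, VD) => Iterator[(Long, A)],
      mergeMsg: (A, A) => A
  ): Map[Long, VD] = {
    // Run sendMsg on the given edges and merge messages addressed to the same vertex.
    def messagesFrom(g: Map[Long, VD], es: Seq[Edge[ED]]): Map[Long, A] =
      es.iterator
        .flatMap(e => sendMsg(e.src, g(e.src), e.attr, e.dst, g(e.dst)))
        .foldLeft(Map.empty[Long, A]) { case (acc, (id, m)) =>
          acc.updated(id, acc.get(id).fold(m)(prev => mergeMsg(prev, m)))
        }

    // Step 0: every vertex runs vprog once with initialMsg.
    var g: Map[Long, VD] = vertices.map { case (id, vd) => id -> vprog(id, vd, initialMsg) }
    var messages = messagesFrom(g, edges)
    var i = 0
    while (messages.nonEmpty && i < maxIter) {
      // Only vertices that received a message run vprog (like joinVertices).
      g = g.map { case (id, vd) => id -> messages.get(id).fold(vd)(msg => vprog(id, vd, msg)) }
      // Next round: only out-edges of vertices that received a message
      // (this models activeDir = EdgeDirection.Out).
      val active = messages.keySet
      messages = messagesFrom(g, edges.filter(e => active.contains(e.src)))
      i += 1
    }
    g
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical 3-vertex cycle 1 -> 2 -> 3 -> 1; each vertex starts with its own ID.
    val vertices = Map(1L -> 1L, 2L -> 2L, 3L -> 3L)
    val edges = Seq(Edge(1L, 2L, ()), Edge(2L, 3L, ()), Edge(3L, 1L, ()))
    // Propagate the largest ID along out-edges; a message is sent only if it would raise the target.
    val result = run(vertices, edges, Long.MinValue)(
      (_, attr, msg) => math.max(attr, msg),
      (_, srcAttr, _, dstId, dstAttr) =>
        if (srcAttr > dstAttr) Iterator((dstId, srcAttr)) else Iterator.empty,
      (a, b) => math.max(a, b)
    )
    println(result) // all three vertices end with value 3
  }
}
```

Even though the example graph is a cycle, the run terminates: messages are only sent when they would change the target, so after two rounds no messages remain and the loop exits.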
An example of pregel: attach some initial scores to a graph, then spread each vertex's score outward according to its out-degree, with each vertex also keeping a share for itself:
// add each vertex's out-degree to its attribute
val graphWithDegree = graph.outerJoinVertices(graph.outDegrees){
    case (vid, name, deg) => (name, deg match {
        case Some(d) => d.toDouble
        case None => 1.25} // vertices without out-edges get the placeholder degree 1.25
    )
}
// join the graph with the initial scores (vertices without a score get 0.0)
val graphWithScoreAndDegree = graphWithDegree.outerJoinVertices(scoreRDD){
    case (vid, (name, deg), score) => (name, deg, score.getOrElse(0.0))
}
graphWithScoreAndDegree.vertices.foreach(x => println("++++++++++++id:"+x._1+"; deg: "+x._2._2+"; score:"+x._2._3))
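In this example, the first step of the algorithm adds 0.0 (the initialMsg passed in) to each vertex's value, which leaves the value unchanged, and then divides the result by the vertex's out-degree. This step is important and easy to overlook: because vprog runs once on every vertex with initialMsg before any message is sent, you must check when designing the algorithm whether this first call affects the result.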
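To see the effect concretely, here is a plain-Scala sketch (no Spark) of that first vprog call. The vprog of the referenced example is not shown in this post, so the form (score + msg) / deg below is an assumption based on the description, and the State class, the vertex IDs and the scores are hypothetical.

```scala
object FirstStepEffect {
  // Hypothetical vertex state: (score, out-degree), as built by the joins above.
  final case class State(score: Double, deg: Double)

  // Hypothetical vprog of the described form "(score + msg) / deg".
  def vprog(id: Long, s: State, msg: Double): State =
    State((s.score + msg) / s.deg, s.deg)

  def main(args: Array[String]): Unit = {
    val initialMsg = 0.0
    // Hypothetical initial scores and out-degrees.
    val vertices = Map(1L -> State(10.0, 2.0), 2L -> State(6.0, 3.0))
    // Pregel's step 0: every vertex runs vprog with initialMsg before any message exists.
    val afterStep0 = vertices.map { case (id, s) => id -> vprog(id, s, initialMsg) }
    afterStep0.foreach { case (id, s) => println(s"id: $id; score: ${s.score}") }
  }
}
```

With these inputs the scores become 5.0 and 2.0 before a single message has been exchanged. If that is not intended, the vprog has to recognise the first call (for example, via an initialMsg value that can never occur as a real message) and return the state unchanged.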
Source of this explanation: https://www.jianshu.com/p/d9170a0723e4
2. Code implementation:
Using Pregel to find the minimum cost from the source vertex to every vertex
package homeWork

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph, VertexId}

object MapGraphX5 {
  def main(args: Array[String]): Unit = {
    // set up the runtime environment
    val conf = new SparkConf().setAppName("Pregel API GraphX").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // build the graph
    val myVertices = sc.parallelize(Array((1L, 0), (2L, 0), (3L, 0), (4L, 0), (5L, 0)))
    val myEdges = sc.makeRDD(Array(Edge(1L, 2L, 2.5), Edge(2L, 3L, 3.6), Edge(3L, 4L, 4.5),
      Edge(4L, 5L, 0.1), Edge(3L, 5L, 5.2)))
    val myGraph = Graph(myVertices, myEdges)

    // set the source vertex
    val sourceId: VertexId = 1L
    // initialize the vertex values: 0.0 for the source vertex, Double.PositiveInfinity for all others
    val initialGraph = myGraph.mapVertices((id, _) =>
      if (id == sourceId) 0.0 else Double.PositiveInfinity)

    /* Signature of pregel, as shown by the decompiled GraphOps class:
    def pregel[A](initialMsg: A,
                  maxIterations: Int = ...,
                  activeDirection: EdgeDirection = ...)
                 (vprog: (VertexId, VD, A) => VD,
                  sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
                  mergeMsg: (A, A) => A)
                 (implicit evidence$6: ClassTag[A]): Graph[VD, ED]
    */
    val sssp: Graph[Double, Double] = initialGraph.pregel(
      Double.PositiveInfinity // initialMsg
      // maxIterations and activeDirection use their default values
    )(
      // vprog: keep the smaller of the current value and the incoming message
      (id, dist, newDist) => math.min(dist, newDist),
      // sendMsg: find the minimum cost from vertex 1L to every vertex
      triplet => {
        // if (source vertex value + edge cost) is smaller than the destination's current value,
        // send that sum to the destination vertex so it can update its value
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      // mergeMsg: when a vertex receives several messages, keep the smallest
      (a, b) => math.min(a, b))

    sssp.vertices.collect.foreach(println(_))
  }
}

Using Pregel to find the maximum depth from the source vertex to every vertex
package pregel

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.graphx.{Edge, Graph}

object Demo2 {
  def main(args: Array[String]): Unit = {
    // set up the runtime environment
    val conf = new SparkConf().setAppName("Pregel API GraphX").setMaster("local")
    val sc = new SparkContext(conf)
    sc.setLogLevel("WARN")

    // build the graph
    val myVertices = sc.parallelize(Array((1L, "Zhang San"), (2L, "Li Si"), (3L, "Wang Wu"),
      (4L, "Qian Liu"), (5L, "Leader")))
    val myEdges = sc.makeRDD(Array(Edge(1L, 2L, "friend"), Edge(2L, 3L, "friend"), Edge(3L, 4L, "friend"),
      Edge(4L, 5L, "superior-subordinate"), Edge(3L, 5L, "superior-subordinate")))
    val myGraph = Graph(myVertices, myEdges)

    // every vertex starts at depth 0
    val g = myGraph.mapVertices((vid, vd) => 0)
    // note: on a graph with a cycle, depths keep growing until maxIterations is reached
    val newGraph: Graph[Int, String] = g.pregel(0)(
      // vprog: the incoming (largest) depth becomes the vertex's new depth
      (id, attr, maxValue) => maxValue,
      // sendMsg: the destination is one level deeper than the source; send only if that is deeper than its current depth
      triplet => {
        if (triplet.srcAttr + 1 > triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + 1))
        } else {
          Iterator.empty
        }
      },
      // mergeMsg: keep the largest depth
      (a: Int, b: Int) => math.max(a, b))

    newGraph.vertices.collect.foreach(println(_))
  }
}
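Both programs print (vertexId, value) pairs. To know what to expect, the sketch below recomputes both answers in plain Scala without Spark, using a different technique on purpose: Bellman-Ford-style edge relaxation (repeat |V| - 1 passes over the edges) for the minimum cost, and the same relaxation with max instead of min for the depth. The object name ExpectedResults is hypothetical; the edge list and source vertex 1 are copied from the two programs.

```scala
object ExpectedResults {
  // Edges from the two programs above: (src, dst, cost). The depth example ignores the cost.
  val edges = Seq((1L, 2L, 2.5), (2L, 3L, 3.6), (3L, 4L, 4.5), (4L, 5L, 0.1), (3L, 5L, 5.2))
  val ids = Seq(1L, 2L, 3L, 4L, 5L)

  // Bellman-Ford: relax every edge |V| - 1 times, keeping the smaller cost.
  def minCost(source: Long): Map[Long, Double] = {
    val init = ids.map(id => id -> (if (id == source) 0.0 else Double.PositiveInfinity)).toMap
    (1 until ids.size).foldLeft(init) { (dist, _) =>
      edges.foldLeft(dist) { case (d, (s, t, w)) =>
        if (d(s) + w < d(t)) d.updated(t, d(s) + w) else d
      }
    }
  }

  // Longest path in hops, starting from 0 everywhere; only valid for acyclic graphs like this one.
  def maxDepth: Map[Long, Int] = {
    val init = ids.map(_ -> 0).toMap
    (1 until ids.size).foldLeft(init) { (depth, _) =>
      edges.foldLeft(depth) { case (d, (s, t, _)) =>
        if (d(s) + 1 > d(t)) d.updated(t, d(s) + 1) else d
      }
    }
  }

  def main(args: Array[String]): Unit = {
    println(minCost(1L))
    println(maxDepth)
  }
}
```

Up to floating-point rounding, the minimum costs from vertex 1 are 2.5, 6.1, 10.6 and 10.7 for vertices 2 to 5; vertex 5 is cheaper via vertex 4 (10.6 + 0.1) than directly from vertex 3 (6.1 + 5.2 = 11.3). The depths are 0, 1, 2, 3 and 4 for vertices 1 to 5.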