沉沙
2018-09-28
来源 :
阅读 1093
评论 0
摘要:本篇教程介绍了大数据应用 Spark GraphX实例(3),希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据应用的理解更加深入。
本篇教程介绍了大数据应用 Spark GraphX实例(3),希望阅读本篇文章以后大家有所收获,帮助大家对大数据云计算大数据应用的理解更加深入。
<
7. 图的聚合操作
图的聚合操作主要的方法有:
(1) Graph.mapReduceTriplets():该方法有一个mapFunc和一个reduceFunc,mapFunc对图中的每一个EdgeTriplet进行处理,生成一个或者多个消息,并且将这些消息发送个Edge的一个或者两个顶点,reduceFunc对发送到每一个顶点上的消息进行合并,生成最终的消息,最后返回一个VertexRDD(不包括没有收到消息的顶点);
(2) Graph.pregel():该方法采用BSP模型,包括三个函数vprog、sendMsg和mergeMsg,vprog是运行在每个节点上的顶点更新函数,接收消息,然后对顶点属性更新,sendMsg生成发送给下一次迭代的消息,mergeMsg对同一个顶点接收到的多个消息进行合并,迭代一直进行到收敛,或者达到了设置的最大迭代次数为止。
代码:
// 聚合操作
println("*************************************************************")
println("聚合操作")
println("*************************************************************")
println("找出年纪最大的追求者:")
val oldestFollower:VertexRDD[(String,Int)] = userGraph.mapReduceTriplets[(String,Int)](
// 将源顶点的属性发送给目标顶点,map过程
edge => Iterator((edge.dstId,(edge.srcAttr.name,edge.srcAttr.age))),
// 得到最大追求者,reduce过程
(a,b) => if(a._2>b._2) a else b
)
userGraph.vertices.leftJoin(oldestFollower){(id,user,optOldestFollower) =>
optOldestFollower match{
case None => s"${user.name} does not have any followers."
case Some(oldestAge) => s"The oldest age of ${user.name} \'s followers is ${oldestAge._2}(${oldestAge._1})."
}
}.collect.foreach{case(id,str) => println(str)}
println
// 找出追求者的平均年龄
println("找出追求者的平均年龄:")
val averageAge:VertexRDD[Double] = userGraph.mapReduceTriplets[(Int,Double)](
// 将源顶点的属性(1,Age)发送给目标顶点,map过程
edge => Iterator((edge.dstId,(1,edge.srcAttr.age.toDouble))),
// 得到追求者的数量和总年龄
(a,b) => ((a._1+b._1),(a._2+b._2))
).mapValues((id,p) => p._2/p._1)
userGraph.vertices.leftJoin(averageAge){(id,user,optAverageAge) =>
optAverageAge match{
case None => s"${user.name} does not have any followers."
case Some(avgAge) => s"The average age of ${user.name} \'s followers is $avgAge."
}
}.collect.foreach{case(id,str) => println(str)}
println
// 聚合操作2
println("*************************************************************")
println("聚合操作2")
println("*************************************************************")
println("找出3到各顶点的最短距离:")
// 定义源点
val sourceId:VertexId = 3L
val initialGraph = graph.mapVertices((id,_) => if(id==sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id,dist,newDist) => math.min(dist,newDist),
// 权重计算
triplet=>{
if(triplet.srcAttr + triplet.attr < triplet.dstAttr){
Iterator((triplet.dstId, triplet.srcAttr+triplet.attr))
} else{
Iterator.empty
}
},
// 最短距离
(a,b) => math.min(a,b)
)
println(sssp.vertices.collect.mkString("\n"))
运行结果:
*************************************************************
聚合操作
*************************************************************
找出年纪最大的追求者:
The oldest age of Peter 's followers is 27(Henry).
The oldest age of Kate 's followers is 55(Charlie).
The oldest age of Henry 's followers is 55(Charlie).
The oldest age of Alice 's followers is 32(Peter).
The oldest age of Charlie 's followers is 35(Mike).
Mike does not have any followers.
找出追求者的平均年龄:
The average age of Peter 's followers is 27.0.
The average age of Kate 's followers is 45.0.
The average age of Henry 's followers is 45.0.
The average age of Alice 's followers is 29.5.
The average age of Charlie 's followers is 35.0.
Mike does not have any followers.
*************************************************************
聚合操作2
*************************************************************
找出3到各顶点的最短距离:
(4,9.0)
(6,3.0)
(2,7.0)
(1,10.0)
(3,0.0)
(5,Infinity)
本文由职坐标整理并发布,希望对同学们有所帮助。了解更多详情请关注职坐标大数据云计算大数据应用频道!
喜欢 | 0
不喜欢 | 0
您输入的评论内容中包含违禁敏感词
我知道了

请输入正确的手机号码
请输入正确的验证码
您今天的短信下发次数太多了,明天再试试吧!
我们会在第一时间安排职业规划师联系您!
您也可以联系我们的职业规划师咨询:
版权所有 职坐标-一站式AI+学习就业服务平台 沪ICP备13042190号-4
上海海同信息科技有限公司 Copyright ©2015 www.zhizuobiao.com,All Rights Reserved.
沪公网安备 31011502005948号