Pregel

Pregel 是 Google 自 2009 年开始对外公开的图计算算法和系统, 主要用于解决无法在单机环境下计算的大规模图论计算问题

pregel封装源码

首先需要了解一下几个概念

  • 顶点的状态
    激活态和钝化态。
  • 顶点激活的条件
    成功发送一条消息,或者成功接收一条消息
def pregel[A: ClassTag](initialMsg: A, // 参数初始消息maxIterations: Int = Int.MaxValue, // 最大迭代次数activeDirection: EdgeDirection = EdgeDirection.Either) // 发送消息的边的方向(默认是沿边方向出)(vprog: (VertexId, VD, A) => VD, // 节点处理消息的函数,其作用是接受消息,并进行处理,根据处理结果更新节点属性sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], //节点发送消息的函数,其作用是根据所定义的标准,判断是否向邻居节点发送消息,如果满足条件,发送Iterator[(目标节点id,消息)];如果不满足条件,发送Iterator.emptymergeMsg: (A, A) => A) // 消息合并函数,由于图中每一个节点可能有多个邻居与它连接,所以可能每一个节点会接收到多个节点发送来的消息,该函数就是将接收到的多个消息进行合并处理: Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}

pregel底层源码

源码中用到一些函数,了解他们有助于对代码的理解

  • mapReduceTriplets
def mapReduceTriplets[VD: ClassTag, ED: ClassTag, A: ClassTag](g: Graph[VD, ED],mapFunc: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],reduceFunc: (A, A) => A,activeSetOpt: Option[(VertexRDD[_], EdgeDirection)] = None): VertexRDD[A]

该mapReduceTriplets运算符将用户定义的map函数作为输入,并且将map作用到每个triplet,并可以得到triplet上所有的顶点的信息。用户定义的reduce功能将合并所有目标顶点相同的信息。该mapReduceTriplets操作返回VertexRDD [A] ,包含所有以每个顶点作为目标节点集合消息(类型A),没有收到消息的顶点不包含在返回VertexRDD。
activeSetOpt参数指定了哪些和顶点相邻的边包含在map阶段。如果该方向是in,则用户定义的mpa函数将仅仅作用目标顶点在活跃集合中的边。如果方向是 out,则该map函数将仅仅作用在那些源顶点在活跃集中的边。如果方向是either,则map 函数将仅在任一顶点在活跃集中的边。如果方向是 both,则map函数将仅作用在两个顶点都在活跃集中。

object Pregel extends Logging { // 共七个参数,一个图和上面封装的六个参数,返回值是一个graphdef apply[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED],initialMsg: A, //  初始化消息maxIterations: Int = Int.MaxValue, // 最大迭代次数,默认为int类型的最大值activeDirection: EdgeDirection = EdgeDirection.Either) // 边的活跃方向,默认为either,即出度边或入度边都可以(vprog: (VertexId, VD, A) => VD, // 计算节点属性VD和合并后的消息A,生成新的属性 sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)], // 发送消息mergeMsg: (A, A) => A) // 一个节点可能收到多条消息,需要合并消息,若只有一条消息,则不调用该函数: Graph[VD, ED] ={require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +s" but got ${maxIterations}")var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache() // 用初始化消息初次迭代vprog函数,激活所有节点,仅在初始化时使用// compute the messages 根据发送、聚合信息的函数计算下次迭代用的信息var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)var activeMessages = messages.count() // 统计活跃节点的个数// Loop 循环,不断迭代初始化后的图var prevG: Graph[VD, ED] = nullvar i = 0while (activeMessages > 0 && i < maxIterations) { // 迭代终止条件为活跃节点的个数为0或者超过最大迭代次数// Receive the messages and update the vertices.prevG = gg = g.joinVertices(messages)(vprog).cache() // join消息,用vprog产生的值替换原来节点的属性val oldMessages = messages// Send new messages, skipping edges where neither side received a message. We must cache// messages so it can be materialized on the next line, allowing us to uncache the previous// iteration.messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache() // Some((oldMessages, activeDirection))参数的作用是:它使我们在发送新的消息时,会忽略掉那些两端都没有接收到消息的边,减少计算量// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages// and the vertices of g).activeMessages = messages.count() // 从新计算一轮迭代后活跃节点个数logInfo("Pregel finished iteration " + i)// Unpersist the RDDs hidden by newly-materialized RDDs oldMessages.unpersist(blocking = false)  // 旧消息和旧图的释放prevG.unpersistVertices(blocking = false)prevG.edges.unpersist(blocking = false)// count the iterationi += 1}messages.unpersist(blocking = false)g} // end of apply} // end of class Pregel

示例

计算5号顶点到其他节点最短的距离

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSessionobject PregelDemo {def main(args: Array[String]): Unit = {val spark: SparkSession = SparkSession.builder().appName("pregel").master("local[*]").getOrCreate()val sc: SparkContext = spark.sparkContextimport spark.implicits._// 顶点val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val vertices: RDD[(Long, (String, Int))] = sc.makeRDD(vertexArray)// 边val edgeArray = sc.parallelize(Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3)))// 生成graphval graph = Graph(vertices,edgeArray)// 初始化起始顶点idval srcVertexId = 5L// 顶点5到自己的距离为0,其他顶点都设为正无穷大Double.PositiveInfinityval initialGraph: Graph[Double, Int] = graph.mapVertices((vid, vd) => {if (srcVertexId == vid)0elseDouble.PositiveInfinity})// 调用封装好的pregelval pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(Double.PositiveInfinity,  // 初始化信息Int.MaxValue, // 最大迭代次数EdgeDirection.Out // 边的方向,这里定义为出度边方向)((vid:VertexId, vd: Double, disMsg: Double) => {  // vprog函数,此处为取当前节点属性和消息的最小值val minDist = math.min(vd, disMsg)println(s"顶点${vid},属性${vd},收到消息${disMsg},合并后的属性${minDist}") // 打印一下,方便理解minDist // 将vd, disMsg中较小的值当做返回值},(edgeTriplet: EdgeTriplet[Double, PartitionID]) => { // sendMsg函数,只有当源节点的属性值+边的值<目标节点的属性值时才发送消息,否则不发送if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))} else {Iterator.empty}},(msg1: Double, msg2: Double) => math.min(msg1, msg2) // 收到多个消息时,取值最小的)pregelGraph.triplets.collect().foreach(println) // 查看}
}

结果:

//  各个顶点接受初始消息initialMsg
顶点5,属性0.0,收到消息Infinity,合并后的属性0.0
顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity// 第一次迭代
顶点5 给 顶点3 发送消息 8.0
顶点5 给 顶点6 发送消息 3.0
顶点3,属性Infinity,收到消息8.0,合并后的属性8.0
顶点6,属性Infinity,收到消息3.0,合并后的属性3.0// 第二次迭代
顶点3 给 顶点2 发送消息 12.0
顶点2,属性Infinity,收到消息12.0,合并后的属性12.0// 第三次迭代
顶点2 给 顶点4 发送消息 14.0
顶点2 给 顶点1 发送消息 19.0
顶点4,属性Infinity,收到消息14.0,合并后的属性14.0
顶点1,属性Infinity,收到消息19.0,合并后的属性19.0
// 第四次迭代
顶点4 给 顶点1 发送消息 15.0
顶点1,属性19.0,收到消息15.0,合并后的属性15.0
// 第五次迭代 由于条件不满足故退出// 输出结果
((2,12.0),(1,15.0),7) // 含义为5到2的距离最短为12,5到1的距离最短为15,2到1的距离为7
((2,12.0),(4,14.0),2)
((3,8.0),(2,12.0),4)
((3,8.0),(6,3.0),3)
((4,14.0),(1,15.0),1)
((2,12.0),(5,0.0),2)
((5,0.0),(3,8.0),8)
((5,0.0),(6,3.0),3)

Spark GraphX之pregel相关推荐

  1. pregel 与 spark graphX 的 pregel api

    简介 在Hadoop兴起之后,google又发布了三篇研究论文,分别阐述了了Caffeine.Pregel.Dremel三种技术,这三种技术也被成为google的新"三驾马车",其 ...

  2. Spark GraphX算法 - Pregel算法

    1.官网地址 http://spark.apache.org/docs/latest/graphx-programming-guide.html#pregel-api 2.demo样例

  3. Spark GraphX 中的 pregel函数(转载)

    文章目录 pregel函数源码 与 各个参数介绍: 案例: 求顶点5 到 其他各顶点的 最短距离 pregel原理分析 一篇关于 Spark GraphX 中 pregel函数 的笔记,通过一个小案例 ...

  4. 顶会论文阅读-22年CCF A级别spark graphX研究

    这篇文章是从dblp上面自行下载的唐老师发的A类文章,主要讲的是对spark源码当中sparkgraphX模块的优化: incgraph:基于Spark GraphX的分布式增量图计算模型和框架: 原 ...

  5. Spark Graphx Pregel(pregel参数详解,pregel调用实现过程的详细解释)

    Spark Graphx Pregel 一.Pregel概述 1.什么是pregel? 2.pregel应用场景 二.Pregel源码及参数解释 1.源码 2.参数详细解释 (1)initialMsg ...

  6. Spark GraphX Pregel 应用

    一.Pregel介绍 Pregel是一种基于BSP模型实现的并行图处理系统. BSP(Bulk Synchronous Parallel Computing Model,块同步并行计算模型,又称&qu ...

  7. 大数据——GraphX之Pregel算法原理及Spark实现

    GraphX之Pregel算法原理及Spark实现 Pregel 案例:求顶点5到其他各点的最短距离 Pregel原理分析 Pregel 源码 def pregel[A: ClassTag](init ...

  8. Spark GraphX 中的PageRank算法、pregel函数、航班飞行网图分析

    PageRank算法 PageRank算法原理剖析及Spark实现 - 简书 (jianshu.com) import org.apache.spark.SparkContext import org ...

  9. Spark GraphX相关使用方法

    Spark GraphX是一个分布式图处理框架,Spark GraphX基于Spark平台提供对图计算和图挖掘简洁易用的而丰富多彩的接口,极大的方便了大家对分布式图处理的需求.Spark GraphX ...

最新文章

  1. SugarCRM 主表-自定义字段
  2. 计算机组成解疑补漏之SDR、DDR及相关计算
  3. php的toast,使用toast组件实现提示用户忘记输入用户名或密码功能
  4. win10树莓派改ip_Window 10通过网线和Wifi连接树莓派
  5. python默认编码方式_关于设置python默认编码方式的问题
  6. java 反射 配置文件_简单模仿配置文件的反射机制
  7. Codeforces Round #552 (Div. 3)
  8. Java飞机大战项目
  9. openssl 加盐_nodejs-md5加盐到解密比对
  10. 如何将图片压缩到200K以内,有什么好方法吗?
  11. 模块度Q——复杂网络社区划分评价标准
  12. uniapp获取当前经纬度 地图 支持搜索
  13. [CF235C] Cyclical Quest
  14. spring中 @EnableXXX 注解的实现
  15. 【课程设计】8086汇编实现打字小游戏
  16. Android结束进程的几种方法
  17. php操作Word之com组件-获取word文档页码和更新目录
  18. java放3个按钮_java编写三个按钮
  19. JCE cannot authenticate the provider BC问题解决
  20. 如何将曲谱的各音符转换成频率数组和持续时间

热门文章

  1. Apriori算法详解与实现
  2. 学生出勤率平时成绩java_学员考勤、作业规定及平时成绩评定办法
  3. 模糊时间的柔性车间调度问题-Python实现遗传算法求解
  4. 记一些c++的新手入门实战写法
  5. OSChina 周一乱弹 ——妹子知道去哪儿找男友了么
  6. 西门子plc vb和c语言区别,三菱plc 与西门子plc 的区别及优缺点
  7. Antlr4介绍和Helloworld
  8. GitHub 下载慢的终极解决办法,简单到爆
  9. AI笔记整理篇 -程序员35岁中年危机是真的吗?
  10. 今天,阿里巴巴报告厅里只谈快乐的数学