GraphX之Pregel算法原理及Spark实现

  • Pregel
  • 案例:求顶点5到其他各点的最短距离
  • Pregel原理分析

Pregel

  • 源码
  def pregel[A: ClassTag](initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] = {Pregel(graph, initialMsg, maxIterations, activeDirection)(vprog, sendMsg, mergeMsg)}
参数 说明
initialMsg 图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息
maxlterations 最大迭代次数
activeDirection 规定了发送消息的方向
vprog 节点调用该消息将聚合后的数据和本节点进行属性的合并
sendMsg 激活态的节点调用该方法发送消息
mergeMsg 如果一个节点接收到多条消息,先用mergeMsg来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数

案例:求顶点5到其他各点的最短距离

  • 顶点的状态有两种:

    • (1)钝化态【类似于休眠,不做任何事情】
    • (2)激活态【干活】
  • 顶点能够处于激活态需要有的条件
    • (1)成功收到消息或者发送了任何一条消息
  • 代码展示
package nj.zb.kb09.suanfa
import org.apache.spark.SparkContext
import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
object PregelDemo2 {def main(args: Array[String]): Unit = {//创建SparkContextval spark: SparkSession = SparkSession.builder().master("local[*]").appName("PregelDemo2").getOrCreate()val sc: SparkContext = spark.sparkContext//创建顶点val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val vertexRDD: RDD[(VertexId, (String,Int))] = sc.makeRDD(vertexArray)//创建边,边的属性代表 相邻两个顶点之间的距离val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)//创建图val graph1 = Graph(vertexRDD,edgeRDD)/* ************************** 使用pregle算法计算 ,顶点5 到 各个顶点的最短距离 ************************** *///被计算的图中 起始顶点idval srcVertexId=5L//给各个顶点的属性初始化,顶点5到自己距离为0,到其他顶点都是无穷大val initialGraph: Graph[Double, PartitionID] = graph1.mapVertices {case (vid, (name, age)) => if (vid == srcVertexId) {0.0} else {Double.PositiveInfinity}}//调用pregelval pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(Double.PositiveInfinity,Int.MaxValue,EdgeDirection.Out)((vid: VertexId, vd: Double, distMsg: Double) => {val minDist: Double = math.min(vd, distMsg)println(s"顶点${vid},属性${vd},收到消息${distMsg},合并后的属性${minDist}")minDist},(edgeTriplet: EdgeTriplet[Double, PartitionID]) => {if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {println(s"顶点${edgeTriplet.srcId} 给 顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator[(VertexId, Double)]((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))} else {Iterator.empty}},(msg1: Double, msg2: Double) => math.min(msg1, msg2))//输出结果pregelGraph.triplets.collect.foreach(println)}
}

结果展示:

//-----------------各个顶点接受初始消息initialMsg---------------------
顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点5,属性0.0,收到消息Infinity,合并后的属性0.0
顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
//-------------------------第一次迭代---------------------------------
顶点5 给 顶点3 发送消息 8.0
顶点5 给 顶点6 发送消息 3.0
顶点3,属性Infinity,收到消息8.0,合并后的属性8.0
顶点6,属性Infinity,收到消息3.0,合并后的属性3.0
//-------------------------第二次迭代---------------------------------
顶点3 给 顶点2 发送消息 12.0
顶点2,属性Infinity,收到消息12.0,合并后的属性12.0
//-------------------------第三次迭代---------------------------------
顶点2 给 顶点4 发送消息 14.0
顶点2 给 顶点1 发送消息 19.0
顶点1,属性Infinity,收到消息19.0,合并后的属性19.0
顶点4,属性Infinity,收到消息14.0,合并后的属性14.0
//-------------------------第四次迭代---------------------------------
顶点4 给 顶点1 发送消息 15.0
顶点1,属性19.0,收到消息15.0,合并后的属性15.0
//------------=-------第五次迭代不用发送消息---------------------------
((2,12.0),(1,15.0),7)
((2,12.0),(4,14.0),2)
((3,8.0),(2,12.0),4)
((3,8.0),(6,3.0),3)
((4,14.0),(1,15.0),1)
((2,12.0),(5,0.0),2)
((5,0.0),(3,8.0),8)
((5,0.0),(6,3.0),3)

Pregel原理分析

调用pregel方法之前,先把图的各个顶点的属性初始化,如下图所示:顶点5到自己的距离为0,所以设为0,其他顶点都设为正无穷大Double.PositiveInfinity。

  • 当调用pregel方法开始

首先,所有顶点都将接收到一条初始消息initialMsg,使所有顶点都处于激活态(红色标识的节点)。

  • 第一次迭代开始

所有顶点以EdgeDirection.Out的边方法调用sendMsg方法发送消息给目标顶点,如果源顶点的属性+边的属性<目标顶点的属性,则发送消息,否则不发送。
发送成功的只有两条边:

5—>3(0+8<Double.Infinity , 成功),
5—>6(0+3<Double.Infinity , 成功)
3—>2(Double.Infinity+4>Double.Infinity , 失败)
3—>6(Double.Infinity+3>Double.Infinity , 失败)
2—>1(Double.Infinity+7>Double.Infinity , 失败)
2—>4(Double.Infinity+2>Double.Infinity , 失败)
2—>5(Double.Infinity+2>Double.Infinity , 失败)
4—>1(Double.Infinity+1>Double.Infinity , 失败)

sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5成功地分别给顶点3和顶点6发送了消息,顶点3和顶点6页成功的接收到了消息。所以,此时只有5、3、6三个顶点处于激活态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将受到的消息与自身的属性合并。如下图所示,到此第一次迭代结束。

  • 第二次迭代开始

顶点3给顶点6发送消息失败,顶点3给顶点2发送消息成功,此时,顶点3成功发送消息,顶点2成功接收消息,所以顶点2和顶点3都成为激活态。其他顶点都成为钝化态。然后顶点2调用vprog方法,将收到的消息与自身的属性合并。如下图所示,到此第二次迭代结束。

  • 第三次迭代开始

顶点3分别发送消息给顶点2失败和顶点6失败,顶点2分别发消息给顶点1成功、顶点4成功和顶点5失败,所以顶点2、顶点1、顶点4称为激活态,其他顶点为钝化态,顶点1和顶点4分别调用vprog方法,将收到的消息与自身的属性合并。如下图所示,到此第三次迭代结束。

  • 第四次迭代开始

顶点2分别发送消息给顶点1失败和顶点4失败。顶点4给顶点1发送消息成功,顶点1和顶点4进入激活态,其他顶点进入钝化态。顶点1调用vprog方法,将收到的消息与自身的属性合并。如下图所示,到此第四次迭代结束。

  • 第五次迭代开始

顶点4再给顶点1发送消息失败,顶点4和顶点1进入钝化态,此时全部顶点都进入钝化态,到此结束。

大数据——GraphX之Pregel算法原理及Spark实现相关推荐

  1. 大数据相关技术和算法

    大数据的关键技术: 大数据的关键技术分为分析技术和处理技术,可用于大数据分析的关键技术主要包括A/B测试,关联规则挖掘,数据挖掘,集成学习,遗传算法,机器学习,自然语言处理,模式识别,预测模型,信号处 ...

  2. 字王:大数据与黑天鹅算法2.0

    字王:大数据与黑天鹅算法2.0 wiki百科:"黑天鹅"隐喻那些意外事件:它们极为罕见,在通常的预期之外. 如果一种理论.模型和算法,能够在一年内,捕获一只黑天鹅,无疑是成功的.科 ...

  3. 大数据的几大经典算法

    大数据的几大经典算法 一. CART: 分类与回归树 CART, Classification and Regression Trees.在分类树下面有两个关键的思想:第一个 是关于递归地划分自变量空 ...

  4. 大数据算法_大数据时代,机器学习算法该如何升级?

    文 /杨晓宁 随着产业界数据量的爆炸式增长,大数据概念受到越来越多的关注.由于大数据的海量.复杂多样.变化快的特性,对于大数据环境下的应用问题,传统的小数据上的机器学习算法很多已不再适用.因此,研究大 ...

  5. 大数据系列之数据仓库Hive原理

    Hive系列博文,持续更新~~~ 大数据系列之数据仓库Hive原理 大数据系列之数据仓库Hive安装 大数据系列之数据仓库Hive中分区Partition如何使用 大数据系列之数据仓库Hive命令使用 ...

  6. Spark商业案例与性能调优实战100课》第20课:大数据性能调优的本质和Spark性能调优要点分析

    Spark商业案例与性能调优实战100课>第20课:大数据性能调优的本质和Spark性能调优要点分析 基于本元想办法,大智若愚,大巧若拙!深入彻底的学习spark技术内核!

  7. 【大数据入门】Hadoop技术原理与应用之基于Hadoop的数据仓库Hive

    基于Hadoop的数据仓库Hive 文章目录 基于Hadoop的数据仓库Hive @[toc] 6.1 概述 6.1.1 数据仓库概念 6.1.2 传统数据仓库面临的挑战 6.1.3 Hive简介 6 ...

  8. 【用户画像】大数据之用户画像的原理、应用与实现

    什么是用户画像 用户画像:通过各个维度对用户或者产品特征属性的刻画,并对这些特征分析统计挖掘潜在价值信息.完美地抽象出一个用户的信息全貌,可以看作企业应用大数据的根基.用户画像使用标签来量化用户特征属 ...

  9. cdh mysql sqoop 驱动_大数据技术之Sqoop学习——原理、安装、使用案例、常用命令...

    第1章 Sqoop 简介 Sqoop 是一款开源的工具,主要用于在 Hadoop(Hive) 与传统的数据库 (mysql,postgresql,...) 间进行数据的高校传递,可以将一个关系型数据库 ...

最新文章

  1. 数据分析系列:绘制折线图(matplotlib)2
  2. python 珠玑妙算
  3. MFC 对话框Picture Control(图片控件)中静态和动态显示Bmp图片
  4. java炸弹游戏_java实现数字炸弹
  5. PHP脚本占用内存太多,解决方案
  6. php数组foreach循环添加键值对_在PHP的foreach循环中插入一个$key作为变量
  7. 【kali】kali环境下安装dvwa
  8. VB.NET 网络通讯示例(服务端)
  9. 2019年创新中国网课答案
  10. vscode 力扣插件
  11. MySQL学习笔记(二)
  12. 我个人总结的Halcon内存管理心得笔记,关于C#/C++内存释放
  13. 6 errors and 0 warnings potentially fixable with the`--fix` option
  14. 屏幕进入省电模式计算机未输出,戴尔 U2719DC 显示器使用与故障处理指南
  15. 看机器学习如何还原图像色彩
  16. win10 修复打印机服务器,Windows Update修复了打印机错误(win10/win7)
  17. 卡普的21个NP完全问题-问题描述
  18. 我在做售前-5.如何应聘售前
  19. python中去除全角空格
  20. 如何check tcpdump 和pcap log以及 DPL logging

热门文章

  1. wget -O- 命令解释
  2. Python中的下划线、双下划线
  3. android擦动画,Android中动画的使用
  4. 树莓派控制ZD-8731两相步进电机驱动器
  5. 笔记本清灰后组装后出现蓝屏,并不断的循环重新启动。
  6. 三分钟入门大数据之什么是用户画像?
  7. SQL语句的书写顺序和执行顺序
  8. 【githubdailyshare】微软最近在 GitHub 上开源了一个 AI 音乐项目,基于深度学习,可自动完成音乐创作
  9. 《Java之面向对象:下》
  10. 铁肩担责任 淬炼造奇迹——专访鸿策集团董事长陈圆道