参考博客:https://blog.csdn.net/hanweileilei/article/details/89764466
大佬博客写的很详细,不用继续看这篇了,随便写一些记录一下。

Pregel框架:

Pregel是一种面向图算法的分布式编程框架,采用迭代的计算模型:在每一轮,每个顶点处理上一轮收到的消息,并发出消息给其它顶点,并更新自身状态和拓扑结构(出、入边)等。有意思的是,Pregel 这个名字的来源,是纪念著名的欧拉七桥问题,这七座桥就位于Pregel这条河上。

典型的Pregel计算流程是这样的:读取输入—>初始化图—>初始化完成—>运行一系列超步—>计算结束,其中从全局的角度看,每一个超步都是独立运行的。在以上的过程中,不难想象每一个节点会存在两种属性:活跃Active和非活跃Halt。当某个节点接收到了消息并且消息告知它需要执行计算了,那么该节点会将自己设置为Active状态;反之当某节点没有接收到消息或者“接收到了消息但是并不需要进行计算”,则将自己设置成Halt状态。该机制如下图所示:

Pregel的计算大致是这样一个过程:输入图数据且初始化---->所有节点置为Active且向周围节点SendMessage(包括正向边反向边双向边)----->每个节点接收消息切根据预定义的计算函数对消息进行处理随后更新自身信息或者设置为Inactive(Halt)---->每个活跃节点按照SendMessage函数继续发送消息---->下一个SuperStep开始----->直到所有节点Hale/Inactive结束计算。

下面还是以这幅图为例:下图中共4节点,从左到右依次为节点A/B/C/D。圈中的数字为节点的属性值,实线为节点之间的边(包含方向),虚线是不同超步之间的信息发送,带阴影的圈是不活跃的节点。我们的目的是让图中所有节点的属性值都变成最大的那个属性值。

superstep 0:首先所有节点设置为活跃,并且沿正向边向相邻节点发送自身的属性值(SendMessage)。

Superstep 1:所有节点接收到信息,节点A和节点D发现自己接受到的值比自己的大,所以更新自己的节点(这个过程可以看做是计算),并保持活跃。但是节点B和C没有接收到比自己大的值,所以不计算、不更新。活跃节点继续向相邻节点发送当前自己的属性值。

Superstep 2:节点C接受信息并计算,其它节点没接收到信息或者接收到但是不计算,所以接下来只有节点3活跃并发送消息。

Superstep 3:节点B和D接受到消息但是不计算所以不活跃,所有节点均不活跃,所以计算结束。

可以看出,在pregel计算框架中有两个核心的函数:sendmessage函数和F(Vertex)节点计算函数。

GraphX Pregel源码解析:
参考博客:https://www.jianshu.com/p/d926150542df

package org.apache.spark.graphximport scala.reflect.ClassTagimport org.apache.spark.internal.Logging/*** Implements a Pregel-like bulk-synchronous message-passing API.* --实现了类似Pregel的批量同步消息传递API** Unlike the original Pregel API, the GraphX Pregel API factors the sendMessage computation over* edges, enables the message sending computation to read both vertex attributes, and constrains* messages to the graph structure.  These changes allow for substantially more efficient* distributed execution while also exposing greater flexibility for graph-based computation.* ----与原始的Pregel API不同,GraphX Pregel API会通过边影响sendMessage计算,使* sendMessage计算能够读取顶点属性,并将消息约束(constrains)到图形结构。 这些更改允许基* 本上更有效的分布式执行,同时也为基于图的计算提供了更大的灵活性。** @example We can use the Pregel abstraction to implement PageRank:* ----使用Pregel抽象来实现PageRank的一个例子* {{{* val pagerankGraph: Graph[Double, Double] = graph*   // Associate the degree with each vertex ----将度数与每个顶点相关联*   .outerJoinVertices(graph.outDegrees) {*     (vid, vdata, deg) => deg.getOrElse(0)*   }*   // Set the weight on the edges based on the degree ---根据出度设置边的weight*   .mapTriplets(e => 1.0 / e.srcAttr)*   // Set the vertex attributes to the initial pagerank values---将顶点属性设置为初始pagerank值*   .mapVertices((id, attr) => 1.0)** def vertexProgram(id: VertexId, attr: Double, msgSum: Double): Double =*   resetProb + (1.0 - resetProb) * msgSum* def sendMessage(id: VertexId, edge: EdgeTriplet[Double, Double]): Iterator[(VertexId, Double)] =*   Iterator((edge.dstId, edge.srcAttr * edge.attr))* def messageCombiner(a: Double, b: Double): Double = a + b* val initialMessage = 0.0* // Execute Pregel for a fixed number of iterations.---执行Pregel进行固定次数的迭代。* Pregel(pagerankGraph, initialMessage, numIter)(*   vertexProgram, sendMessage, messageCombiner)* }}}**/
object Pregel extends Logging {/*** Execute a Pregel-like iterative vertex-parallel abstraction.  The* user-defined vertex-program `vprog` is executed in parallel on* each vertex receiving any inbound messages and computing a new* value for the vertex.  The `sendMsg` function is then invoked on* all out-edges and is used to compute an optional message to the* destination vertex. The `mergeMsg` function is a commutative* associative function used to combine messages destined to the* same vertex.* --- 一共三个函数:*     用户定义的顶点函数vprog在接收任何入点消息的每个顶点上并行执行,*     并计算顶点的新值;*     在所有出方向的边上执行sendMsg函数,并用于计算到目标顶点的可选消息;*     mergeMsg是用于组合【发往同一个顶点的消息】的交换关联函数。** On the first iteration all vertices receive the `initialMsg` and* on subsequent iterations if a vertex does not receive a message* then the vertex-program is not invoked.*  --在第一次迭代中,所有的顶点都接收intialMsg消息,在后续迭代中,如果顶点没有接收到消*    息,vprog将不会被执行** This function iterates until there are no remaining messages, or* for `maxIterations` iterations.   ---函数将循环迭代,直到没有剩余消息或者到达设定的最大迭代次数** @tparam VD the vertex data type ---点数据类型* @tparam ED the edge data type ---边数据类型* @tparam A the Pregel message type ---Pregel消息类型** @param graph the input graph.** @param initialMsg the message each vertex will receive at the first* iteration  --首轮初始Msg** @param maxIterations the maximum number of iterations to run for --设定的最大迭代次数** @param activeDirection the direction of edges incident to a vertex that received a message in* the previous round on which to run `sendMsg`. For example, if this is `EdgeDirection.Out`, only* out-edges of vertices that received a message in the previous round will run. The default is* `EdgeDirection.Either`, which will run `sendMsg` on edges where either side received a message* in the previous round. If this is `EdgeDirection.Both`, `sendMsg` will only run on edges where* *both* vertices received a message. * ---上一轮接收到消息的顶点所关联的边,将沿着边的方向执行sendMsg** @param vprog the user-defined vertex program which runs on each* vertex and receives the inbound message and computes a new vertex* value.  On the first iteration the vertex program is invoked on* all vertices and is passed the default message.  On subsequent* iterations the vertex program is only invoked on those vertices* that receive messages.* * ---用户定义的顶点程序,其在每个顶点上运行并接收入站消息并计算新的顶点值。 *    在第一次迭代中,顶点程序在所有顶点上被调用并被传递给默认消息。*    在后续迭代中,顶点程序仅在接收消息的顶点上被调用。* @param sendMsg a user supplied function that is applied to out* edges of vertices that received messages in the current* iteration* ---用户定义的函数,应用于当前迭代中接收到消息的定点所关联的out方向的边** @param mergeMsg a user supplied function that takes two incoming* messages of type A and merges them into a single message of type* A.  ''This function must be commutative and associative and* ideally the size of A should not increase.''* ---一个用户提供的函数,它接收两个类型为A的输入消息,并将它们合并成一个类型为A的单个*     消息。“该函数必须是可交换和关联的,理想情况下A的大小不应该增加" ,我理解成消息*     总是被合并的意思。** @return the resulting graph at the end of the computation  ---计算结束返回的结果**/def apply[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED],initialMsg: A,maxIterations: Int = Int.MaxValue,activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],mergeMsg: (A, A) => A): Graph[VD, ED] ={require(maxIterations > 0, s"Maximum number of iterations must be greater than 0," +s" but got ${maxIterations}") ---require() 方法用在对参数的检验上,不通过则抛出 IllegalArgumentExceptionvar g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()// compute the messagesvar messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg)var activeMessages = messages.count()// Loopvar prevG: Graph[VD, ED] = nullvar i = 0while (activeMessages > 0 && i < maxIterations) {// Receive the messages and update the vertices. 接收消息更新节点信息prevG = gg = g.joinVertices(messages)(vprog).cache()val oldMessages = messages// Send new messages, skipping edges where neither side received a message. We must cache// messages so it can be materialized on the next line, allowing us to uncache the previous// iteration. 发送新消息,跳过未接收到消息的边,必须cache消息以便在下一次循环使用messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()// The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages// (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages// and the vertices of g).activeMessages = messages.count()--count()方法的调用实质化(执行了)messages和'g'图的点,隐藏了老的(上一轮的)Message和点logInfo("Pregel finished iteration " + i)// Unpersist the RDDs hidden by newly-materialized RDDs --对上一轮的消息和图反持久化oldMessages.unpersist(blocking = false)prevG.unpersistVertices(blocking = false)prevG.edges.unpersist(blocking = false)// count the iterationi += 1}messages.unpersist(blocking = false)g} // end of apply} // end of class Pregel

可以看到,pregel图算法要求传入6个参数:

参数 说明
initialMsg 图初始化的时候,开始模型计算的时候,所有节点都会先收到一个消息
maxIterations 最大迭代次数
activeDirection 规定了发送消息的方向
vprog 节点调用该消息将聚合后的数据和本节点进行属性的合并
sendMsg 激活态的节点调用该方法发送消息
mergeMsg 如果一个节点接收到多条消息,先用mergeMsg 来将多条消息聚合成为一条消息,如果节点只收到一条消息,则不调用该函数

案例:求顶点5 到 其他各顶点的 最短距离
在理解案例之前,首先要清楚关于 顶点 的两点知识:
顶点的状态有两种:
(1)、钝化态【类似于休眠,不做任何事】
(2)、激活态【干活】
顶点能够处于激活态需要有条件:
(1)、成功收到消息 或者
(2)、成功发送了任何一条消息

使用pregle算法计算,顶点5到各个顶点的最短距离:

第一步:创建顶点vertexRdd (VertexId,顶点属性)

// 创建顶点vertexRdd (VertexId,顶点属性)
val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50))
)
val vertexRDD: RDD[(VertexId, (String, Int))] = sc.makeRDD(vertexArray)

第二步:创建边edgeRDD,Edge(srcVertexId,dstVertexId,边的属性) 在案例中边的属性代表 相邻两个顶点之间的距离

val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3)
)
val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)

第三步:创建图(使用apply的方式创建) 注意:import org.apache.spark.graphx._

// 创建图(使用apply的方式创建) 注意:import org.apache.spark.graphx._
val graph = Graph.apply(vertexRDD, edgeRDD)

第四步:设置初始顶点,以及对原graph改变顶点的属性值

// 以顶点5为初始顶点,计算初始顶点到各个顶点的最短据离
val srcVertexId = 5L
// 初始顶点的初始值是0,其余顶点为正无穷
//mapVertices对graph的顶点信息改变属性值
val initialGraph: Graph[Double, PartitionID] = graph.mapVertices {case (id, (name, age)) => {if (id == srcVertexId)0.0elseDouble.PositiveInfinity}
}

补充:mapVertices方法,(VertexId, VD) => VD2 VertexId不变,改变顶点的属性值

/*** Transforms each vertex attribute in the graph using the map function.** @note The new graph has the same structure.  As a consequence the underlying index structures* can be reused.** @param map the function from a vertex object to a new vertex value** @tparam VD2 the new vertex data type** @example We might use this operation to change the vertex values* from one type to another to initialize an algorithm.* {{{* val rawGraph: Graph[(), ()] = Graph.textFile("hdfs://file")* val root = 42* var bfsGraph = rawGraph.mapVertices[Int]((vid, data) => if (vid == root) 0 else Math.MaxValue)* }}}**/
def mapVertices[VD2: ClassTag](map: (VertexId, VD) => VD2)(implicit eq: VD =:= VD2 = null): Graph[VD2, ED]

打印原graph信息和新生成的graph信息,直观感受一下,顶点属性发生了变化:

//原graph的triplets信息
graph.triplets.collect().foreach(println)
/*((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((3,(Charlie,65)),(2,(Bob,27)),4)
((3,(Charlie,65)),(6,(Fran,50)),3)
((4,(David,42)),(1,(Alice,28)),1)
((2,(Bob,27)),(5,(Ed,55)),2)
((5,(Ed,55)),(3,(Charlie,65)),8)
((5,(Ed,55)),(6,(Fran,50)),3)*/
println("--------------")
//新graph的triplets信息,改变了原graph的属性值,变成了只有距离信息。
initialGraph.triplets.collect().foreach(println)
/*((2,Infinity),(1,Infinity),7)
((2,Infinity),(4,Infinity),2)
((3,Infinity),(2,Infinity),4)
((3,Infinity),(6,Infinity),3)
((4,Infinity),(1,Infinity),1)
((2,Infinity),(5,0.0),2)
((5,0.0),(3,Infinity),8)
((5,0.0),(6,Infinity),3)*/
println("--------------")

第五步:实现图算法pregel

val pregelGraph: Graph[Double, PartitionID] = initialGraph.pregel(//initialMsg: A,//每个顶点在第一次迭代时将收到的信息Double.PositiveInfinity,//maxIterations: Int = Int.MaxValue//迭代运行的最大数量Int.MaxValue,//activeDirection: EdgeDirection = EdgeDirection.Either//上一轮接收到消息的顶点所关联的边,将沿着边的方向执行sendMsgEdgeDirection.Out
)(//vprog: (VertexId, VD, A) => VD//用户定义的顶点程序,其在每个顶点上运行并接收入站消息并计算新的顶点值。在第一次迭代中,顶点程序在所有顶点上被调用并被传递给默认消息。在后续迭代中,顶点程序仅在接收消息的顶点上被调用。(vid: VertexId, vd: Double, disMsg: Double) => {val minDist: Double = math.min(vd, disMsg)println(s"顶点${vid},属性${vd},收到消息${disMsg},合并后的属性${minDist}")minDist},//sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)]//用户定义的函数,应用于当前迭代中接收到消息的定点所关联的out方向的边(edgeTriplet) => {if (edgeTriplet.srcAttr + edgeTriplet.attr < edgeTriplet.dstAttr) {println(s"顶点${edgeTriplet.srcId} 给顶点${edgeTriplet.dstId} 发送消息 ${edgeTriplet.srcAttr + edgeTriplet.attr}")Iterator.apply((edgeTriplet.dstId, edgeTriplet.srcAttr + edgeTriplet.attr))} else {Iterator.empty}},//mergeMsg: (A, A) => A //一个用户提供的函数,它接收两个类型为A的输入消息,并将它们合并成一个类型为A的单个消息。“该函数必须是可交换和关联的,理想情况下A的大小不应该增加" ,我理解成消息总是被合并的意思。(msg1: Double, msg2: Double) => {math.min(msg1, msg2)}
)

输出结果:

顶点6,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点3,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点5,属性0.0,     收到消息Infinity,合并后的属性0.0
顶点1,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点4,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点2,属性Infinity,收到消息Infinity,合并后的属性Infinity
顶点5 给顶点3 发送消息 8.0
顶点5 给顶点6 发送消息 3.0
顶点3,属性Infinity,收到消息5.0,合并后的属性5.0
顶点6,属性Infinity,收到消息5.0,合并后的属性5.0
顶点3 给顶点2 发送消息 9.0
顶点2,属性Infinity,收到消息3.0,合并后的属性3.0
顶点2 给顶点4 发送消息 5.0
顶点2 给顶点1 发送消息 10.0
顶点1,属性Infinity,收到消息2.0,合并后的属性2.0
顶点4,属性Infinity,收到消息2.0,合并后的属性2.0
((2,3.0),(1,2.0),7)
((2,3.0),(4,2.0),2)
((3,5.0),(2,3.0),4)
((3,5.0),(6,5.0),3)
((4,2.0),(1,2.0),1)
((2,3.0),(5,0.0),2)
((5,0.0),(3,5.0),8)
((5,0.0),(6,5.0),3)

在案例中,具体发生了什么:
当调用pregel方法开始:
首先,所有顶点都将接收到一条初始消息initialMsg ,使所有顶点都处于激活态(红色标识的节点)。

第一次迭代开始:

所有顶点以EdgeDirection.Out的边方向调用sendMsg方法发送消息给目标顶点,如果 源顶点的属性+边的属性<目标顶点的属性,则发送消息。否则不发送。 发送成功的只有两条边:
5—>3(0+8<Double.Infinity , 成功)
5—>6(0+3<Double.Infinity , 成功)
3—>2(Double.Infinity+4>Double.Infinity , 失败)
3—>6(Double.Infinity+3>Double.Infinity , 失败)
2—>1(Double.Infinity+7>Double.Infinity , 失败)
2—>4(Double.Infinity+2>Double.Infinity , 失败)
2—>5(Double.Infinity+2>Double.Infinity , 失败)
4—>1(Double.Infinity+1>Double.Infinity , 失败)。

sendMsg方法执行完成之后,根据顶点处于激活态的条件,顶点5 成功地分别给顶点3 和 顶点6 发送了消息,顶点3 和 顶点6 也成功地接受到了消息。所以 此时只有5,3,6 三个顶点处于激活态,其他顶点全部钝化。然后收到消息的顶点3和顶点6都调用vprog方法,将收到的消息 与 自身的属性合并。如下图2所示。到此第一次迭代结束。

第二次迭代开始: 顶点3 给 顶点6 发送消息失败,顶点3 给 顶点2 发送消息成功,此时 顶点3 成功发送消息,顶点2 成功接收消息,所以顶点2 和 顶点3 都成为激活状态,其他顶点都成为钝化状态。然后顶点2 调用vprog方法,将收到的消息 与 自身的属性合并。 见图3. 至此第二次迭代结束。

第三次迭代开始: 顶点3分别发送消息给顶点2失败 和 顶点6失败,顶点2 分别发消息给 顶点1成功、顶点4成功、顶点5失败 ,所以 顶点2、顶点1、顶点4 成为激活状态,其他顶点为钝化状态。顶点1 和 顶点4分别调用vprog方法,将收到的消息 与 自身的属性合并。见图4。至此第三次迭代结束

第四次迭代开始: 顶点2 分别发送消息给 顶点1失败 和 顶点4失败。顶点4 给 顶点1发送消息成功,顶点1 和 顶点4 进入激活状态,其他顶点进入钝化状态。顶点1 调用vprog方法,将收到的消息 与 自身的属性合并 。见图5

第五次迭代开始: 顶点4 再给 顶点1发送消息失败,顶点4 和 顶点1 进入钝化状态,此时全图都进入钝化状态。至此结束,见图6.

项目实战—航班飞行网图分析
Spark GraphX API
Spark GraphX PageRank
Spark GraphX Pregel
需求概述
探索航班飞行网图数据
构建航班飞行网图
使用Spark GraphX完成下列任务
统计航班飞行网图中机场的数量
统计航班飞行网图中航线的数量
计算最长的飞行航线(Point to Point)
找出最繁忙的机场
找出最重要的飞行航线(PageRank)
找出最便宜的飞行航线(SSSP)
问题分析1:数据探索
下载数据(USA Flight Dataset)
https://drive.google.com/file/d/0B7Yoht-ttAeuaWdGZkRsSkVkN00/view
数据格式
文件格式为CSV,字段之间分隔符为“,”
依次为:#日、周#、航空公司、飞机注册号、航班号、起飞机场编号、起飞机场、到达机场编号、到达机场、预计起飞时间(时分)、起飞时间、起飞延迟(分钟)、到达预计时间、到达时间、到达延迟(分钟)、预计飞行时间、飞行距离
问题分析2:构建航班飞行网图
创建属性图Graph[VD,ED]
装载CSV为RDD,每个机场作为顶点。关键字段:起飞机场编号、起飞机场、到达机场编号、到达机场、飞行距离
初始化顶点集airports:RDD[(VertexId,String)],顶点属性为机场名称
初始化边集lines:RDD[Edge],边属性为飞行距离
问题分析3:统计航班飞行网图中机场与航线的数量
机场数量
求顶点个数:Graph.numVertices
航线数量
求边的个数:Graph.numEdges
问题分析4:计算最长的飞行航线
最大的边属性
对triplets按飞行距离排序(降序)并取第一个
graph.triplets.sortBy(_.attr,ascending=false)
问题分析5:找出最繁忙的机场
哪个机场到达航班最多
计算顶点的入度并排序
问题分析6:找出最重要的飞行航线
PageRank
收敛误差:0.05
问题分析7:找出最便宜的飞行航线
定价模型
price = 180.0 + distance * 0.15
SSSP问题
从初始指定的源点到达任意点的最短距离
pregel
初始化源点(0)与其它顶点(Double.PositiveInfinity)
初始消息(Double.PositiveInfinity)
vprog函数计算最小值
sendMsg函数计算进行是否下一个迭代
mergeMsg函数合并接受的消息,取最小值
项目总结
Spark GraphX API
vertices、edges、triplets、
numEdges、numVertices
inDegrees、outDegrees、degrees
mapVertices、mapEdges、mapTriplets
Spark GraphX PageRank
Spark GraphX Pregel

数据示例:

月中第几天,周中第几天,航空公司,飞机注册号,航班号,起飞机场编号,起飞机场,到达机场编号,到达机场,预计起飞时间,起飞时间,起飞延迟,预计到达时间,到达时间,到达延迟,预计飞行时间,飞行距离
1,3,AA,N338AA,1,12478,JFK,12892,LAX,900,914,14,1225,1238,13,385,2475
2,4,AA,N338AA,1,12478,JFK,12892,LAX,900,857,0,1225,1226,1,385,2475
4,6,AA,N327AA,1,12478,JFK,12892,LAX,900,1005,65,1225,1324,59,385,2475
5,7,AA,N323AA,1,12478,JFK,12892,LAX,900,1050,110,1225,1415,110,385,2475
6,1,AA,N319AA,1,12478,JFK,12892,LAX,900,917,17,1225,1217,0,385,2475
7,2,AA,N328AA,1,12478,JFK,12892,LAX,900,910,10,1225,1212,0,385,2475
8,3,AA,N323AA,1,12478,JFK,12892,LAX,900,923,23,1225,1215,0,385,2475
9,4,AA,N339AA,1,12478,JFK,12892,LAX,900,859,0,1225,1204,0,385,2475
10,5,AA,N319AA,1,12478,JFK,12892,LAX,900,929,29,1225,1245,20,385,2475
11,6,AA,N328AA,1,12478,JFK,12892,LAX,900,915,15,1225,1344,79,385,2475
12,7,AA,N338AA,1,12478,JFK,12892,LAX,900,854,0,1225,1208,0,385,2475
13,1,AA,N332AA,1,12478,JFK,12892,LAX,900,855,0,1225,1152,0,385,2475
14,2,AA,N339AA,1,12478,JFK,12892,LAX,900,853,0,1225,1144,0,385,2475
15,3,AA,N786AA,1,12478,JFK,12892,LAX,900,851,0,1225,1201,0,385,2475
16,4,AA,N783AA,1,12478,JFK,12892,LAX,900,856,0,1225,1216,0,385,2475
17,5,AA,N783AA,1,12478,JFK,12892,LAX,900,855,0,1225,1154,0,385,2475
18,6,AA,N783AA,1,12478,JFK,12892,LAX,900,854,0,1225,1156,0,385,2475
19,7,AA,N786AA,1,12478,JFK,12892,LAX,900,858,0,1225,1152,0,385,2475
20,1,AA,N783AA,1,12478,JFK,12892,LAX,900,855,0,1225,1209,0,385,2475

scala代码:

/*** 探索航班飞行网图数据* 构建航班飞行网图* 使用Spark GraphX完成下列任务* (1)统计航班飞行网图中机场的数量* (2)统计航班飞行网图中航线的数量* (3)计算最长的飞行航线(Point to Point)* (4)找出最繁忙的机场* (5)找出最重要的飞行航线(PageRank)* (6)找出最便宜的飞行航线(SSSP)*/
object GraphXFlights {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark: SparkSession = SparkSession.builder().appName(this.getClass.getName).master("local[4]").getOrCreate()// 创建SparkContextval sc: SparkContext = spark.sparkContext// 数据加载val lines: RDD[String] = sc.textFile("file:///E:\\ideaProjects\\SparkLearn\\data\\spark\\graphx\\USA Flight Datset - Spark Tutorial - Edureka.csv").repartition(1)// 获取首行val firstline: String = lines.first()// 去掉表头, 思路: 和首行一样的, 就去掉val flights: RDD[Array[String]] = lines.filter(_ != firstline).map(_.split(","))// 通过机场构建点集合val airports: RDD[(Long, String)] = flights.flatMap(x => Array((x(5).toLong, x(6)), (x(7).toLong, x(8)))).distinct()// 通过航线构建边集合val airlines: RDD[Edge[Int]] = flights.map(x => (x(5).toLong, x(7).toLong, x(16).toInt)).distinct().map(x => Edge(x._1, x._2, x._3))// 通过飞机场和航线构建图结构val graph = Graph(airports, airlines, "unknow")// (1)统计航班飞行网图中机场的数量val numAirports: Long = graph.numVertices// (2)统计航班飞行网图中航线的数量val numRoutes: Long = graph.numEdges// (3)计算最长的飞行航线(Point to Point)graph.triplets.sortBy(_.attr, false).take(1)// (4)找出最繁忙的机场graph.inDegrees.sortBy(_._2, false).take(1)graph.outDegrees.sortBy(_._2, false).take(1)// (5)找出最重要的飞行航线(PageRank)graph.pageRank(0.05).vertices.takeOrdered(3)(Ordering.by(_._2))// (6)找出最便宜的飞行航线(SSSP)graph.mapEdges(e => 180.0 + e.attr * 0.15)// 赋给源点一个初值, 0.0 其他点赋无穷大.mapVertices((id, attr) => if (id == 12478) 0.0 else Double.PositiveInfinity).pregel(Double.PositiveInfinity)(// 接收消息时的处理函数(id, dist, new_dist) => math.min(dist, new_dist),// 发送消息的处理函数triplet => {// 如果当前点的价格+边长的价格<目标点已有的价格就发送消息[当前点的价格+边长的价格]// 否则就不发送消息if (triplet.srcAttr + triplet.attr < triplet.dstAttr)Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))else  Iterator.empty},// 合并消息的函数, 求最小值(a,b)=>math.min(a,b)// 取价格最便宜的前三个遍历).vertices.takeOrdered(3)(Ordering.by(_.attr)).foreach(println)}
}

2020.11.26课堂笔记(sparkGraphx算法之pregel)相关推荐

  1. 2020.11.10课堂笔记(Apache Spark 分布式计算原理)

    Spark WordCount运行原理 一.RDD的依赖关系 为什么需要划分Stage 数据本地化- 移动计算,而不是移动数据 保证一个Stage内不会发生数据移动 Lineage:血统.遗传 RDD ...

  2. 2020.8.4课堂笔记(IO 缓冲流,对象流)

    课前复习: 1.什么是io流,如何分类 流向: 输入:InputStream,Reader 输出:OutputStream,Writer 按读取的单元: 字节流:InputStream,OutputS ...

  3. 2020.10.20课堂笔记(java8新特性 lambda表达式)

    一.什么是Lambda? 我们知道,对于一个Java变量,我们可以赋给其一个"值". 如果你想把"一块代码"赋给一个Java变量,应该怎么做呢? 比如,我想把右 ...

  4. 2020.4.22课堂笔记(继承、多态、抽象类、抽象方法)

    在继承的情况下,子类的方法满足以下条件: 1.方法名相同 2.参数列表相同 3.返回值类型相同,或者是父类返回值类型的子类 4.访问修饰符的权限不能小于父类方法 5.抛出的异常不能大于父类(严于) 多 ...

  5. 2020.8.5课堂笔记(多线程)

    线程的创建方式: 继承Thread类: 实现Runnable接口 实现Callable接口 1.创建线程有哪几种方式 2.start()和run()方法的区别 3.什么是线程安全,通过什么方式实现线程 ...

  6. PythonPyqt5项目开发完成后如何使用pyinstaller打包——以Pycharm编辑器为例(目前为止最正确的版本,成功打包日期为2020.11.26)

    (请先看置顶博文)https://blog.csdn.net/GenuineMonster/article/details/104495419 最近用Python开发了一个可视化界面,开发过程如鱼得水 ...

  7. 11.11计算机网络课堂笔记

    传输层 TCP 流量控制 读取的慢,发送过来的快,需要协调发送方和接收方的速率,防止缓冲区溢出导致丢包 拥塞控制 太多的数据源发送了太多的数据 当网络的发送和接收速率接近带宽上限时,流量强度接近1,排 ...

  8. 失望、难过、挫败 2020.11.26日记

    生活日记 今天和连下了多天雨的天气一样,很丧. 下午李老师的课结课了,以后周四就没课了.我们小组展示的还算马马虎虎吧. 学习日记 今天是非常挫败的一天,花了一个晚上看链表的一道题目还是没写出来,数据结 ...

  9. 杜国光博士,基于视觉的机器人抓取--物体定位,位姿估计到抓取估计课堂笔记

    基于视觉的机器人抓取--物体定位,位姿估计到抓取估计课堂笔记 杜国光博士在智东西公开课上讲了<基于视觉的机器人抓取--物体定位,位姿估计到抓取估计>的精彩课程 满满的干货,记下来,后面慢慢 ...

最新文章

  1. [转] 前后端分离之JWT用户认证
  2. linux构建web主机
  3. [NOTE] XVWA靶场练习笔记
  4. JAVA_list总结
  5. js如何上传大文件到服务器,js将文件上传到远程服务器
  6. GoogLeNet的心路历程(三)
  7. windows 安装mysql的时候最后执行一直停留在Write configuration file
  8. 精选10款超酷的HTML5/CSS3菜单
  9. hiredis中异步的实现小结
  10. JAVA:实现crc校验算法(附完整源码)
  11. 域名 ip 校验正则表达式
  12. ADNI静息态功能核磁共振成像数据预处理总流程
  13. Coursera machine learning week 6 excise
  14. android手势_您可能不知道的七个Android手势
  15. Armstrong 一个n位数等于各个位数n次方之和
  16. MySQL数据逻辑备份
  17. 雷电2接口_雷电3和Type-C接口一样吗?差别很大
  18. 外企面试,哪有你想象的那么难!(已收埃森哲、NTTDATA等8家外企offer)
  19. QQ群78928780记录整理:90523花絮-部分
  20. Allegro PCB 图纸大小及坐标原点位置

热门文章

  1. Input Events(输入事件)
  2. 无限制免费版,完美您的WEB应用 PAZU WEB打印控件
  3. Apollo星火计划学习笔记——Apollo决策规划技术详解及实现(以交通灯场景检测为例)
  4. Android毕业设计-微圈
  5. 北邮计算机博士读几年,[扒你一褂]看了北邮博士自杀遗书,请各个学校的在读博士、硕士晒晒每个月的收入,再说...
  6. 自旋锁和互斥锁的区别
  7. 三菱FX5U与西门子S7-1200通过简单CPU通信设置实现以太网通信功能的具体方法步骤
  8. 单张人像生成视频!中国团队提出FaceAnime:最新3D人脸视频生成模型
  9. 生病了,在家养病,真是无聊!
  10. 汇编语言(十五)校汇编语言程序设计竞赛总结——双窗口显示