目录

  • 1、为什么需要图计算
  • 2、图的概念
    • 2.1 图的基本概念及应用场景
    • 2.2 图的术语
      • 2.2.1 顶点(Vertex)和边(Edge)
      • 2.2.2 有向图和无向图
      • 2.2.3 有环图和无环图
      • 2.2.4 度
    • 2.3 图的经典表示法
  • 3、Spark GraphX
    • 3.1 简介
    • 3.2 GraphX核心抽象
    • 3.3 GraphX API
    • 3.4 图的算子
      • 3.4.1 属性算子(数据变换)
      • 3.4.2 结构算子(结构变换)
      • 3.4.3 join算子
      • 3.4.4 练习:找出用户粉丝数量
  • 4、常用图算法
    • 4.1 页面排名算法--PageRank
    • 4.2 连通分量--Connected Component
  • 5、Pregel计算框架

1、为什么需要图计算

  • 许多大数据以大规模图或网络的形式呈现
  • 许多非图结构的大数据,常会被转换为图模型进行分析
  • 图数据结构很好地表达了数据之间的关联性

2、图的概念

2.1 图的基本概念及应用场景

图是由顶点集合(vertex)及顶点间的关系集合(边edge)组成的一种网状数据结构

  • 通常表示为二元组:Gragh=(V,E)
  • 可以对事物之间的关系建模

应用场景

  • 在地图应用中寻找最短路径
  • 社交网络关系
  • 网页间超链接关系

2.2 图的术语

2.2.1 顶点(Vertex)和边(Edge)

一般关系图中,事物为顶点,关系为边
定义一个图:

Graph=(V,E)
集合V={v1,v2,v3}
集合E={(v1,v2),(v1,v3),(v2,v3)}

2.2.2 有向图和无向图

  • 有向图:在有向图中,一条边的两个顶点一般扮演者不同的角色,比如父子关系、页面A连接向页面B;
G=(V,E)
V={A,B,C,D,E}
E={<A,B>,<B,C>,<B,D>,<C,E>,<D,A>,<E,D>}  //关系用尖括号表示

  • 无向图:在一个无向图中,边没有方向,即关系都是对等的,比如qq中的好友。
G=(V,E)
V={A,B,C,D,E}
E={(A,B),(A,D),(B,C),(B,D),(C,E),(D,E)}

2.2.3 有环图和无环图

  • 有环图:包含一系列顶点连接的回路(环路),有环图是包含循环的,一系列顶点连接成一个环,在有环图中,如果不关心终止条件,算法可能永远在环上执行,无法退出。

  • 无环图:不包含循环,不能形成环,DAG即为有向无环图

2.2.4 度

指一个顶点所有边的数量。

  • 出度:指从当前顶点指向其他顶点的边的数量
  • 入度:其他顶点指向当前顶点的边的数量

2.3 图的经典表示法

邻接矩阵:

1、对于每条边,矩阵中相应单元格值为1
2、对于每个循环,矩阵中相应单元格值为2,方便在行或列上求得顶点度数

3、Spark GraphX

3.1 简介

  • GraphX是Spark提供分布式图计算API

  • GraphX特点:

    • 基于内存实现了数据的复用与快速读取
    • 通过弹性分布式属性图(Property Graph)统一了图视图与表视图
    • 与Spark Streaming、Spark SQL和Spark MLlib等无缝衔接
  • 针对某些领域,如社交网络、语言建模等,graph-parallel系统可以高效地执行复杂的图形算法,比一般的data-parallel系统更快

  • Graphx是将graph-parallel的data-parallel统一到一个系统中。允许用户将数据当成一个图或一个集合RDD,而简化数据移动或复杂操作。

3.2 GraphX核心抽象

  • 弹性分布式属性图(Resilient Distributed Property Graph)

    • 顶点和边都带属性的有向多重图


  • 一份物理存储,两种视图


对Graph视图的所有操作,最终都会转换成其关联的Table视图的RDD操作来完成。

3.3 GraphX API

  • Graph[VD,ED]
  • VertexRDD[VD]
  • EdgeRDD[ED]
  • EdgeTriplet[VD,ED]
  • Edge:样例类
  • VertexId:Long的别名

maven工程需要下载依赖:

    <dependency><groupId>org.apache.spark</groupId><artifactId>spark-graphx_2.11</artifactId><version>2.1.1</version></dependency>
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object GraphxDemo1 {def main(args: Array[String]): Unit = {val conf :SparkConf= new SparkConf().setAppName("graphxDemo1").setMaster("local[2]")val sc = SparkContext.getOrCreate(conf)//创建vertices顶点rdd,(1L,1)中的1L代表顶点,1代表该顶点的属性val vd:RDD[(Long,Int)]=sc.makeRDD(Seq((1L,1),(2L,2),(3L,3)))//创建edges边rdd,(Edge(1L,2L,1)中的1L和2L代表顶点,1代表两顶点间的关系val ed:RDD[Edge[Int]]=sc.makeRDD(Seq(Edge(1L,2L,1),Edge(2L,3L,2)))//创建graph对象val graph=Graph(vd,ed)//获取graph图对象的顶点信息graph.vertices.collect.foreach(println)graph.vertices.foreach(x=>println(s"${x._1}-->${x._2}"))//获取graph图对象的边信息graph.edges.collect.foreach(println)graph.edges.foreach(x=>println(s"src:${x.srcId},dst:${x.dstId},attr:${x.attr}"))//获取顶点和边的整体信息graph.triplets.collect.foreach(println)}
}
/*
(2,2)
(1,1)
(3,3)
1-->1
3-->3
2-->2
Edge(1,2,1)
Edge(2,3,2)
src:1,dst:2,attr:1
src:2,dst:3,attr:2
((1,1),(2,2),1)
((2,2),(3,3),2)
*/

关于vertices、edges、triplets所表示的含义如下图:

Spark shell需要导入Spark Graph包

//导入Spark Graph包
scala> import org.apache.spark.graphx._//通过文件加载
followers.txt内容是
2 3
1 4
3 2
4 3scala> val graphLoad=GraphLoader.edgeListFile(sc,"file:///root/test/followers.txt")
graphLoad: org.apache.spark.graphx.Graph[Int,Int] = org.apache.spark.graphx.impl.GraphImpl@6d0c8cd0scala> graphLoad.vertices.collect
res6: Array[(org.apache.spark.graphx.VertexId, Int)] = Array((4,1), (1,1), (3,1), (2,1))scala> graphLoad.edges.collect
res7: Array[org.apache.spark.graphx.Edge[Int]] = Array(Edge(1,4,1), Edge(2,3,1), Edge(3,2,1), Edge(4,3,1))scala> graphLoad.triplets.collect
res8: Array[org.apache.spark.graphx.EdgeTriplet[Int,Int]] = Array(((1,1),(4,1),1), ((2,1),(3,1),1), ((3,1),(2,1),1), ((4,1),(3,1),1))

属性图应用示例:构建用户合作关系属性图

  • 顶点属性:用户名、职业
  • 边属性:合作关系
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object GraphDemo2 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[1]").setAppName("graphDemo2")val sc = SparkContext.getOrCreate(conf)val user:RDD[(Long,(String,String))] = sc.parallelize(Array((3L,("rxin","student")),(7L,("jgonzal","postdoc")),(5L,("franklin","professor")),(2L,("istoica","professor"))))val relationship:RDD[Edge[String]]=sc.parallelize(Array(Edge(3L,7L,"Collaborator"),Edge(5L,3L,"Advisor"),Edge(2L,5L,"Colleague"),Edge(5L,7L,"PI")))val graphUser=Graph(user,relationship)graphUser.vertices.collect.foreach(println)graphUser.edges.collect.foreach(println)graphUser.triplets.collect.foreach(println)}
}
/*
(3,(rxin,student))
(7,(jgonzal,postdoc))
(5,(franklin,professor))
(2,(istoica,professor))
Edge(2,5,Colleague)
Edge(3,7,Collaborator)
Edge(5,3,Advisor)
Edge(5,7,PI)
((2,(istoica,professor)),(5,(franklin,professor)),Colleague)
((3,(rxin,student)),(7,(jgonzal,postdoc)),Collaborator)
((5,(franklin,professor)),(3,(rxin,student)),Advisor)
((5,(franklin,professor)),(7,(jgonzal,postdoc)),PI)
*/

属性图应用示例:

  • 构建用户社交网络关系

    • 顶点:用户名、年龄
    • 边:打call次数
  • 找出大于30岁的用户
  • 假设打call超过5次,表示真爱。请找出他(她)们
import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object GraphDemo3 {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[1]").setAppName("graphDemo3")val sc = SparkContext.getOrCreate(conf)//构建用户社交网络关系val user:RDD[(Long,(String,Int))]  = sc.parallelize(Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50))))val userCall:RDD[Edge[Int]] = sc.parallelize(Array(Edge(2L, 1L, 7),Edge(4L, 1L, 1),Edge(5L, 2L, 2),Edge(3L, 2L, 4),Edge(5L, 3L, 8),Edge(2L, 4L, 2),Edge(5L, 6L, 3),Edge(3L, 6L, 7)))val graphUserCall=Graph(user,userCall)//找出大于30岁的用户//方法1:利用元组,(1L, ("Alice", 28))是二维元组,graphUserCall.vertices.filter(x=>x._2._2>30).collect().foreach(println)//方法2:利用样例类graphUserCall.vertices.filter{case(id,(name,age))=>age>30}.collect().foreach(println)//假设打call超过5次,表示真爱。请找出,格式:*** like ***,stage:*// graphUserCall.triplets.collect().foreach(println)的输出格式是((2,(Bob,27)),(1,(Alice,28)),7)//看着像三元组,实际并不是,它的类型是RDD[EdgeTriplet[(String,Int),Int]]//val triplets:RDD[EdgeTriplet[(String,Int),Int]] = graphUserCall.triplets//所以千万不能用三元组处理!//可以使用算子srcAttr,dstAttr,attr//((2,(Bob,27)),(1,(Alice,28)),7)中,// srcAttr代表(Bob,27),dstAttr代表(Alice,28),attr代表7//而(Bob,27),(Alice,28)是二元组,这样就好解决了。graphUserCall.triplets.filter(x=>x.attr>5).collect().foreach(x=>println(x.srcAttr._1+" like "+x.dstAttr._1+",stage:"+x.attr))}
}
/*输出:
(4,(David,42))
(6,(Fran,50))
(3,(Charlie,65))
(5,(Ed,55))
Bob like Alice,stage:7
Charlie like Fran,stage:7
Ed like Charlie,stage:8
*/

查看图信息

  • 顶点数量:val numVertices: Long
  • 边数量:val numEdges: Long
  • 度:val degrees: VertexRDD[Int]
    • 入度:val inDegrees: VertexRDD[Int]
    • 出度:val outDegrees: VertexRDD[Int]
//使用上述的示例println(graphUserCall.numVertices)      //输出:6println(graphUserCall.numEdges)        //输出:8graphUserCall.degrees.collect().foreach(println)   //输出:(4,2)(1,2)(6,2)(3,3)(5,3)(2,4)graphUserCall.inDegrees.collect().foreach(println)    //输出:(4,1)(1,2)(6,2)(3,1)(2,2)graphUserCall.outDegrees.collect().foreach(println)//输出:(4,1)(3,2)(5,3)(2,2)

3.4 图的算子

3.4.1 属性算子(数据变换)

用于属性图属性数据变换

  • 类似于RDD的map操作

    • mapVertices–>修改顶点的属性
    • mapEdges–>修改边的关系(也只是改变值,不改变结构)
    • mapTriplets
class Graph[VD, ED] {def mapVertices[VD2](map: (VertexId, VD) => VD2): Graph[VD2, ED]def mapEdges[ED2](map: Edge[ED] => ED2): Graph[VD, ED2]def mapTriplets[ED2](map: EdgeTriplet[VD, ED] => ED2): Graph[VD, ED2]
}

使用上述示例:用户社交网络关系(打call),这次用spark-shell进行操作。

//导包
scala> import org.apache.spark.graphx._//创建顶点RDD
scala> val user=sc.parallelize(Array(| (1L,("Alice",28)),| (2L,("Bob",27)),| (3L,("Charlie",65)),| (4L,("David",42)),| (5L,("Ed",55)),| (6L,("Age",50))))//创建边RDD
scala> val userCall=sc.parallelize(| Array(| Edge(4L,1L,1),| Edge(2L,1L,7),| Edge(5L,2L,2),| Edge(3L,2L,4),| Edge(5L,3L,8),| Edge(2L,4L,2),| Edge(5L,6L,3),| Edge(3L,6L,3)))//创建graph对象
scala> val graphUserCall=Graph(user,userCall)scala> graphUserCall.vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice,28))
(6,(Age,50))
(3,(Charlie,65))
(5,(Ed,55))
(2,(Bob,27))//获取graph图对象的顶点信息
scala> graphUserCall.vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice,28))
(6,(Age,50))
(3,(Charlie,65))
(5,(Ed,55))
(2,(Bob,27))//现需要改变顶点属性
//方法1:使用模式匹配
scala> val t1_graph=graphUserCall.mapVertices{case(vertextId,(name,age))=>(vertextId,name)}scala> t1_graph.vertices.collect.foreach(println)
(4,(4,David))
(1,(1,Alice))
(6,(6,Age))
(3,(3,Charlie))
(5,(5,Ed))
(2,(2,Bob))//方法2:使用元组
scala> val t2_graph=graphUserCall.mapVertices((id,attr)=>(id,attr._1))scala> t2_graph.vertices.collect.foreach(println)
(4,(4,David))
(1,(1,Alice))
(6,(6,Age))
(3,(3,Charlie))
(5,(5,Ed))
(2,(2,Bob))//获取graph图对象的边信息
scala> graphUserCall.edges.collect.foreach(println)
Edge(2,1,7)
Edge(2,4,2)
Edge(3,2,4)
Edge(3,6,3)
Edge(4,1,1)
Edge(5,2,2)
Edge(5,3,8)
Edge(5,6,3)//需求:将打call次数乘以7
scala> val t3_graph=graphUserCall.mapEdges(x=>Edge(x.srcId,x.dstId,x.attr*7.0))scala> t3_graph.edges.collect.foreach(println)
Edge(2,1,Edge(2,1,49.0))
Edge(2,4,Edge(2,4,14.0))
Edge(3,2,Edge(3,2,28.0))
Edge(3,6,Edge(3,6,21.0))
Edge(4,1,Edge(4,1,7.0))
Edge(5,2,Edge(5,2,14.0))
Edge(5,3,Edge(5,3,56.0))
Edge(5,6,Edge(5,6,21.0))
//显然这样不是我们想要的,mapEdges(x=>...)里的x是指的是打call次数,而不是Edge(2,1,7)整体scala> val t3_graph=graphUserCall.mapEdges(x=>x.attr*7.0)scala> t3_graph.edges.collect.foreach(println)
Edge(2,1,49.0)
Edge(2,4,14.0)
Edge(3,2,28.0)
Edge(3,6,21.0)
Edge(4,1,7.0)
Edge(5,2,14.0)
Edge(5,3,56.0)
Edge(5,6,21.0)//需求:打call次数加上出度顶点的年龄,使用mapTriplets
scala> val t4_graphUserCall=graphUserCall.mapTriplets(x=>(x.attr+x.srcAttr._2))scala> t4_graphUserCall.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),34)
((2,(Bob,27)),(4,(David,42)),29)
((3,(Charlie,65)),(2,(Bob,27)),69)
((3,(Charlie,65)),(6,(Age,50)),68)
((4,(David,42)),(1,(Alice,28)),43)
((5,(Ed,55)),(2,(Bob,27)),57)
((5,(Ed,55)),(3,(Charlie,65)),63)
((5,(Ed,55)),(6,(Age,50)),58)/*总结一下:
1、mapVertices,修改的是顶点的属性VD
2、mapEdges,修改的是边的关系ED
3、mapTriplets,修改的是边的关系ED
*/

注意:

  • 这里每一个操作产生一个新图,其顶点和边被用户定义的map函数修改了。
  • 在每一个实例图结构不受影响,仅仅是属性图属性数据变换。

3.4.2 结构算子(结构变换)

属性图结构变换

  • reverse:改变边的方向
  • subgraph:生成满足顶点和边的条件的子图
class Graph[VD, ED] {def reverse: Graph[VD, ED]def subgraph(epred: EdgeTriplet[VD,ED] => Boolean,vpred: (VertexId, VD) => Boolean): Graph[VD, ED]}

此处注意epred和vpred的区别,下面演示示例详细说明。

scala> graphUserCall.triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((3,(Charlie,65)),(2,(Bob,27)),4)
((3,(Charlie,65)),(6,(Age,50)),3)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8)
((5,(Ed,55)),(6,(Age,50)),3)
//reverse 改变边的方向
scala> val reverse_graphUserCall=graphUserCall.reversescala> reverse_graphUserCall.triplets.collect.foreach(println)
((1,(Alice,28)),(2,(Bob,27)),7)
((1,(Alice,28)),(4,(David,42)),1)
((2,(Bob,27)),(3,(Charlie,65)),4)
((2,(Bob,27)),(5,(Ed,55)),2)
((3,(Charlie,65)),(5,(Ed,55)),8)
((4,(David,42)),(2,(Bob,27)),2)
((6,(Age,50)),(3,(Charlie,65)),3)
((6,(Age,50)),(5,(Ed,55)),3)//subgraph之vpred
//vpred: (VertexId, VD) => Boolean
//注意匿名函数的参数是顶点信息,也就意味着会每个顶点进行判断
//需求:截取年龄小于65的子图(包括所有顶点)
scala> graphUserCall.subgraph(vpred=(id,attr)=>attr._2<65).triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(6,(Age,50)),3)//subgraph之epred
//epred: EdgeTriplet[VD,ED] => Boolean
//注意匿名函数的参数是顶点和边的整体信息,如果需要过滤顶点信息,需要使用srcAttr和dstAttr
//需求:截取年龄小于65的子图(仅过滤出度顶点,也就是起始顶点)
scala> graphUserCall.subgraph(epred=(ep)=>ep.srcAttr._2<65).triplets.collect.foreach(println)
((2,(Bob,27)),(1,(Alice,28)),7)
((2,(Bob,27)),(4,(David,42)),2)
((4,(David,42)),(1,(Alice,28)),1)
((5,(Ed,55)),(2,(Bob,27)),2)
((5,(Ed,55)),(3,(Charlie,65)),8) //比vpred多了此条
((5,(Ed,55)),(6,(Age,50)),3)

3.4.3 join算子

从外部的RDDs加载数据,修改顶点属性。

  • joinVertices
  • outerJoinVertices
class Graph[VD, ED] {def joinVertices[U](table: RDD[(VertexId, U)])(map: (VertexId, VD, U) => VD): Graph[VD, ED]def outerJoinVertices[U, VD2](table: RDD[(VertexId, U)])(map: (VertexId, VD, Option[U]) => VD2): Graph[VD2, ED]
}
//仍使用打call的示例
//新创建一个RDD
scala> val two=sc.makeRDD(Array((1L,"kgc.cn"),(2L,"qq.com"),(3L,"163.com"),(7L,"so.com")))//joinVertices
//用法:graph.joinVertices(RDD)
scala> graphUserCall.joinVertices(two)((id,v,cmpy)=>(v._1+"@"+cmpy,v._2)).vertices.collect.foreach(println)
(4,(David,42))
(1,(Alice@kgc.cn,28))
(6,(Age,50))
(3,(Charlie@163.com,65))
(5,(Ed,55))
(2,(Bob@qq.com,27))//outerJoinVertices
//用法:graph.outerJoinVertices(RDD)
scala> graphUserCall.outerJoinVertices(two)((id,v,cmpy)=>(v._1+"@"+cmpy,v._2)).vertices.collect.foreach(println)
(4,(David@None,42))   //RDD中的顶点不匹配时,值为None
(1,(Alice@Some(kgc.cn),28))
(6,(Age@None,50))
(3,(Charlie@Some(163.com),65))
(5,(Ed@None,55))
(2,(Bob@Some(qq.com),27))

属性算子mapVertices可以修改顶点的属性,join算子也可以修改顶点的属性,区别是join算子是从外部的RDDs加载数据。

需求:计算用户粉丝数量

scala> case class User(name:String,age:Int,inDeg:Int,outDeg:Int)//将顶点入度、出度存入顶点属性中
scala> val t2_graph=graphUserCall.outerJoinVertices(graphUserCall.inDegrees){case(id,u,indeg)=>User(u._1,u._2,indeg.getOrElse(0),0)}.outerJoinVertices(graphUserCall.outDegrees){case(id,u,outdeg)=>User(u.name,u.age,u.inDeg,outdeg.getOrElse(0))}scala> t2_graph.vertices.collect.foreach(println)
(4,User(David,42,1,1))
(1,User(Alice,28,2,0))
(6,User(Age,50,2,0))
(3,User(Charlie,65,1,2))
(5,User(Ed,55,0,3))
(2,User(Bob,27,2,2))

3.4.4 练习:找出用户粉丝数量

需求:现有数据格式如下,创建图并计算每个用户的粉丝数量。
格式:((User*, ),(User,*))

  • (User*, *)=(用户名,用户ID)
  • 第一个用户表示被跟随者(followee)
  • 第二个用户表示跟随者(follower)
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.{SparkConf, SparkContext}import scala.util.matching.Regexobject Test01 {def main(args: Array[String]): Unit = {// 创建SparkSessionval spark: SparkSession = SparkSession.builder().appName(this.getClass.getName).master("local[1]").getOrCreate()// 创建SparkContextval sc: SparkContext = spark.sparkContext// 编写正则表达式, 用于提取字段val pattern: Regex ="""\(\((User\d+,\d+)\),\((User\d+,\d+)\)\)""".r// 读取数据文件val twitters: RDD[(Array[String], Array[String])] = sc.textFile("D:\\上课PPT\\直播课资料\\2-Spark阶段\\02-spark\\twitter_graph_data.txt")// 正则表达式的模式匹配, 返回一个Option数据类型.map(line => line match {case pattern(followee, follower) => (Some(followee), Some(follower))case _ => (None, None)// 过滤掉值为None的}).filter(x => x._1 != None && x._2 != None)// 正则表达式模式匹配后返回两组值, 分别对这两组值进行切割处理, 返回一个(Array[String], Array[String]).map(x => (x._1.get.split(","), x._2.get.split(",")))// 使用flatMap对数据进行扁平化处理, 构建点集合// 也可以使用union对两个RDD求并集, 这里不做演示val verts: RDD[(Long, String)] = twitters.flatMap(x => Array((x._1(1).toLong, x._1(0)), (x._2(1).toLong, x._2(0)))).distinct()// 构建边集合val edges: RDD[Edge[String]] = twitters.map(x => Edge(x._2(1).toLong, x._1(1).toLong, "follow"))// 构建图// 有可能会出现一种情况, 在边集合中出现的点在点集合中不存在val graph = Graph(verts,edges,"")// 求入度, 按照入度的值进行倒序排列// 注意: 要想实现全局有序, 需要重新分成一个区graph.inDegrees.repartition(1).sortBy(x=>x._2,false).foreach(println)}
}
/*输出结果:
(123004655,56)
(36851222,56)
(59804598,54)
(63644892,46)
(14444530,42)
......
./

4、常用图算法

4.1 页面排名算法–PageRank

  • 用于评估网页链接的质量和数量,以确定该网页的重要性和权威性的相对分数,范围为0到10
  • 从本质上讲,PageRank是找出图中顶点(网页链接)的重要性
  • GraphX提供了PageRank API用于计算图的PageRank

原理部分详见:PageRank算法原理剖析及Spark实现

class Graph[VD, ED] {def pageRank(tol: Double, resetProb: Double = 0.15): Graph[Double, Double]
}
//tol:收敛时允许的误差,越小越精确, 确定迭代是否结束的参数
//resetProb:随机重置概率
//返回的仍是Graph,格式:(1,1.5453030618621122),本质上是找出顶点的重要性。
//使用不同的收敛误差,结果进行对比
scala> graphUserCall.pageRank(0.1).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.5453030618621122)
(4,1.035409289731306)
(6,1.0247865028119143)
(2,1.0247865028119143)
(3,0.7698396167465112)
(5,0.5998750260362425)scala> graphUserCall.pageRank(0.01).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.7757164399923602)
(6,1.0009207604397985)
(2,1.0009207604397985)
(4,0.9727164143364966)
(3,0.7024005336419639)
(5,0.5473250911495823)scala> graphUserCall.pageRank(0.001).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.7924127957615186)
(6,0.9969646507526428)
(2,0.9969646507526428)
(4,0.9688717814927128)
(3,0.6996243163176442)
(5,0.5451618049228396)scala> graphUserCall.pageRank(0.0001).vertices.sortBy(x=> -(x._2)).collect.foreach(println)
(1,1.7924127957615186)
(6,0.9969646507526428)
(2,0.9969646507526428)
(4,0.9688717814927128)
(3,0.6996243163176442)
(5,0.5451618049228396)

4.2 连通分量–Connected Component

  • 连通分量是一个子图,其中任何两个顶点通过一条边或一系列边相互连接,其顶点是原始图顶点集的子集,其边是原始图边集的子集
class Graph[VD, ED] {def connectedComponents(): Graph[VertexID, ED]
}

示例参考博客:GraphX之Connected Components

首先准备数据源
links.csv

1,2,friend
1,3,sister
2,4,brother
3,2,boss
4,5,client
1,9,friend
6,7,cousin
7,9,coworker
8,9,father
10,11,colleague
10,12,colleague
11,12,colleague

people.csv

4,Dave,25
6,Faith,21
8,Harvey,47
2,Bob,18
1,Alice,20
3,Charlie,30
7,George,34
9,Ivy,21
5,Eve,30
10,Lily,35
11,Helen,35
12,Ann,35

结构图:

scala> import org.apache.spark.graphx._scala> case class Person(name:String,age:Int)scala> val people =sc.textFile("file:///root/test/people.csv")scala> val peopleRDD=people.map(line=>line.split(',')).map(x=>(x(0).toLong,Person(x(1),x(2).toInt)))scala> val links=sc.textFile("file:///root/test/links.csv")scala> val linkRDD=links.map(x=>{val row =x.split(',');Edge(row(0).toLong,row(1).toLong,row(2))})scala> val graph=Graph(peopleRDD,linkRDD)
//使用connectedComponents
scala> val cc=graph.connectedComponentsscala> cc.vertices.collect.foreach(println)
(4,1)
(11,10)
(1,1)
(6,1)
(8,1)
(3,1)
(9,1)
(7,1)
(12,10)
(10,10)
(5,1)
(2,1)
//从结果中可以看到通过计算之后的图,每个顶点多了一个属性,这个属性表示的就是这个顶点所在的连通图中的最小顶点id。
//例如顶点11所在的连通图中的最小顶点id是10,顶点4所在的连通图中的最小顶点id是1。
//显然cc的格式还不是我们想要的。
//经过connectedComponents得到的结果,可以知道哪些顶点在一个连通图中,这样就可以将一个大图拆分成若干个连通子图。scala>val newGraph= cc.outerJoinVertices(peopleRDD)((id,mincc,people)=>(mincc,people.get.name,people.get.age))scala> newGraph.vertices.collect.foreach(println)
(4,(1,Dave,25))
(11,(10,Helen,35))
(1,(1,Alice,20))
(6,(1,Faith,21))
(8,(1,Harvey,47))
(3,(1,Charlie,30))
(9,(1,Ivy,21))
(7,(1,George,34))
(12,(10,Ann,35))
(10,(10,Lily,35))
(5,(1,Eve,30))
(2,(1,Bob,18))scala> cc.vertices.map(_._2).collect.distinct.foreach(mincc=>{val sub=newGraph.subgraph(vpred=(id,attr)=> attr._1==mincc);println(sub.triplets.collect.foreach(println))})
((1,(1,Alice,20)),(2,(1,Bob,18)),friend)
((1,(1,Alice,20)),(3,(1,Charlie,30)),sister)
((1,(1,Alice,20)),(9,(1,Ivy,21)),friend)
((2,(1,Bob,18)),(4,(1,Dave,25)),brother)
((3,(1,Charlie,30)),(2,(1,Bob,18)),boss)
((4,(1,Dave,25)),(5,(1,Eve,30)),client)
((6,(1,Faith,21)),(7,(1,George,34)),cousin)
((7,(1,George,34)),(9,(1,Ivy,21)),coworker)
((8,(1,Harvey,47)),(9,(1,Ivy,21)),father)
()
((10,(10,Lily,35)),(11,(10,Helen,35)),colleague)
((10,(10,Lily,35)),(12,(10,Ann,35)),colleague)
((11,(10,Helen,35)),(12,(10,Ann,35)),colleague)
()
/*分析:
1、通过connectedComponents得到的新图的顶点属性已经没有了原始的那些信息,所以需要和原始信息作一个join,例如val newGraph = cc.outerJoinVertices(peopleRDD)((id, cc, p)=>(cc,p.get.name,p.get.age))
2、cc.vertices.map(_._2).collect.distinct会得到所有连通图中id最小的顶点编号
3、通过连通图中最小顶点编号,使用subgraph方法得到每个连通子图
*/

总结一下实现思路:

  • 1、已有图graph,通过connectedComponents可以得到每个顶点所在的连通图中的最小顶点id,格式:(某顶点id,该顶点连通图最小顶点id),也是一个graph,取名为cc。
  • 2、将这个最小id加入到对应顶点的属性中,可以使用outerJoinVertices,得到一个新的graph。
  • 3、可以使用subgraph取出子图。

5、Pregel计算框架

可用于导航路线的优选。

class Graph[VD, ED] {  def pregel[A](initialMsg: A, maxIterations: Int, activeDirection: EdgeDirection)(vprog: (VertexID, VD, A) => VD,sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexID,A)],mergeMsg: (A, A) => A): Graph[VD, ED]
}
  • initialMsg:在“superstep 0”之前发送至顶点的初始消息
  • maxIterations:将要执行的最大迭代次数
  • activeDirection:发送消息方向(默认是出边方向:EdgeDirection.Out)
  • vprog:用户定义函数,用于顶点接收消息
  • sendMsg:用户定义的函数,用于确定下一个迭代发送的消息及发往何处
  • mergeMsg:用户定义的函数,在vprog前,合并到达顶点的多个消息

preple原理分析:https://blog.csdn.net/hanweileilei/article/details/89764466

import org.apache.spark.graphx._
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object PregelDemo {def main(args: Array[String]): Unit = {//1、创建SparkContextval conf = new SparkConf().setMaster("local[1]").setAppName("PregelDemo")val sc = SparkContext.getOrCreate(conf)//2、创建顶点val vertexArray = Array((1L, ("Alice", 28)),(2L, ("Bob", 27)),(3L, ("Charlie", 65)),(4L, ("David", 42)),(5L, ("Ed", 55)),(6L, ("Fran", 50)))val vertexRDD: RDD[(VertexId, (String,Int))] = sc.makeRDD(vertexArray)//3、创建边,边的属性代表 相邻两个顶点之间的距离val edgeArray = Array(Edge(2L, 1L, 7),Edge(2L, 4L, 2),Edge(3L, 2L, 4),Edge(3L, 6L, 3),Edge(4L, 1L, 1),Edge(2L, 5L, 2),Edge(5L, 3L, 8),Edge(5L, 6L, 3))val edgeRDD: RDD[Edge[Int]] = sc.makeRDD(edgeArray)//4、创建图(使用apply方式创建)val graph = Graph(vertexRDD, edgeRDD)//被计算的图中,起始顶点的idval srcVertexId=5Lval initialGraph:Graph[Double,Int]=graph.mapVertices{case (vid,(name,age))=>if(vid==srcVertexId) 0.0 else Double.PositiveInfinity}//initialGraph.vertices.collect().foreach(println)val pregelGraph:Graph[Double,Int]=initialGraph.pregel(Double.PositiveInfinity,Int.MaxValue,EdgeDirection.Out)((vid:VertexId,vd:Double,disMsg:Double)=>{val minDist:Double=math.min(vd,disMsg)println("vprog"+vid+" "+vd+" "+disMsg+" "+minDist)minDist},(edgeTriplet:EdgeTriplet[Double,PartitionID])=>{if (edgeTriplet.srcAttr+edgeTriplet.attr<edgeTriplet.dstAttr){println("sendMsg:"+edgeTriplet.srcId+" "+edgeTriplet.srcAttr+" "+edgeTriplet.dstId+" "+edgeTriplet.dstAttr)Iterator[(VertexId,Double)]((edgeTriplet.dstId,edgeTriplet.srcAttr+edgeTriplet.attr))}else{Iterator.empty}},(msg1:Double,msg2:Double)=>{math.min(msg1,msg2)})pregelGraph.vertices.collect().foreach(println)}
}
/*输出:
vprog4 Infinity Infinity Infinity
vprog1 Infinity Infinity Infinity
vprog6 Infinity Infinity Infinity
vprog3 Infinity Infinity Infinity
vprog5 0.0 Infinity 0.0
vprog2 Infinity Infinity Infinity
sendMsg:5 0.0 3 Infinity
sendMsg:5 0.0 6 Infinity
vprog6 Infinity 3.0 3.0
vprog3 Infinity 8.0 8.0
sendMsg:3 8.0 2 Infinity
vprog2 Infinity 12.0 12.0
sendMsg:2 12.0 1 Infinity
sendMsg:2 12.0 4 Infinity
vprog4 Infinity 14.0 14.0
vprog1 Infinity 19.0 19.0
sendMsg:4 14.0 1 19.0
vprog1 19.0 15.0 15.0
(4,14.0)
(1,15.0)
(6,3.0)
(3,8.0)
(5,0.0)
(2,12.0)
*/

基于Spark GraphX的图形数据分析相关推荐

  1. 基于Spark GraphX 的图形数据分析

    文章目录 为什么需要图计算 一.图(Graph)的基本概念 二.Spark GraphX 简介 三.GraphX API 1.属性图应用示例-1 2.属性图应用示例-2 3.查看图信息 4.图的算子 ...

  2. 杨鹏谈世纪佳缘推荐算法:基于Spark GraphX,弃GBDT和LR用FM

     杨鹏谈世纪佳缘推荐算法:基于Spark GraphX,弃GBDT和LR用FM 发表于2015-09-30 09:53| 1447次阅读| 来源CSDN| 2 条评论| 作者杨鹏 机器学习推荐算法 ...

  3. 【计算机大数据毕设之基于spark+hadoop的大数据分析论文写作参考案例】

    [计算机大数据毕设之基于spark+hadoop的大数据分析论文写作参考案例-哔哩哔哩] https://b23.tv/zKOtd3L 目  录 一 引言​1 二 系统分析​2 2.1 必要性和可行性 ...

  4. 基于Spark技术的银行客户数据分析

    基于Spark技术的银行客户数据分析 1. 实验室名称: 2. 实验项目名称: 一.业务场景 二.数据集说明 三.操作步骤 阶段一.启动HDFS.Spark集群服务和zeppelin服务器 阶段二.准 ...

  5. 【毕业设计_课程设计】基于Spark网易云音乐数据分析

    文章目录 0 项目说明 1 系统模块 2 分析内容 3 界面展示 4 项目工程 0 项目说明 基于Spark网易云音乐数据分析 提示:适合用于课程设计或毕业设计,工作量达标,源码开放 1 系统模块 包 ...

  6. 基于Spark的音乐专辑数据分析

    每天天都在努力学习的我们 前言 本篇博客讲解的内容依旧是使用Spark进行相关的数据分析,按理来说数据分析完之后应该搞一搞可视化的,由于目前时间紧张,顾不得学习可视化了,先来看一下此次的内容把. 在K ...

  7. 顶会论文阅读-22年CCF A级别spark graphX研究

    这篇文章是从dblp上面自行下载的唐老师发的A类文章,主要讲的是对spark源码当中sparkgraphX模块的优化: incgraph:基于Spark GraphX的分布式增量图计算模型和框架: 原 ...

  8. 基于Spark的气象数据分析

    研究背景与方案 1.1.研究背景 在大数据时代背景下,各行业数据的规模大幅度增加,数据类别日益复杂,给数据分析工作带来极大挑战.气象行业和人们的生活息息相关,随着信息时代的发展,大数据技术的出现为气象 ...

  9. 基于 Spark 的数据分析香港六合彩开奖号码采集官网实践

    引言: Spark是在借鉴了MapReduce之上发展香港六合彩开奖号码采集官网vip7.maltapi.com而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷.Spark主要 ...

最新文章

  1. CentOS7 设置用户密码规则
  2. c++输出数据的二进制表示形式
  3. Spring及SpringBoot @Async配置步骤及注意事项
  4. DCMTK:可加载的DICOM数据字典
  5. 160809325贺彦
  6. 怎么查linux上谁删了文件,如何在 Linux 下快速找到被删除的文件?
  7. 三种传递gRPC动态参数方式的使用体验
  8. react sql格式化_为SQL Server数据库损坏做准备; 初步React与分析
  9. Session何时创建实例
  10. 在Mac中用快捷键快速插入日期时间
  11. uboot开机logo
  12. 冰点还原精灵授权问题解答
  13. 酒桌上的学问(搜集整理帖)
  14. 量子纠缠在量子计算机中的作用,解密量子计算机,量子叠加和量子纠缠是制胜关键...
  15. 数字信号常用典型序列(1)
  16. 计算机专业设计(论文)内容及要求,计算机专业毕业设计要求.doc
  17. matlab曼德勃罗集,YaK与您一起欣赏BBC纪录片:''''神秘的混沌理论''''
  18. 豫科技版计算机七年级上册,一上册 信息技术
  19. 《GPU编程与CG语言之阳春白雪下里巴人》阅读笔记 第一章+第二章
  20. 水果店毛利点计算公式,水果店月度毛利怎么算

热门文章

  1. Intel CPU简介
  2. 直播平台源码中直播系统捕获音视频的步骤
  3. 自定义视频播放器与慢放滚轮
  4. 学习记录-用Excel控制SIMPACK操作
  5. 安装Discuz开源论坛
  6. 封装Win10步骤和注意事项
  7. 蓝牙室内定位之AOA室内定位技术详解--新导智能
  8. 组件嵌套(对应米斯特吴19)
  9. 10. Android MultiMedia框架完全解析 - MediaExtractor::Create函数的解析和FslExtractor分析
  10. 前端 开关按钮样式_如何使用HTML5+css3制作出12种常用的按钮开关样式(附完整代码)...