使用GraphX Pregel API
图本身是递归数据结构,顶点的属性依赖于它们邻居的属性,这些邻居的属性又依赖于自己邻居的属性。所以许多重要的图算法都是迭代的重新计算每个顶点的属性,直到满足某个确定的条件。
Pregel是一种面向图算法的分布式编程框架,采用迭代的计算模型:在每一轮,每个顶点处理上一轮收到的消息,并发出消息给其它顶点,并更新自身状态和拓扑结构(出、入边)等。GraphX实现了一个类似pregel的批量同步消息传递API,这是基于Bulk Synchronous Parallel (BSP,分布式批同步) 算法的。
GraphX中实现的这个更高级的Pregel操作是一个约束到图拓扑的批量同步(bulk-synchronous)并行消息抽象。Pregel操作符在一系列的超步(super steps)中执行。在每一次超步中,顶点的计算都是并行的,并且执行用户定义的同一个函数。每个顶点可以修改其自身的状态信息或以它为起点的出边的信息,从前序超步中接受消息,并传送给其后续超步,或者修改整个图的拓扑结构。边,在这种计算模式中并不是核心对象,没有相应的计算运行在其上。因此,在每一个超步中,总是按如下顺序执行:
- 顶点从上一个super step接收它们的入站(inbound)消息的总和;
- 为顶点属性计算一个新的值;
- 在下一个super step中向相邻的顶点发送消息。
消息通过边triplet的一个函数被并行计算,消息的计算既会访问源顶点特征也会访问目的顶点特征。在超步中,没有收到消息的顶点会被跳过。当没有消息遗留时,Pregel操作将结束迭代并返回最终的图。
Pregel计算模型中有三个重要的函数,分别是vertexProgram()、sendMessage()和messageCombiner()。
- vertexProgram:用户定义的顶点运行程序。它作用于每一个顶点,负责接收进来的信息,并计算新的顶点值。
- sendMsg:发送消息。
- mergeMsg:合并消息。
下面是使用Pregel API的一个示例。
import org.apache.spark.graphx._
import org.apache.spark.sql.SparkSession
......
// case class,用来代表顶点属性,存储一个用户的信息
case class User(name: String, age: Int)
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("graphx demo")
.getOrCreate()
// 构造顶点RDDs
val usersRDD = spark.sparkContext.parallelize(
List(
(1L, User("Alex", 26)),
(2L, User("Bill", 42)),
(3L, User("Carol", 18)),
(4L, User("Dave", 16)),
(5L, User("Eve", 45)),
(6L, User("Farell", 30)),
(7L, User("Garry", 32)),
(8L, User("Harry", 36)),
(9L, User("Ivan", 28)),
(10L, User("Jill", 48))
)
)
// 构造边RDDs
val followsRDD = spark.sparkContext.parallelize(
List(
Edge(1L, 2L, 1),
Edge(2L, 3L, 1),
Edge(3L, 1L, 1),
Edge(3L, 4L, 1),
Edge(3L, 5L, 1),
Edge(4L, 5L, 1),
Edge(6L, 5L, 1),
Edge(7L, 6L, 1),
Edge(6L, 8L, 1),
Edge(7L, 8L, 1),
Edge(7L, 9L, 1),
Edge(9L, 8L, 1),
Edge(8L, 10L, 1),
Edge(10L, 9L, 1),
Edge(1L, 11L, 1)
)
)
// 创建默认属性集(id为11的顶点没有任何属性)
// 它会将默认属性分配给没有被显式分配任何属性的顶点
val defaultUser = User("NA", 0)
// 构造图对象
val socialGraph = Graph(usersRDD, followsRDD, defaultUser)
// 根据从一个顶点的出度为每条边分配一个权重
val outDegrees = socialGraph.outDegrees
val outDegreesGraph = socialGraph.outerJoinVertices(outDegrees) {
(vId, vData, outDeg) => outDeg.getOrElse(0)
}
val weightedEdgesGraph = outDegreesGraph.mapTriplets{et => 1.0 / et.srcAttr }
// 接下来,为每个用户分配一个初始影响等级
val inputGraph = weightedEdgesGraph.mapVertices((id, vData) => 1.0)
// 初始化传递给pregel方法的参数
// 将传递给pregel方法的三个参数的第一个集合。
val firstMessage = 0.0
val iterations = 20
val edgeDirection = EdgeDirection.Out
// 接下来,定义作为参数传递给pregel方法的三个函数
val updateVertex = (vId: Long, vData: Double, msgSum: Double) => 0.15 + 0.85 * msgSum
val sendMsg = (triplet: EdgeTriplet[Double, Double]) => Iterator((triplet.dstId, triplet.srcAttr * triplet.attr))
val aggregateMsgs = (x: Double, y: Double ) => x + y
// 现在调用pregel方法
/* Pregel是一种分布式计算模型,用于处理具有数十亿顶点和数万亿条边的大型图形 */
val influenceGraph = inputGraph
.pregel(firstMessage, iterations, edgeDirection)(updateVertex, sendMsg, aggregateMsgs)
// 打印每个用户的姓名和影响排名来检查计算结果
val userNames = socialGraph.mapVertices{(vId, user) => user.name}.vertices
val userNamesAndRanks = influenceGraph
.outerJoinVertices(userNames){(vId, rank, optUserName) => (optUserName.get, rank)}
.vertices
userNamesAndRanks.collect.foreach{
case(vId, vData) => println(vData._1 +"的影响力: " + vData._2)
}
}
输出结果如下:
Harry的影响力: 0.9733813220346056 Alex的影响力: 0.2546939612521768 Ivan的影响力: 0.9670543985781628 Jill的影响力: 0.9718993399637271 Bill的影响力: 0.25824491587871073 NA的影响力: 0.25824491587871073 Carol的影响力: 0.36950816084343974 Dave的影响力: 0.2546939612521768 Eve的影响力: 0.47118379300959834 Farell的影响力: 0.1925 Garry的影响力: 0.15
注意,与标准的Pregel实现不同的是,GraphX的Pregel实现中的顶点仅仅能发送信息给邻居顶点,并且可以利用用户自定义的消息函数并行地构造消息。这些限制允许对GraphX进行额外的优化。
下面是另一个使用Pregel实现的图算法:最短路径算法。
import org.apache.spark.sql.SparkSession
import org.apache.spark.graphx._
import org.apache.spark.graphx.util.GraphGenerators
......
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("graphx demo")
.getOrCreate()
val graph = GraphGenerators
.logNormalGraph(spark.sparkContext, numVertices = 100)
.mapEdges(e => e.attr.toDouble)
val sourceId: VertexId = 42 // 最终的源
// 初始化图
val initialGraph = graph.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
val sssp = initialGraph.pregel(Double.PositiveInfinity)(
(id, dist, newDist) => math.min(dist, newDist), // Vertex Program
triplet => { // 发送消息
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
(a,b) => math.min(a,b) // 合并消息
)
println(sssp.vertices.take(10).mkString("\n"))
}
输出结果如下所示:
(96,2.0) (56,2.0) (16,1.0) (80,2.0) (48,2.0) (32,2.0) (0,2.0) (24,2.0) (64,2.0) (40,1.0)
上面的例子中,Vertex Program函数定义如下:
(id, dist, newDist) => math.min(dist, newDist)
这个函数的定义显而易见,当两个消息来的时候,取它们当中路径的最小值。同理Merge Message函数也是同样的含义。
Send Message函数中,会首先比较triplet.srcAttr + triplet.attr和triplet.dstAttr,即比较加上边的属性后,这个值是否小于目的节点的属性,如果小于,则发送消息到目的顶点。
Pregel框架的缺点
这个模型虽然简单,但是缺陷明显,那就是对于邻居数很多的顶点,它需要处理的消息非常庞大,而且在这个模式下,它们是无法被并发处理的。所以对于符合幂律分布的自然图,这种计算模型下很容易发生假死或者崩溃。