GraphX案例:分析家庭成员关系
现有两个文件,vertexs.csv和edges.csv,分别存储一个家庭的成员及他们的关系。下面使用Spark GraphX分析该家庭成员的关系。
实现代码如下:
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
......
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("graphx demo")
.getOrCreate()
val vertexFile = "src/main/data/family/vertexs.csv"
val edgeFile = "src/main/data/family/edges.csv"
// 构造顶点
val vertices = spark.sparkContext
.textFile(vertexFile)
.map { line =>
val fields = line.split(",")
( fields(0).toLong, (fields(1), fields(2)) )
}
// 构造边
val edges = spark.sparkContext
.textFile(edgeFile)
.map { line =>
val fields = line.split(",")
Edge(fields(0).toLong, fields(1).toLong, fields(2))
}
// 默认缺失的顶点
val default = ("Unknown", "Missing")
// 构造图
val graph = Graph(vertices, edges, default)
// 查看图的顶点和边
println( "顶点数量:" + graph.vertices.count )
println( "边的数量:" + graph.edges.count )
// 统计年龄小于40岁的家庭成员数
val c1 = graph.vertices
.filter {
case (id, (name, age)) => age.toLong > 40
}
.count
println("\n年龄小于40岁的家庭成员数:" + c1)
// 统计父子或母子关系的数量
val c2 = graph.edges
.filter {
case Edge(from, to, property) => property == "Father" | property == "Mother"
}
.count
println("\n父子或母子关系的数量:" + c2)
// println( "Vertices count : " + c1 )
// println( "Edges count : " + c2 )
// 家庭成员影响力统计(使用page rank算法)
val tolerance = 0.0001
val ranking = graph.pageRank(tolerance).vertices
val rankByPerson = vertices.join(ranking).map {
case (id, ((person, age), rank)) => (rank, id, person)
}
println("\n家庭影响力:")
rankByPerson.collect().foreach {
case (rank, id, person) => println ( f"Rank $rank%1.2f id $id person $person")
}
/*
三角形计数算法提供了一个基于顶点的三角形数量计数,与这个顶点相关联。
这对于路由查找非常有用,因为在路由规划中需要生成最小的、无三角的生成树图。
*/
val tCount = graph.triangleCount().vertices
println("\n三角计数" + tCount.collect().mkString("\n"))
/*
当从数据创建一个大图时,它可能包含未连接的子图,即相互隔离的子图,并且它们之间不包含桥接或连接边。
连通组件算法和强连通组件算法提供了这种连接性的度量。
*/
val iterations = 1000
val connected = graph.connectedComponents().vertices
val connectedS = graph.stronglyConnectedComponents(iterations).vertices
// 然后将顶点计数与原始的顶点记录连接起来,这样连接计数就可以与顶点信息相关联,例如此人的姓名
val connByPerson = vertices.join(connected).map {
case (id, ((person,age), conn)) => (conn, id, person)
}
println("\n连通组件:")
connByPerson.collect().foreach {
case (conn, id, person) => println ( f"Weak $conn $id $person" )
}
val connByPersonS = vertices.join(connectedS).map {
case (id, ((person,age), conn )) => (conn, id, person)
}
println("\n强连通组件:")
connByPersonS.collect().foreach {
case (conn, id, person) => println ( f"Strong $conn $id $person" )
}
}
执行以上代码,输出内容如下:
顶点数量:6 边的数量:12 年龄小于40岁的家庭成员数:4 父子或母子关系的数量:4 家庭影响力: Rank 0.15 id 4 person Jim Rank 0.15 id 6 person Flo Rank 1.62 id 2 person Sarah Rank 1.82 id 1 person Mike Rank 1.13 id 3 person John Rank 1.13 id 5 person Kate 三角计数(4,0) (6,0) (2,2) (1,2) (3,1) (5,1) 连通组件: Weak 1 4 Jim Weak 1 6 Flo Weak 1 2 Sarah Weak 1 1 Mike Weak 1 3 John Weak 1 5 Kate 强连通组件: Strong 4 4 Jim Strong 6 6 Flo Strong 1 2 Sarah Strong 1 1 Mike Strong 1 3 John Strong 1 5 Kate