使用GraphX分析航班数据
下面是一个简单的示例,我们将分析三个航班。每次航班,我们都有以下信息:
在这个场景中,我们将机场表示为顶点,而航线表示为边。对于我们的图,我们将有三个顶点,每个顶点代表一个机场。机场之间的距离为航线属性,如下图所示:
其中代表机场的顶点表如下:
代表航线的边表如下:
首先,我们将导入GraphX包:
import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession
然后构建图来表示机场及航线数据。代码如下:
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("graphx demo")
.getOrCreate()
// 我们将机场定义为顶点
// 每个顶点由以下内容组成(机场ID,机场名称)
val vertices = Array((1L, "SFO"),(2L, "ORD"),(3L,"DFW"))
val vRDD = spark.sparkContext.parallelize(vertices)
// 定义一个默认的顶点,叫做nowhere
val nowhere = "nowhere"
// 边是机场之间的航线。边必须具有源、目标和属性。
// 在这里,边包括: (出港机场id,入港机场id, 航线距离)
val edges = Array(Edge(1L,2L,1800),Edge(2L,3L,800),Edge(3L,1L,1400))
val eRDD = spark.sparkContext.parallelize(edges)
// 下面创建属性图
val graph = Graph(vRDD, eRDD, nowhere)
// 图的顶点(所有机场)
println("所有机场:")
graph.vertices.collect.foreach(println)
// 图的边(所有航线)
println("\n所有航线:")
graph.edges.collect.foreach(println)
}
执行以上代码,输出内容如下:
所有机场: (1,SFO) (2,ORD) (3,DFW) 所有航线: Edge(1,2,1800) Edge(2,3,800) Edge(3,1,1400)
接下来,我们需要回答以下问题:
- 1)有多少个机场?
- 2)有多少条航线?
- 3)哪些航线的距离 > 1000英里?
- 4)显示所有机场和航线的信息。
- 5)排序并输出最长距离的航线。
代码如下所示:
// 1. 有多少个机场?
println("有多少个机场? " + graph.numVertices)
// 2. 有多少条航线?
println("有多少条航线? " + graph.numEdges)
// 3. 哪些航线的距离 > 1000英里?
println("\n哪些航线的距离超过了1000英里?")
graph.edges
.filter {
case Edge(src, dst, dis) => dis > 1000
}
.collect
.foreach(println)
// 4. 显示所有机场和航线的信息
println("\n显示所有机场和航线的信息:")
graph.triplets.collect.foreach(println)
// 5. 排序并输出最长距离的航线
println("----------------------------------")
graph.triplets
.sortBy(_.attr, ascending=false)
.map(triplet => "距离: " + triplet.attr + "," + triplet.srcAttr + "-" + triplet.dstAttr + ".")
.collect
.foreach(println)
执行以上代码,输出内容如下:
有多少个机场? 3 有多少条航线? 3 哪些航线的距离超过了1000英里? Edge(1,2,1800) Edge(3,1,1400) 显示所有机场和航线的信息: ((1,SFO),(2,ORD),1800) ((2,ORD),(3,DFW),800) ((3,DFW),(1,SFO),1400) ---------------------------------- 距离: 1800,SFO-ORD. 距离: 1400,DFW-SFO. 距离: 800,ORD-DFW.