发布日期:2022-08-03
VIP内容
GraphX案例:分析真实航班数据
我们使用2014年1月份的航班数据。对于每次航班,我们都有以下信息:
在这个场景中,我们将机场表示为顶点,而航线表示为边。我们对机场和航线的可视化很感兴趣,我们想知道有多少机场起飞或抵达。
// 首先导入依赖包
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
然后,定义一个case calss:
// 使用case class来定义Flight Schema,与CSV数据文件相对应
case class Flight(dofM:String,
dofW:String,
carrier:String,
tailnum:String,
flnum:Integer,
org_id:Long,
origin:String,
dest_id:Long,
dest:String,
crsdeptime:Double,
deptime:Double,
depdelaymins:Double,
crsarrtime:Double,
arrtime:Double,
arrdelay:Double,
crselapsedtime:Double,
dist:Integer
)
接下来,加载数据文件,分别构造机场信息的RDD和航线信息的RDD。并分别以这两个RDD为顶点和边,来创建图模型。有两种方式可以实现:一种是直接加载为RDD,另一种是先加载为DataFrame,再转换为RDD。
方式一:直接加载为RDD,再将元素封装为Flight类,得到RDD[Flight]。
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("graphx demo")
.getOrCreate()
// 1) 定义顶点
// 加载数据文件到一个RDD
val flightData = "src/main/data/flight/rita2014jan.csv"
val textRDD = spark.sparkContext.textFile(flightData)
// 一种方式: 加载为RDD,再将元素封装为Flight类,得到RDD[Flight]
// 下面的函数将数据文件中的一行解析为Flight类
def parseFlight(str: String): Flight = {
// 这一行是针对arrtime和arrdelay这两列,这两列有空值
val line = str.split(",").map(word => if(word=="") "-1" else word)
Flight(
line(0),
line(1),
line(2),
line(3),
line(4).toInt,
line(5).toLong,
line(6),
line(7).toLong,
line(8),
line(9).toDouble,
line(10).toDouble,
line(11).toDouble,
line(12).toDouble,
line(13).toDouble,
line(14).toDouble,
line(15).toDouble,
line(16).toInt
)
}
// 解析
val flightsRDD = textRDD.map(parseFlight).cache()
println("统计样本数:" + flightsRDD.count)
}
执行以上代码,输出内容如下:
统计样本数:441622
方式二:先加载为DataFrame/Dataset,再转为RDD[Flight]。
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.master("local[*]")
.appName("graphx demo")
.getOrCreate()
// schema
val schema = new StructType()
.add("dofM",StringType,nullable = true)
.add("dofW",StringType,nullable = true)
.add("carrier",StringType,nullable = true)
.add("tailnum",StringType,nullable = true)
.add("flnum",IntegerType,nullable = true)
.add("org_id",LongType,nullable = true)
.add("origin",StringType,nullable = true)
.add("dest_id",LongType,nullable = true)
.add("dest",StringType,nullable = true)
.add("crsdeptime",DoubleType,nullable = true)
.add("deptime",DoubleType,nullable = true)
.add("depdelaymins",DoubleType,nullable = true)
.add("crsarrtime",DoubleType,nullable = true)
.add("arrtime",DoubleType,nullable = true)
.add("arrdelay",DoubleType,nullable = true)
.add("crselapsedtime",DoubleType,nullable = true)
.add("dist",IntegerType,nullable = true)
/* 另一种根据case class获得schema的方式
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.catalyst.ScalaReflection
val schema = ScalaReflection.schemaFor[Flight].dataType.asInstanceOf[StructType]
*/
/* 或者,试一试
import org.apache.spark.sql.Encoders
val schema = Encoders.product[Flight].schema
*/
val flightData = "src/main/data/flight/rita2014jan.csv"
val flightsDF = spark.read
.option("header","false")
.option("delimiter", ",")
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
.schema(schema)
.csv(flightData)
flightsDF.cache
flightsDF.printSchema
println(flightsDF.count)
flightsDF.show(5,truncate = false)
println("------------------------")
import spark.implicits._
// 数据集中空值判断
println("具有空值的行:" + flightsDF.where($"arrtime".isNull or $"arrdelay".isNull).count)
// println("具有空值的行:" + flightsDF.where("arrtime is null or arrdelay is null").count)
flightsDF.where($"arrtime".isNull or $"arrdelay".isNull).show()
// 删除空值
// val cleanedFlightsDF = flightsDF.na.drop
// cleanedFlightsDF.count
// 或对指定的列空值填充
// val cleanedFlightsDF = flightsDF.na.fill(-1,Seq("arrtime","arrdelay")) // 注意:-1不带双引号
val cleanedFlightsDF = flightsDF.na.fill(Map("arrtime"->"-1","arrdelay"->"-1"))
// val cleanedFlightsDF = flightsDF.na.fill(Map("arrtime" -> -1,"arrdelay" -> -1)) // 真是奇怪的用法啊!
val flightsDS = cleanedFlightsDF.as[Flight]
val flightsRDD = flightsDS.rdd
flightsRDD.cache
println(flightsRDD.count)
}
将源数据加载到RDD中之后,接下来着手构建图模型。定义机场作为顶点,每个顶点是一个元组,由机场ID和机场名称组成 (flight.org_id, flight.origin)。例如:
ID Property(V) 10397 ATL
边是机场之间的航线。边必须具有源、目标和属性。在我们的例子中,边包括:
Edge origin id → src (Long) Edge destination id → dest (Long) Edge property distance → distance (Long)
构建图模型的代码实现如下:
// 1)定义一个顶点RDD
val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct
println("\n以机场为顶点:")
airports.take(5).foreach(println)
// 定义一个默认顶点nowhere
val nowhere = "nowhere"
// 将机场ID映射为3个字母的代码,用于输出
// Map(13024 -> LMT, 10785 -> BTV,…)
val airportMap = airports
.map {
case ((org_id), name) => org_id -> name
}
.collect
.toList
.toMap
// 2)定义一个边RDD
val edges = flightsRDD
.map(flight => ((flight.org_id, flight.dest_id), flight.dist))
.distinct
.map {
case ((org_id, dest_id), distance) => Edge(org_id.toLong, dest_id.toLong, distance)
}
println("\n以航线为边:")
edges.take(5).foreach(println)
// 创建属性图
// 要创建一个图形,需要有一个顶点RDD、一个边RDD和一个默认顶点
val graph = Graph(airports, edges, nowhere)
// 图顶点
println("\n图顶点(机场):")
graph.vertices.take(5).foreach(println)
// 图的边
println("\n图的边(航线):")
graph.edges.take(5).foreach(println)
执行以上代码,输出内容如下:
以机场为顶点: (14057,PDX) (12177,HOB) (14262,PSP) (10529,BDL) (11977,GRB) 以航线为边: Edge(14869,14683,1087) Edge(14683,14771,1482) Edge(12982,14831,2466) Edge(14893,10800,358) Edge(11042,10721,563) 图顶点(机场): (10208,AGS) (10268,ALO) (14828,SIT) (14698,SBP) (12278,ICT) 图的边(航线): Edge(10135,10397,692) Edge(10135,13930,654) Edge(10140,10397,1269) Edge(10140,10821,1670) Edge(10140,11259,580)
我们已经根据真实航班数据,构建了一个图模型。接下来对该图模型进行分析,尝试回答以下问题:
- 有多少机场?
- 有多少航线?
- 哪些航线的距离大于1000英里?
- 根据距离对航线进行排序。
- 所有机场中,最高的出度、入度和出入度和分别是多少?
- 找出入港航班最多的3个机场。
- 找出出港航班最多的3个机场。
- 找出成本最低的航班。
实现代码如下:
// 有多少机场?
println("\n有多少机场? " + graph.numVertices)
// 有多少航线?
println("\n有多少航线? " + graph.numEdges)
// 哪些航线的距离大于1000?
println("\n哪些航线的距离大于1000? ")
graph.edges
.filter {
case ( Edge(org_id, dest_id,distance))=> distance > 1000
}
.take(3)
.foreach(println)
// triplets
// graph.triplets.take(3).foreach(println)
// 排序并输出距离最远的航线
println("\n根据距离对航线进行排序 ")
graph.triplets
.sortBy(_.attr, ascending=false)
.map(triplet => "距离: " + triplet.attr + "," + triplet.srcAttr + " - " + triplet.dstAttr + ".")
.take(10)
.foreach(println)
// 计算最高度数的顶点
def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
if (a._2 > b._2) a else b
}
// 最高入度
println("\n最高入度:" + graph.inDegrees.reduce(max))
// 最高出度
println("\n最高出度:" + graph.outDegrees.reduce(max))
// 最高出入度
println("\n最高出入度:" + graph.degrees.reduce(max))
// 获取id为10397的机场名称
println("\nid为10397的机场名称:" + airportMap(10397))
// 哪个机场的入港航班最多? top 3
println("\n哪3个机场的入港航班最多?")
graph.inDegrees
.collect
.sortWith(_._2 > _._2)
.map(x => (airportMap(x._1), x._2))
.take(3)
.foreach(println)
// 哪个机场的离港航班最多?
println("\n哪3个机场的出港航班最多?")
graph.outDegrees
.join(airports)
.sortBy(_._2._1, ascending=false)
.take(3)
.foreach(println)
// 根据PageRank,最重要的机场是哪个?
val ranks = graph.pageRank(0.1).vertices
println("\n最重要的机场是:")
ranks
.join(airports)
.sortBy(_._2._1, ascending = false)
.map(_._2._2) // 获得机场名称
.take(3)
.foreach(println)
// Pregel
// 计算最便宜的机票,使用下面的公式计算机票:50 + distance / 20
// 开始顶点
val sourceId = 13024
// 边包含机票成本计算的图
val gg = graph.mapEdges(e => 50.toDouble + e.attr.toDouble/20 )
// 初始化图,除源点外的所有顶点的距离均为无穷大
val initialGraph = gg.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)
// 在图上调用pregel
val ssp = initialGraph.pregel(Double.PositiveInfinity)(
// 顶点程序
(id, price, newPrice) => math.min(price, newPrice),
// 发送消息
triplet => {
if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
} else {
Iterator.empty
}
},
// 合并消息
(a,b) => math.min(a,b)
)
// 各个航线的票价
println("\n各个航线的票价:")
ssp.edges.take(4).foreach(println)
// 将机场代码转换为机场名称显示,按票价排序
println("\n将机场代码转换为机场名称显示,按票价排序:")
ssp.edges
.map{
case (Edge(org_id, dest_id, price))=> (airportMap(org_id), airportMap(dest_id), price)
}
.takeOrdered(10)(Ordering.by(_._3))
.foreach(println)
// 各个机场的最低票价
println("\n各个机场的最低票价:")
ssp.vertices.take(4).foreach(println)
// 将机场代码转换为机场名称显示,并按最低票价排序
println("\n将机场代码转换为机场名称显示,并按最低票价排序:")
ssp.vertices
.map(x => (airportMap(x._1), x._2))
.collect
.sortWith(_._2 < _._2)
.take(5)
.foreach(println)
执行以上代码,输出内容如下:
有多少机场? 301 有多少航线? 4090 哪些航线的距离大于1000英里? Edge(10140,10397,1269) Edge(10140,10821,1670) Edge(10140,12264,1628) 根据距离对航线进行排序 距离: 4983,JFK - HNL. 距离: 4983,HNL - JFK. 距离: 4963,EWR - HNL. 距离: 4963,HNL - EWR. 距离: 4817,HNL - IAD. 距离: 4817,IAD - HNL. 距离: 4502,ATL - HNL. 距离: 4502,HNL - ATL. 距离: 4243,HNL - ORD. 距离: 4243,ORD - HNL. 最高入度:(10397,152) 最高出度:(10397,153) 最高出入度:(10397,305) id为10397的机场名称:ATL 哪3个机场的入港航班最多? (ATL,152) (ORD,145) (DFW,143) 哪3个机场的出港航班最多? (10397,(153,ATL)) (13930,(146,ORD)) (11298,(143,DFW)) 最重要的机场是: ATL ORD DFW 各个航线的票价: Edge(10135,10397,84.6) Edge(10135,13930,82.7) Edge(10140,10397,113.45) Edge(10140,10821,133.5) 将机场代码转换为机场名称显示,按票价排序: (WRG,PSG,51.55) (PSG,WRG,51.55) (CEC,ACV,52.8) (ACV,CEC,52.8) (ORD,MKE,53.35) (IMT,RHI,53.35) (MKE,ORD,53.35) (RHI,IMT,53.35) (STT,SJU,53.4) (SJU,STT,53.4) 各个机场的最低票价: (10208,277.79999999999995) (10268,260.7) (14828,261.65) (14698,125.25) 将机场代码转换为机场名称显示,并按最低票价排序: (LMT,0.0) (PDX,62.05) (SFO,65.75) (EUG,117.35) (RDM,117.85)