发布日期:2022-08-03 VIP内容

GraphX案例:分析真实航班数据

我们使用2014年1月份的航班数据。对于每次航班,我们都有以下信息:

在这个场景中,我们将机场表示为顶点,而航线表示为边。我们对机场和航线的可视化很感兴趣,我们想知道有多少机场起飞或抵达。

// 首先导入依赖包
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

然后,定义一个case calss:

// 使用case class来定义Flight Schema,与CSV数据文件相对应
case class Flight(dofM:String,
                  dofW:String,
                  carrier:String,
                  tailnum:String,
                  flnum:Integer,
                  org_id:Long,
                  origin:String,
                  dest_id:Long,
                  dest:String,
                  crsdeptime:Double,
                  deptime:Double,
                  depdelaymins:Double,
                  crsarrtime:Double,
                  arrtime:Double,
                  arrdelay:Double,
                  crselapsedtime:Double,
                  dist:Integer
         )

接下来,加载数据文件,分别构造机场信息的RDD和航线信息的RDD。并分别以这两个RDD为顶点和边,来创建图模型。有两种方式可以实现:一种是直接加载为RDD,另一种是先加载为DataFrame,再转换为RDD。

方式一:直接加载为RDD,再将元素封装为Flight类,得到RDD[Flight]。

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // 1) 定义顶点
    // 加载数据文件到一个RDD
    val flightData = "src/main/data/flight/rita2014jan.csv"
    val textRDD = spark.sparkContext.textFile(flightData)

    // 一种方式: 加载为RDD,再将元素封装为Flight类,得到RDD[Flight]
    // 下面的函数将数据文件中的一行解析为Flight类
    def parseFlight(str: String): Flight = {
      // 这一行是针对arrtime和arrdelay这两列,这两列有空值
      val line = str.split(",").map(word => if(word=="") "-1" else word)
      Flight(
        line(0),
        line(1),
        line(2),
        line(3),
        line(4).toInt,
        line(5).toLong,
        line(6),
        line(7).toLong,
        line(8),
        line(9).toDouble,
        line(10).toDouble,
        line(11).toDouble,
        line(12).toDouble,
        line(13).toDouble,
        line(14).toDouble,
        line(15).toDouble,
        line(16).toInt
      )
    }

    // 解析
    val flightsRDD = textRDD.map(parseFlight).cache()
    println("统计样本数:" + flightsRDD.count)
  }

执行以上代码,输出内容如下:

统计样本数:441622

方式二:先加载为DataFrame/Dataset,再转为RDD[Flight]。

def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("graphx demo")
      .getOrCreate()

    // schema
    val schema = new StructType()
      .add("dofM",StringType,nullable = true)
      .add("dofW",StringType,nullable = true)
      .add("carrier",StringType,nullable = true)
      .add("tailnum",StringType,nullable = true)
      .add("flnum",IntegerType,nullable = true)
      .add("org_id",LongType,nullable = true)
      .add("origin",StringType,nullable = true)
      .add("dest_id",LongType,nullable = true)
      .add("dest",StringType,nullable = true)
      .add("crsdeptime",DoubleType,nullable = true)
      .add("deptime",DoubleType,nullable = true)
      .add("depdelaymins",DoubleType,nullable = true)
      .add("crsarrtime",DoubleType,nullable = true)
      .add("arrtime",DoubleType,nullable = true)
      .add("arrdelay",DoubleType,nullable = true)
      .add("crselapsedtime",DoubleType,nullable = true)
      .add("dist",IntegerType,nullable = true)

    /* 另一种根据case class获得schema的方式
    import org.apache.spark.sql.types.StructType
    import org.apache.spark.sql.catalyst.ScalaReflection
    val schema = ScalaReflection.schemaFor[Flight].dataType.asInstanceOf[StructType]
    */
    /* 或者,试一试
    import org.apache.spark.sql.Encoders
    val schema = Encoders.product[Flight].schema
    */

    val flightData = "src/main/data/flight/rita2014jan.csv"
    val flightsDF = spark.read
      .option("header","false")
      .option("delimiter", ",")
      .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
      .schema(schema)
      .csv(flightData)

    flightsDF.cache
    flightsDF.printSchema
    println(flightsDF.count)
    flightsDF.show(5,truncate = false)

    println("------------------------")
    import spark.implicits._
    // 数据集中空值判断
    println("具有空值的行:" + flightsDF.where($"arrtime".isNull or $"arrdelay".isNull).count)
//    println("具有空值的行:" + flightsDF.where("arrtime is null or arrdelay is null").count)
    flightsDF.where($"arrtime".isNull or $"arrdelay".isNull).show()

    // 删除空值
    // val cleanedFlightsDF = flightsDF.na.drop
    // cleanedFlightsDF.count

    // 或对指定的列空值填充
    // val cleanedFlightsDF = flightsDF.na.fill(-1,Seq("arrtime","arrdelay"))     // 注意:-1不带双引号
    val cleanedFlightsDF = flightsDF.na.fill(Map("arrtime"->"-1","arrdelay"->"-1"))
//    val cleanedFlightsDF = flightsDF.na.fill(Map("arrtime" -> -1,"arrdelay" -> -1))  // 真是奇怪的用法啊!
    val flightsDS = cleanedFlightsDF.as[Flight]

    val flightsRDD = flightsDS.rdd
    flightsRDD.cache
    println(flightsRDD.count)
  }

将源数据加载到RDD中之后,接下来着手构建图模型。定义机场作为顶点,每个顶点是一个元组,由机场ID和机场名称组成 (flight.org_id, flight.origin)。例如:

ID	      	Property(V)
10397	 	ATL

边是机场之间的航线。边必须具有源、目标和属性。在我们的例子中,边包括:

Edge origin id                  → src (Long)
Edge destination id             → dest (Long)
Edge property distance          → distance (Long)

构建图模型的代码实现如下:

// 1)定义一个顶点RDD
val airports = flightsRDD.map(flight => (flight.org_id, flight.origin)).distinct
println("\n以机场为顶点:")
airports.take(5).foreach(println)

// 定义一个默认顶点nowhere
val nowhere = "nowhere"

// 将机场ID映射为3个字母的代码,用于输出
// Map(13024 -> LMT, 10785 -> BTV,…)
val airportMap = airports
      .map {
        case ((org_id), name) => org_id -> name
      }
      .collect
      .toList
      .toMap

// 2)定义一个边RDD
val edges = flightsRDD
      .map(flight => ((flight.org_id, flight.dest_id), flight.dist))
      .distinct
      .map {
        case ((org_id, dest_id), distance) => Edge(org_id.toLong, dest_id.toLong, distance)
      }
println("\n以航线为边:")
edges.take(5).foreach(println)

// 创建属性图
// 要创建一个图形,需要有一个顶点RDD、一个边RDD和一个默认顶点
val graph = Graph(airports, edges, nowhere)

// 图顶点
println("\n图顶点(机场):")
graph.vertices.take(5).foreach(println)

// 图的边
println("\n图的边(航线):")
graph.edges.take(5).foreach(println)

执行以上代码,输出内容如下:

以机场为顶点:
(14057,PDX)
(12177,HOB)
(14262,PSP)
(10529,BDL)
(11977,GRB)

以航线为边:
Edge(14869,14683,1087)
Edge(14683,14771,1482)
Edge(12982,14831,2466)
Edge(14893,10800,358)
Edge(11042,10721,563)

图顶点(机场):
(10208,AGS)
(10268,ALO)
(14828,SIT)
(14698,SBP)
(12278,ICT)

图的边(航线):
Edge(10135,10397,692)
Edge(10135,13930,654)
Edge(10140,10397,1269)
Edge(10140,10821,1670)
Edge(10140,11259,580)

我们已经根据真实航班数据,构建了一个图模型。接下来对该图模型进行分析,尝试回答以下问题:

  • 有多少机场?
  • 有多少航线?
  • 哪些航线的距离大于1000英里?
  • 根据距离对航线进行排序。
  • 所有机场中,最高的出度、入度和出入度和分别是多少?
  • 找出入港航班最多的3个机场。
  • 找出出港航班最多的3个机场。
  • 找出成本最低的航班。

实现代码如下:

    // 有多少机场?
    println("\n有多少机场? " + graph.numVertices)

    // 有多少航线?
    println("\n有多少航线? " + graph.numEdges)

    // 哪些航线的距离大于1000?
    println("\n哪些航线的距离大于1000? ")
    graph.edges
      .filter {
        case ( Edge(org_id, dest_id,distance))=> distance > 1000
      }
      .take(3)
      .foreach(println)

    // triplets
//    graph.triplets.take(3).foreach(println)

    // 排序并输出距离最远的航线
    println("\n根据距离对航线进行排序 ")
    graph.triplets
      .sortBy(_.attr, ascending=false)
      .map(triplet => "距离: " + triplet.attr + "," + triplet.srcAttr + " - " + triplet.dstAttr + ".")
      .take(10)
      .foreach(println)

    // 计算最高度数的顶点
    def max(a: (VertexId, Int), b: (VertexId, Int)): (VertexId, Int) = {
      if (a._2 > b._2) a else b
    }

    // 最高入度
    println("\n最高入度:" + graph.inDegrees.reduce(max))

    // 最高出度
    println("\n最高出度:" + graph.outDegrees.reduce(max))

    // 最高出入度
    println("\n最高出入度:" + graph.degrees.reduce(max))

    // 获取id为10397的机场名称
    println("\nid为10397的机场名称:" + airportMap(10397))

    // 哪个机场的入港航班最多? top 3
    println("\n哪3个机场的入港航班最多?")
    graph.inDegrees
      .collect
      .sortWith(_._2 > _._2)
      .map(x => (airportMap(x._1), x._2))
      .take(3)
      .foreach(println)

    // 哪个机场的离港航班最多?
    println("\n哪3个机场的出港航班最多?")
    graph.outDegrees
      .join(airports)
      .sortBy(_._2._1, ascending=false)
      .take(3)
      .foreach(println)

    // 根据PageRank,最重要的机场是哪个?
    val ranks = graph.pageRank(0.1).vertices
    println("\n最重要的机场是:")
    ranks
      .join(airports)
      .sortBy(_._2._1, ascending = false)
      .map(_._2._2)           // 获得机场名称
      .take(3)
      .foreach(println)

    // Pregel
    // 计算最便宜的机票,使用下面的公式计算机票:50 + distance / 20
    // 开始顶点
    val sourceId = 13024

    // 边包含机票成本计算的图
    val gg = graph.mapEdges(e => 50.toDouble + e.attr.toDouble/20 )

    // 初始化图,除源点外的所有顶点的距离均为无穷大
    val initialGraph = gg.mapVertices((id, _) => if (id == sourceId) 0.0 else Double.PositiveInfinity)

    // 在图上调用pregel
    val ssp = initialGraph.pregel(Double.PositiveInfinity)(
      // 顶点程序
      (id, price, newPrice) => math.min(price, newPrice),
      // 发送消息
      triplet => {
        if (triplet.srcAttr + triplet.attr < triplet.dstAttr) {
          Iterator((triplet.dstId, triplet.srcAttr + triplet.attr))
        } else {
          Iterator.empty
        }
      },
      // 合并消息
      (a,b) => math.min(a,b)
    )

    // 各个航线的票价
    println("\n各个航线的票价:")
    ssp.edges.take(4).foreach(println)

    // 将机场代码转换为机场名称显示,按票价排序
    println("\n将机场代码转换为机场名称显示,按票价排序:")
    ssp.edges
      .map{
        case (Edge(org_id, dest_id, price))=> (airportMap(org_id), airportMap(dest_id), price)
      }
      .takeOrdered(10)(Ordering.by(_._3))
      .foreach(println)

    // 各个机场的最低票价
    println("\n各个机场的最低票价:")
    ssp.vertices.take(4).foreach(println)

    // 将机场代码转换为机场名称显示,并按最低票价排序
    println("\n将机场代码转换为机场名称显示,并按最低票价排序:")
    ssp.vertices
      .map(x => (airportMap(x._1), x._2))
      .collect
      .sortWith(_._2 < _._2)
      .take(5)
      .foreach(println)

执行以上代码,输出内容如下:

有多少机场? 301

有多少航线? 4090

哪些航线的距离大于1000英里? 
Edge(10140,10397,1269)
Edge(10140,10821,1670)
Edge(10140,12264,1628)

根据距离对航线进行排序 
距离: 4983,JFK - HNL.
距离: 4983,HNL - JFK.
距离: 4963,EWR - HNL.
距离: 4963,HNL - EWR.
距离: 4817,HNL - IAD.
距离: 4817,IAD - HNL.
距离: 4502,ATL - HNL.
距离: 4502,HNL - ATL.
距离: 4243,HNL - ORD.
距离: 4243,ORD - HNL.

最高入度:(10397,152)

最高出度:(10397,153)

最高出入度:(10397,305)

id为10397的机场名称:ATL

哪3个机场的入港航班最多?
(ATL,152)
(ORD,145)
(DFW,143)

哪3个机场的出港航班最多?
(10397,(153,ATL))
(13930,(146,ORD))
(11298,(143,DFW))

最重要的机场是:
ATL
ORD
DFW

各个航线的票价:
Edge(10135,10397,84.6)
Edge(10135,13930,82.7)
Edge(10140,10397,113.45)
Edge(10140,10821,133.5)

将机场代码转换为机场名称显示,按票价排序:
(WRG,PSG,51.55)
(PSG,WRG,51.55)
(CEC,ACV,52.8)
(ACV,CEC,52.8)
(ORD,MKE,53.35)
(IMT,RHI,53.35)
(MKE,ORD,53.35)
(RHI,IMT,53.35)
(STT,SJU,53.4)
(SJU,STT,53.4)

各个机场的最低票价:
(10208,277.79999999999995)
(10268,260.7)
(14828,261.65)
(14698,125.25)

将机场代码转换为机场名称显示,并按最低票价排序:
(LMT,0.0)
(PDX,62.05)
(SFO,65.75)
(EUG,117.35)
(RDM,117.85)