Published: 2022-08-09

Saving and Loading a GraphFrame

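Because GraphFrames are built on the DataFrame API, they can be saved to and loaded from any data source that DataFrames support. The code below saves the vertices and edges as Parquet files on HDFS, then recreates the vertex and edge DataFrames from persistent storage and rebuilds the graph from them.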

First, define two case classes that describe the structure of the vertices and the edges:

// Case classes that define the schemas of the vertices and edges
case class Airport(id: String, city: String) extends Serializable

case class Flight(id: String, src: String, dst: String, dist: Double, delay: Double) extends Serializable
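
GraphFrames looks columns up by name: the vertex DataFrame needs an `id` column, and the edge DataFrame needs `src` and `dst` columns. The following is a minimal sketch, not part of the original walkthrough, that checks the case classes against those names before any data is created. It assumes Spark's `Encoders.product` is used to derive the schema, and it repeats the two case classes (without the redundant `extends Serializable`) only so that the snippet runs on its own.

```scala
import org.apache.spark.sql.Encoders

// Repeated from the definitions above so the snippet is self-contained.
case class Airport(id: String, city: String)
case class Flight(id: String, src: String, dst: String, dist: Double, delay: Double)

// Derive the schema Spark would infer for a DataFrame of each case class.
val airportSchema = Encoders.product[Airport].schema
val flightSchema  = Encoders.product[Flight].schema

// GraphFrames requires "id" on vertices and "src"/"dst" on edges.
require(airportSchema.fieldNames.contains("id"),
  "vertex schema must contain an 'id' column")
require(Seq("src", "dst").forall(name => flightSchema.fieldNames.contains(name)),
  "edge schema must contain 'src' and 'dst' columns")

// Print both schemas for inspection.
println(airportSchema.treeString)
println(flightSchema.treeString)
```

Any additional fields, such as `city`, `dist`, and `delay`, are carried along as ordinary vertex and edge attributes.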

Next, create the vertex DataFrame and the edge DataFrame, and build the graph from them:

import org.graphframes.GraphFrame

// Create the vertex DataFrame (one row per airport)
val vertices = spark.createDataFrame(
  Seq(
    Airport("SFO", "San Francisco"),
    Airport("ORD", "Chicago"),
    Airport("DFW", "Dallas Fort Worth")
  )
)

// Create the edge DataFrame (one row per flight)
val edges = spark.createDataFrame(
  Seq(
    Flight("SFO_ORD_2017-01-01_AA", "SFO", "ORD", 1800, 40),
    Flight("ORD_DFW_2017-01-01_UA", "ORD", "DFW", 800, 0),
    Flight("DFW_SFO_2017-01-01_DL", "DFW", "SFO", 1400, 10)
  )
)

// Build the graph from the vertices and edges
val graph = GraphFrame(vertices, edges)

Save the vertices and the edges separately:

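// Persist the graph: write the vertices and edges as separate Parquet datasets.
// Relative paths resolve against the default file system (HDFS on a typical cluster).
graph.vertices.write.mode("overwrite").parquet("tmp/gf/vertices")
graph.edges.write.mode("overwrite").parquet("tmp/gf/edges")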
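
Because the vertices and edges are ordinary DataFrames, the same pattern should carry over to other formats that `DataFrameWriter` supports. The sketch below uses JSON instead of Parquet as an illustration. The names `demoVertices`, `demoEdges`, `demoGraph`, and the path `tmp/gf_json` are made up for this example, and the local `SparkSession` setup is an assumption so that the snippet runs outside `spark-shell`.

```scala
import org.apache.spark.sql.SparkSession
import org.graphframes.GraphFrame

// In spark-shell this returns the existing session.
val spark = SparkSession.builder()
  .appName("graphframe-json-example")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// A tiny illustrative graph (not the article's data set).
val demoVertices = Seq(("SFO", "San Francisco"), ("ORD", "Chicago")).toDF("id", "city")
val demoEdges    = Seq(("SFO", "ORD", 1800.0)).toDF("src", "dst", "dist")
val demoGraph    = GraphFrame(demoVertices, demoEdges)

// Hypothetical output location, chosen only for this example.
val base = "tmp/gf_json"
demoGraph.vertices.write.mode("overwrite").json(s"$base/vertices")
demoGraph.edges.write.mode("overwrite").json(s"$base/edges")

// Read both parts back and rebuild the graph.
val restored = GraphFrame(
  spark.read.json(s"$base/vertices"),
  spark.read.json(s"$base/edges")
)
restored.vertices.show(false)
```

Unlike Parquet, JSON does not store a schema, so Spark infers one on read; column order and numeric types may differ from the original DataFrames. GraphFrames only needs the `id`, `src`, and `dst` column names to be present.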

The vertex and edge DataFrames can then be recreated from persistent storage and used to rebuild the graph:

// Load the vertices and edges back from Parquet
val v = spark.read.parquet("tmp/gf/vertices")
val e = spark.read.parquet("tmp/gf/edges")

// Rebuild the graph from the loaded DataFrames
val g = GraphFrame(v, e)
g.triplets.show(false)

Running the code above produces the following output:

+------------------------+-----------------------------------------------+------------------------+
|src                     |edge                                           |dst                     |
+------------------------+-----------------------------------------------+------------------------+
|[SFO, San Francisco]    |[SFO_ORD_2017-01-01_AA, SFO, ORD, 1800.0, 40.0]|[ORD, Chicago]          |
|[ORD, Chicago]          |[ORD_DFW_2017-01-01_UA, ORD, DFW, 800.0, 0.0]  |[DFW, Dallas Fort Worth]|
|[DFW, Dallas Fort Worth]|[DFW_SFO_2017-01-01_DL, DFW, SFO, 1400.0, 10.0]|[SFO, San Francisco]    |
+------------------------+-----------------------------------------------+------------------------+
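
The save and load steps can be wrapped in a pair of small helpers and checked with a round trip. This is a sketch under stated assumptions: `saveGraph`, `loadGraph`, and the path `tmp/gf_roundtrip` are hypothetical names introduced here, not part of the GraphFrames API, and the sample graph is a reduced stand-in for the one above.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}
import org.graphframes.GraphFrame

// Hypothetical helper: write vertices and edges under a common base path.
def saveGraph(g: GraphFrame, basePath: String): Unit = {
  g.vertices.write.mode(SaveMode.Overwrite).parquet(s"$basePath/vertices")
  g.edges.write.mode(SaveMode.Overwrite).parquet(s"$basePath/edges")
}

// Hypothetical helper: read both parts back and rebuild the graph.
def loadGraph(spark: SparkSession, basePath: String): GraphFrame =
  GraphFrame(
    spark.read.parquet(s"$basePath/vertices"),
    spark.read.parquet(s"$basePath/edges")
  )

// In spark-shell this returns the existing session.
val spark = SparkSession.builder()
  .appName("graphframe-roundtrip")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val original = GraphFrame(
  Seq(("SFO", "San Francisco"), ("ORD", "Chicago")).toDF("id", "city"),
  Seq(("SFO", "ORD", 1800.0, 40.0)).toDF("src", "dst", "dist", "delay")
)

saveGraph(original, "tmp/gf_roundtrip")
val reloaded = loadGraph(spark, "tmp/gf_roundtrip")

// An empty difference in both directions means the distinct rows match.
val sameVertices =
  original.vertices.except(reloaded.vertices).count() == 0 &&
  reloaded.vertices.except(original.vertices).count() == 0
val sameEdges =
  original.edges.except(reloaded.edges).count() == 0 &&
  reloaded.edges.except(original.edges).count() == 0

println(s"vertices match: $sameVertices, edges match: $sameEdges")
```

Note that `except` compares distinct rows, so this check would not detect a change in the number of duplicate rows; comparing `count()` on both sides as well would cover that case.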