发布日期:2022-08-03 VIP内容

GraphFrames的基本使用

在本节中,我们将使用Spark GraphFrames来建立图模型并进行研究。图的顶点和边被存储为DataFrame,并且支持基于Spark SQL和DataFrame的查询对它们进行操作。由于DataFrame可以支持各种数据源,因此可以从关系表、文件(JSON、Parquet、Avro和CSV)等等读取输入顶点和边信息。

顶点DataFrame必须包含一个叫做id的列,它为每个顶点指定唯一的ID。类似地,边DataFrame必须包含名为src(源顶点的ID)和dst(目标顶点的ID)的两列。顶点DataFrame和边DataFrame都可以包含额外的属性列。

GraphFrames公开了一个简洁的语言集成API,它统一了图分析和关系查询,集成了图算法、模式匹配和查询。机器学习代码、外部数据源和UDF可以与GraphFrames集成来构建更复杂的应用程序。

1. 添加GraphFrames依赖

因为GraphFrames并不存在于Spark的发行包中,因此需要首先添加其依赖包。当前GraphFrames的最新版本是0.8.1。请注意指定兼容的Spark版本,目前直接支持的是Spark 3.0,对应的Scala版本为2.12。根据开发方式的不同,使用不同的依赖添加方式,说明如下:

(1) 如果是通过IDE进行开发(如IntelliJ IDEA),则可以在Maven项目的pom.xml配置文件中添加如下的依赖:

<dependency>
    <groupId>graphframes</groupId>
    <artifactId>graphframes</artifactId>
    <version>0.8.1-spark3.0-s_2.12</version>
</dependency>

(2) 或者SBT项目的build.sbt配置文件中添加如下依赖:

libraryDependencies += "graphframes" % "graphframes" % "0.8.1-spark3.0-s_2.12"

(3) 如果是使用spark-shell,则可以在启动spark-shell时,通过--packages选项参数指定依赖,命令如下:

$ spark-shell --master spark://localhost:7077  --packages graphframes:graphframes:0.8.1-spark3.0-s_2.12

然后spark-shell会自动下载GraphFrames jar包并缓存。

(4) 如果是使用Zeppelin Notebook进行Spark图计算开发,则最简单的方式是先下载graphframes-0.8.1-spark3.0-s_2.12.jar包,并将其拷贝到$SPARK_HOME/jars/目录下和$ZEPPELIN_HOME/lib/目录下,然后重启Spark和Zeppelin(如果已经启动了的话)。

2. 构造图模型

假设现在要分析一个社交网络,如下图所示。

首先需要导入相应的依赖包,代码如下:

import org.apache.spark.sql.SparkSession
import org.graphframes._

可以从顶点和边DataFrames创建GraphFrame图对象。顶点DataFrame应该包含一个名为id的特殊列,它为图中的每个顶点指定惟一的id。边DataFrame应该包含两个特殊列:src(边的源顶点ID)和dst(边的目标顶点ID)。

这两个DataFrame都可以有任意其他列,这些列可以表示顶点和边属性,代码如下:

// 顶点DataFrame
val v = spark.createDataFrame(List(
      ("a", "Alice", 34, 234, "Apples"),
      ("b", "Bob", 36, 23232323, "Bananas"),
      ("c", "Charlie", 30, 2123, "Grapefruit"),
      ("d", "David", 29, 2321111, "Bananas"),
      ("e", "Esther", 32, 1, "Watermelon"),
      ("f", "Fanny", 36, 333, "Apples" ),
      ("g", "Gabby", 60, 23433, "Oranges")
)).toDF("id", "name", "age", "cash", "fruit")

// 边DataFrame
val e = spark.createDataFrame(List(
      ("a", "b", "friend"),
      ("b", "c", "follow"),
      ("c", "b", "follow"),
      ("f", "c", "follow"),
      ("e", "f", "follow"),
      ("e", "d", "friend"),
      ("d", "a", "friend"),
      ("a", "e", "friend")
    )).toDF("src", "dst", "relationship")

v.show()
e.show()

// 使得这些顶点和这些边构建图模型
val g = GraphFrame(v, e)

这个示例图还可来自GraphFrames的examples包,代码如下:

import org.graphframes.examples.Graphs

val g2 = examples.Graphs.friends

3. 简单图查询

GraphFrame提供简单的图查询,如节点的入度、出度和度数。此外,由于GraphFrame将图表示为顶点DataFrame和边DataFrame,因此很容易直接对顶点和边DataFrame进行强大的查询。例如,查看图的顶点和边信息,代码如下:

// 查看顶点和边
g.vertices.show()
g.edges.show()

输出结果如下:

+---+-------+---+--------+----------+
| id|    name|age|     cash|     fruit|
+---+-------+---+--------+----------+
|  a|   Alice| 34|     234|     Apples|
|  b|     Bob| 36|23232323|   Bananas|
|  c|Charlie| 30|     2123|Grapefruit|
|  d|   David| 29| 2321111|   Bananas|
|  e| Esther| 32|         1|Watermelon|
|  f|   Fanny| 36|     333|     Apples|
|  g|   Gabby| 60|   23433|    Oranges|
+---+-------+---+--------+----------+

+---+---+------------+
|src|dst|relationship|
+---+---+------------+
|  a|   b|       friend|
|  b|   c|       follow|
|  c|   b|       follow|
|  f|   c|       follow|
|  e|   f|       follow|
|  e|   d|       friend|
|  d|   a|       friend|
|  a|   e|       friend|
+---+---+------------+

查看所有节点的入度,代码如下:

// 查看所有节点的入度
g.inDegrees.show()
g.inDegrees.sort("id").show()

输出结果如下:

+---+--------+
| id|inDegree|
+---+--------+
|  f|         1|
|  e|         1|
|  d|         1|
|  c|         2|
|  b|         2|
|  a|         1|
+---+--------+

+---+--------+
| id|inDegree|
+---+--------+
|  a|         1|
|  b|         2|
|  c|         2|
|  d|         1|
|  e|         1|
|  f|         1|
+---+--------+

查看所有节点的出度,代码如下:

// 查看所有节点的出度
g.outDegrees.show()
g.outDegrees.sort("id").show()

输出结果如下:

+---+---------+
| id|outDegree|
+---+---------+
|  f|          1|
|  e|          2|
|  d|          1|
|  c|          1|
|  b|          1|
|  a|          2|
+---+---------+

+---+---------+
| id|outDegree|
+---+---------+
|  a|          2|
|  b|          1|
|  c|          1|
|  d|          1|
|  e|          2|
|  f|          1|
+---+---------+

查看所有节点的总度数(即出入度之和),代码如下:

// 查看所有节点的总度数(即出入度之和)
g.degrees.show()
g.degrees.sort("id").show()

输出结果如下:

+---+------+
| id|degree|
+---+------+
|  f|      2|
|  e|      3|
|  d|      2|
|  c|      3|
|  b|      3|
|  a|      3|
+---+------+

+---+------+
| id|degree|
+---+------+
|  a|      3|
|  b|      3|
|  c|      3|
|  d|      2|
|  e|      3|
|  f|      2|
+---+------+

可以直接在顶点DataFrame上运行查询。例如,找到图中最年轻的人的年龄,代码如下:

// 找到图中最年轻的人的年龄
val youngest = g.vertices.groupBy().min("age")
youngest.show()

输出结果如下:

+--------+
|min(age)|
+--------+
|       29|
+--------+

找出年龄小于30岁的人,代码如下:

// 找出年龄小于30岁的人
val lt30 = g.vertices.filter("age < 30")
lt30.show()

输出结果如下:

+---+-----+---+
| id| name|age|
+---+-----+---+
|  d|David| 29|
+---+-----+---+

同样,可以在边DataFrame上运行查询。例如,统计一下图中follow关系的数量,代码如下:

// 统计一下图中“follow”关系的数量
g.edges.filter("relationship = 'follow'").count()      // 4

可以看到结果输出为4。

如果想知道c的粉丝有多少,分别是谁呢?GraphFrame对象有一个triplets属性,它是一个DataFrame,具有src、edge、dst三列,分别代表源顶点、边和目标顶点,代码如下:

g.triplets.printSchema()

输出结果如下:

root
 |-- src: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = false)
 |-- edge: struct (nullable = false)
 |    |-- src: string (nullable = true)
 |    |-- dst: string (nullable = true)
 |    |-- relationship: string (nullable = true)
 |-- dst: struct (nullable = false)
 |    |-- id: string (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- age: integer (nullable = false)

查看triplets的内容,代码如下:

g.triplets.show()

输出结果如下:

+----------------+--------------+----------------+
|               src|            edge|                dst|
+----------------+--------------+----------------+
|  [a, Alice, 34]|[a, b, friend]|     [b, Bob, 36]|
|    [b, Bob, 36]|[b, c, follow]| [c, Charlie, 30]|
|[c, Charlie, 30]|[c, b, follow]|     [b, Bob, 36]|
|  [f, Fanny, 36]|[f, c, follow]| [c, Charlie, 30]|
| [e, Esther, 32]|[e, f, follow]|   [f, Fanny, 36]|
| [e, Esther, 32]|[e, d, friend]|   [d, David, 29]|
|  [d, David, 29]|[d, a, friend]|   [a, Alice, 34]|
|  [a, Alice, 34]|[a, e, friend]|  [e, Esther, 32]|
+----------------+--------------+----------------+

从中找出关系为follow的数据,代码如下:

g.triplets.filter("edge.relationship = 'follow'").show()

输出结果如下:

+----------------+--------------+----------------+
|               src|            edge|                dst|
+----------------+--------------+----------------+
|    [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
|[c, Charlie, 30]|[c, b, follow]|    [b, Bob, 36]|
| [e, Esther, 32]|[e, f, follow]|  [f, Fanny, 36]|
|  [f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|
+----------------+--------------+----------------+

进一步找出与c有follow关系的数据,代码如下:

g.triplets
 .filter("edge.relationship = 'follow'")
 .filter("edge.dst = 'c'").show()

输出结果如下:

+--------------+--------------+----------------+
|             src|            edge|                dst|
+--------------+--------------+----------------+
|  [b, Bob, 36]|[b, c, follow]|[c, Charlie, 30]|
|[f, Fanny, 36]|[f, c, follow]|[c, Charlie, 30]|
+--------------+--------------+----------------+

那么,c的粉丝有多少?粉丝都有谁?代码如下:

val followers = g
	.triplets
	.filter("edge.relationship = 'follow'")
	.filter("edge.dst = 'c'")
	.count
println("c的粉丝数:" + followers)

g.triplets
 .filter("edge.relationship = 'follow'")
 .filter("edge.dst = 'c'")
 .select("src")
 .show

输出结果如下:

c的粉丝数:2
+--------------+
|             src|
+--------------+
|  [b, Bob, 36]|
|[f, Fanny, 36]|
+--------------+