发布日期:2022-08-09
VIP内容
深入理解GraphFrame
下面将简要介绍GraphFrame的内部结构及其执行计划和分区。
由于GraphFrame是基于Spark SQL DataFrame构建的,因此可以通过查看物理计划来理解图操作的执行,代码如下:
// 查看GraphFrame物理执行计划
g.edges.filter("salerank < 100").explain(true)
Spark将数据分割为多个分区,并在这些分区上并行执行计算。可以调整分区的级别,以提高Spark的计算效率。
在下面的示例中,将检查一个GraphFrame重新分区的结果。可以根据顶点DataFrame的列值对GraphFrame进行分区。在这里,使用group列中的值按组或产品类型进行分区,并将通过比较记录的前后分布来展示重新分区的结果。
首先,创建两个GraphFrame。由于在group列中有null,将它们替换为一个值unknown,代码如下:
// 理解GraphFrame中的分区
val v1 = g.vertices.select("id", "group").na.fill("unknown")
v1.show()
v1.groupBy("group").count().show
val g1 = GraphFrame(v1, g.edges)
接下来,在对原始GraphFrame进行重新分区之后创建第二个GraphFrame。在这里,使用组的数量作为初始分区数量,代码如下:
val v2 = g.vertices.select("id", "group").na.fill("unknown")
val g2t1 = GraphFrame(v2, g.edges)
val g2t2 = g2t1.vertices.repartition(11, $"group")
val g2 = GraphFrame(g2t2, g.edges)
// g2中的记录是按组聚在一起的
g1.vertices.show()
g2.vertices.show()
// g1中的默认分区数为9,g2中指定的分区数为11
g1.vertices.rdd.partitions.size
g2.vertices.rdd.partitions.size
// 也可以将分区的内容写在文件中,以查看它们的内容
g1
.vertices
.write
.mode("overwrite")
.csv("/data/spark_demo/graphx/g1/partitions")
g2
.vertices
.write
.mode("overwrite")
.csv("/data/spark_demo/graphx/g2/partitions")
如果查看保存的文件内容的话,会注意到大多数记录位于五个主要的产品组中。用户可能希望减少分区的总数,这可以使用coalesce()操作来实现,代码如下:
val g2c = g2.vertices.coalesce(5) g2c.rdd.partitions.size