在Spark中实现增量合并(upsert/merge实现)

2022-07-21 20:34:48.0

通常会将大量数据抽取到Hadoop分布式文件系统(HDFS)中进行分析。通常情况下,我们需要用新的变化定期更新这些数据。很长一段时间以来,实现这一目标的最常见方法是使用Apache Hive增量地将新的或更新的记录合并到现有数据集中。增量合并也可以使用Apache Spark执行。在这篇博客中,我将探索如何使用Spark SQL和Spark DataFrame增量更新数据,并演示三种不同的实现方法。

1. 什么是增量合并?

考虑下面的订单表orders:

order_no customer_id quantity cost order_date last_updated_date
001 u1 1 ¥15.00 2022-03-01 2022-03-01
002 u2 1 ¥30.00 2022-04-01 2022-04-01

现在,假设我们在order_updates表中收到了对订单号“002”的成本更新,如下所示:

order_no customer_id quantity cost order_date last_updated_date
002 u2 1 ¥20.00 2022-04-01 2022-04-02
003 u3 3 ¥50.00 2022-04-02 2022-04-02

注意到,其中对002号订单的成本(cost)和最后更新日期(last_updated_date)这两个字段值作了更新。而003号订单是新增加的订单。现在要求将执行增量合并,将order_updates表中的数据合并更新到orders表中,以生成一张更新的表,更新以后的内容如下:

order_no customer_id quantity cost order_date last_updated_date
001 u1 1 ¥15.00 2022-03-01 2022-03-01
002 u2 1 ¥20.00 2022-04-01 2022-04-02
003 u3 3 ¥50.00 2022-04-02 2022-04-02

其中001号订单没有改变,002号的值被修改为最新的值,003号订单作为新增的订单添加到表中。这种将新表中的数据与旧表(也叫目标表)中的数据进行合并,如果有重复的记录,则用新记录值修改旧的记录值(即update操作),如果有增加的记录,则添加进去(即insert操作),这种合并就是“增量更新”,也称为upsert或merge合并。

遗憾的是,在Apache Spark 2.x中不支持合并操作功能。但是,我们可以采用变通的方法实现同样的upsert功能。接下来就给大家讲解和演示三种实现合并操作的方法。

准备演示数据

首先,导入相关的包,并定义一个创建DataFrame的辅助方法,代码如下:

import java.sql.Date
import org.apache.spark.sql.types._
import org.apache.spark.sql._

// 定义DataFrame创建方法
def createDF(rows: Seq[Row], schema: StructType): DataFrame = {
  spark.createDataFrame(
    sc.parallelize(rows),
    schema
  )
}

创建订单表orders,代码如下:

val schema = StructType(
  List(
    StructField("order_no", StringType, true),
    StructField("customer_id", StringType, true),
    StructField("quantity", IntegerType, true),
    StructField("cost", DoubleType, true),
    StructField("order_date", DateType, true),
    StructField("last_updated_date", DateType, true)
  )
)

// 创建 orders DataFrame
val orders = Seq(
  Row("001", "u1", 1, 15.00, Date.valueOf("2020-03-01"), Date.valueOf("2020-03-01")),
  Row("002", "u2", 1, 30.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-01"))
)
val ordersDF = createDF(orders, schema)

ordersDF.printSchema()
ordersDF.show()

执行以上代码,输出内容如下:

root
 |-- order_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- last_updated_date: date (nullable = true)

+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
|     001|         u1|       1|15.0|2020-03-01|       2020-03-01|
|     002|         u2|       1|30.0|2020-04-01|       2020-04-01|
+--------+-----------+--------+----+----------+-----------------+

然后创建更新表创建order_updates DataFrame,代码如下:

// 创建order_updates DataFrame
val orderUpdates = Seq(
  Row("002", "u2", 1, 20.00, Date.valueOf("2020-04-01"), Date.valueOf("2020-04-02")),
  Row("003", "u3", 3, 50.00, Date.valueOf("2020-04-02"), Date.valueOf("2020-04-02"))
)
val orderUpdatesDF = createDF(orderUpdates, schema)

orderUpdatesDF.printSchema()
orderUpdatesDF.show() 

执行以上代码,输出内容如下:

root
 |-- order_no: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- cost: double (nullable = true)
 |-- order_date: date (nullable = true)
 |-- last_updated_date: date (nullable = true)

+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
|     002|         u2|       1|20.0|2020-04-01|       2020-04-02|
|     003|         u3|       3|50.0|2020-04-02|       2020-04-02|
+--------+-----------+--------+----+----------+-----------------+

方法一:使用SQL查询实现增量合并

第一种方法是使用SQL语句来实现增量合并。为此,首先将ordersDF和orderUpdateDF注册到临时视图中,代码如下:

// 创建临时视图产
ordersDF.createOrReplaceTempView("orders")
orderUpdatesDF.createOrReplaceTempView("order_updates")

然后执行实现增量合并的SQL语句,代码如下:

// 执行SQL语句
val orderMergedDF = spark.sql(
  """
  |SELECT unioned.*
  |FROM (
  |  SELECT * FROM orders x
  |  UNION ALL
  |  SELECT * FROM order_updates y
  |) unioned
  |JOIN
  |(
  |  SELECT
  |    order_no, customer_id,
  |    max(last_updated_date) as max_date
  |  FROM (
  |    SELECT * FROM orders
  |    UNION ALL
  |    SELECT * FROM order_updates
  |  ) t
  |  GROUP BY
  |    order_no, customer_id
  |) grouped
  |ON
  |  unioned.order_no = grouped.order_no AND
  |  unioned.customer_id = grouped.customer_id AND
  |  unioned.last_updated_date = grouped.max_date
  """.stripMargin
)

orderMergedDF.show() 

执行以上代码,输出内容如下:

+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
|     001|         u1|       1|15.0|2020-03-01|       2020-03-01|
|     003|         u3|       3|50.0|2020-04-02|       2020-04-02|
|     002|         u2|       1|20.0|2020-04-01|       2020-04-02|
+--------+-----------+--------+----+----------+-----------------+

从输出结果可以看出,已经实现了upsert功能。

方法二:使用DataFrame API实现增量合并

第二种方法是使用DataFrame API,根据方法一的思想,实现增量合并。代码如下:

// (1) 合并两个数据集
val unioned = ordersDF.union(orderUpdatesDF)
//unioned.show()

// (2) 执行分组,同一组的(需要update的数据)取最新日期
val grouped = unioned
  .groupBy($"order_no", $"customer_id")
  .agg(max("last_updated_date").as("last_updated_date"))

//grouped.show()

// (3) 使用给定列与另一个DataFrame进行内连接
val merged = grouped.join(unioned, Seq("order_no", "customer_id", "last_updated_date"))

merged.show() 

执行以上代码,输出结果如下:

+--------+-----------+-----------------+--------+----+----------+
|order_no|customer_id|last_updated_date|quantity|cost|order_date|
+--------+-----------+-----------------+--------+----+----------+
|     001|         u1|       2020-03-01|       1|15.0|2020-03-01|
|     003|         u3|       2020-04-02|       3|50.0|2020-04-02|
|     002|         u2|       2020-04-02|       1|20.0|2020-04-01|
+--------+-----------+-----------------+--------+----+----------+

可以看到,与方法一相似,同样实现了upsert功能。

上述方法还可以重构为更加通用的形式,代码如下:

val keys = Seq("order_no", "customer_id")  // 指定key列
val timestampCol = "last_updated_date"

// 转换为 Seq[org.apache.spark.sql.Column]
val keysColumns = keys.map(ordersDF(_))

// 合并两个数据集
val unioned = ordersDF.union(orderUpdatesDF)

// 执行分组,同一组的(需要update的数据)取最新日期
val grouped = unioned
  .groupBy(keysColumns: _*)
  .agg(max(timestampCol).as(timestampCol))

// 通过join连接,取upsert后的结果
val merged = grouped.join(unioned, keys :+ timestampCol)

merged.show()

方法三:使用DataFrame API和窗口函数实现

还可以通过窗口函数Window和union函数来模拟,步骤如下:

  • (1) 首先将旧表(目标表)和新表合并,使用unionAll()函数;
  • (2) 然后,使用窗口函数对记录进行分组,并基于分组为每一行分配一个行号(例如,_row_number);
  • (3) 最后,筛选DataFrame,只保留_row_number = 1,因为它代表一个新记录。还要删除_row_number列,因为它不再需要了。

实现代码如下:

// (1) 合并两个数据集
val unioned = ordersDF.union(orderUpdatesDF)
//unioned.show()

// (2) 为每一行分配一个行号(_row_number)。可以使用窗口函数对记录进行分组和分区。 
import org.apache.spark.sql.expressions.Window

// 定义窗口规范
val w = Window.partitionBy("order_no").orderBy($"last_updated_date".desc)

val unioned2 = unioned.withColumn("_row_number", row_number().over(w))
//unioned2.show()

// (3) 筛选DataFrame,只保留_row_number = 1,因为它代表一个新记录。还要删除_row_number列,因为它不再需要了。
val merged = unioned2.where("_row_number = 1").drop("_row_number") 
merged.orderBy("order_no").show()

执行以上代码,输出内容如下:

+--------+-----------+--------+----+----------+-----------------+
|order_no|customer_id|quantity|cost|order_date|last_updated_date|
+--------+-----------+--------+----+----------+-----------------+
|     001|         u1|       1|15.0|2020-03-01|       2020-03-01|
|     002|         u2|       1|20.0|2020-04-01|       2020-04-02|
|     003|         u3|       3|50.0|2020-04-02|       2020-04-02|
+--------+-----------+--------+----+----------+-----------------+

可以看到,与前两种方法一样,同样实现了upsert功能。


《Flink原理深入与编程实战》