发布日期:2022-10-19 VIP内容

合并更新(upsert)

这一节,将解释如何更新表列,以及如何使用merge命令执行upsert操作。

Delta Lake存储使用parquet文件,而parquet文件是不可变的,不支持更新。Delta Lake提供了merge语句来提供类似于更新的接口,但实际上,这些并不是真正的更新,不会改变底层文件。Delta Lake只是在重写整个Parquet文件。这将使大型数据集上的upsert或update列语句非常慢。

1.merge示例

下面通过一个示例来理解和掌握使用merge命令的方法。

首先准备一个样本events_data.csv,其内容如下:

eventType,websitePage
click,homepage
clck,about page
mouseOver,logo

读取该数据文件,并写出到Delta Lake中,代码如下:

// 数据源文件
val path = "/data/data_lake/delta_lake/event_data.csv"

// delta路径(HDFS)
val outputPath = "/data_lake/delta_lake/events/"

spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(path)
  .repartition(1)
  .write
  .format("delta")
  .save(outputPath) 

看一下_delta_log/ 0000000000000000000000.json事务日志文件中存储了什么内容。执行以下hdfs shell命令:

$ hdfs dfs -cat /data_lake/delta_lake/events/_delta_log/00000000000000000000.json

可以看到事务日志文件中有如下一行json文本形式的事务日志(这里为了便于阅读,进行了格式化展示):

...
{
  "add":{
    "path":"part-00000-f960ca7c-eff0-40d0-b753-1f99ea4ffb9f-c000.snappy.parquet",
    "partitionValues":{},
    "size":673,
    "modificationTime":1569079218000,
    "dataChange":true
  }
}

下面查看一下数据湖的内容,代码如下:

// 查看delta lake的内容
spark.read.format("delta").load(outputPath).show()
df.show()

执行以上代码,得到的输出结果如下:

+---------+-----------+
|eventType|websitePage|
+---------+-----------+
|     click|   homepage|
|      clck| about page|
|mouseOver|        logo|
+---------+-----------+

注意,第二行数据在eventType字段中有一个输入错误。它说的是“clck”而不是“click”。那么怎么来修改这个错误呢?修改错误的代码如下:

val deltaTable = DeltaTable.forPath(spark, outputPath)

deltaTable.updateExpr(
  "eventType = 'clck'",
  Map("eventType" -> "'click'")
)

或者使用另一种形式,代码如下:

deltaTable.update(
  col("eventType") === "clck",
  Map("eventType" -> lit("click"))
)

执行上面的代码,然后可以再次检查Delta Lake的内容,确认拼写错误是否已经修正。代码如下:

spark.read.format("delta").load(outputPath).show()

执行以上代码,得到如下的输出结果:

+---------+-----------+
|eventType|websitePage|
+---------+-----------+
|     click|   homepage|
|     click| about page|
|mouseOver|        logo|
+---------+-----------+

可以看到,错误的拼写已经得到修正。

查看此时文件系统中的所有文件,命令如下:

$ hdfs dfs -ls -R /data_lake/delta_lake/events/ 

输出内容目录如下图所示。

如前所述,parquet文件是不可变的,那么Delta是怎样做到修复拼写错误呢? 查看一下delta中的日志文件_delta_log/ 00000000000000000001.json,发现其内容如下:

{
  "remove":{
"path":"part-00000-267cec2a-a90f-432b-98fd-0c67eee4a666-c000.snappy.parquet",
    "deletionTimestamp":1569079467662,
    "dataChange":true
  }
}
{
  "add":{
"path":"part-00000-7493fe0e-1469-46f6-aac9-6ca37be2139e-c000.snappy.parquet",
    "partitionValues":{},
    "size":694,
    "modificationTime":1569079467000,
    "dataChange":true
  }
}

可以看出,合并命令是将一个新文件写入了文件系统,对旧文件执行了remove操作。可以分别检查一下delta中两个parquet文件的内容。先查看日期最近的parquet文件,代码如下:

val path = "/data_lake/delta_lake/events/part-00000-7493fe0e-1469-46f6-aac9-6ca37be2139e-c000.snappy.parquet"
spark.read.parquet(path).show()

执行上面的代码,输出内容如下:

+---------+-----------+
|eventType|websitePage|
+---------+-----------+
|     click|   homepage|
|     click| about page|
|mouseOver|        logo|
+---------+-----------+

可以看到,其中是修改过后的数据。再看日期较早的parquet文件,代码如下:

val path = "/data_lake/delta_lake/events/part-00000-267cec2a-a90f-432b-98fd-0c67eee4a666-c000.snappy.parquet"
spark.read.parquet(path).show()

执行上面的代码,输出内容如下:

+---------+-----------+
|eventType|websitePage|
+---------+-----------+
|     click|   homepage|
|      clck| about page|
|mouseOver|        logo|
+---------+-----------+

因此,合并命令将所有数据写入一个全新的文件中。但是,这样方式写出所有数据将使merge运行要慢得多。

2.upsert示例

首先准备一个原始数据集original_data.csv,建立另一个小Delta Lake。

original_data.csv文件内容如下:

date,eventId,data
2019-01-01,4,take nap
2019-02-05,8,play smash brothers
2019-04-24,9,speak at spark summit

将上面的数据构建为Delta Lake,代码如下:

val path = "/data/data_lake/delta_lake/original_data.csv"

// delta lake path (hdfs)
val outputPath = "/data_lake/delta_lake/upsert_event/"

spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(path)
  .repartition(1)
  .write
  .format("delta")
  .save(outputPath)

看看这个Delta Lake的初始状态,代码如下:

spark.read.format("delta").load(outputPath).show(false) 

执行上面这行代码,得到的输出内容如下:

+----------+-------+---------------------+
|date       |eventId|data                    |
+----------+-------+---------------------+
|2019-01-01|4       |take nap               |
|2019-02-05|8       |play smash brothers  |
|2019-04-24|9       |speak at spark summit|
+----------+-------+---------------------+

接下来用另一种更“友好”的表达来更新Delta Lake。首先准备这个新的数据friendly_data.csv。

friendly_data.csv文件内容如下:

date,eventId,data
2019-01-01,4,set goals
2019-02-05,8,bond with nephew
2019-08-10,66,think about my mommy

在上面的新数据集中,第4和第8个事件的描述进行了修改。而第66号事件是新增加的。下面执行upsert操作,代码如下:

val updatesPath = "/data/data_lake/delta_lake/friendly_data.csv"

// 新数据(包含修改的数据和新增的数据)
val updatesDF = spark.read
	.option("header", "true")
	.option("charset", "UTF8")
	.csv(updatesPath)

import io.delta.tables._

DeltaTable.forPath(spark, outputPath) // outputPath是原始的delta lake path
  .as("events")
  .merge(updatesDF.as("updates"), "events.eventId = updates.eventId")
  .whenMatched.updateExpr(Map("data" -> "updates.data"))
  .whenNotMatched.insertExpr(Map(
        "date" -> "updates.date",
        "eventId" -> "updates.eventId",
        "data" -> "updates.data"
      )
   )
  .execute()

执行上面的代码,然后查看一下upsert之后Delta Lake的内容,代码如下:

spark.read.format("delta").load(outputPath).show(false)

执行上面这行代码,输出内容如下:

+----------+-------+---------------------+
|date       |eventId|data                    |
+----------+-------+---------------------+
|2019-08-10|66      |think about my mommy |
|2019-04-24|9       |speak at spark summit|
|2019-02-05|8       |bond with nephew      |
|2019-01-01|4       |set goals              |
+----------+-------+---------------------+

从结果可以看出,将新数据合并到了旧数据中。其中4和8的data值被修改了,而66是新增加的事件,事件9没有变化。

3.upsert的事务日志

在_delta_log /00000000000000000000.json文件中包含单个项,用于添加的单个Parquet文件。查看该文件,内容如下:

{
  "add":{
    "path":"part-00000-7eaa0d54-4dba-456a-ab80-b17f9aa7b583-c000.snappy.parquet",
    "partitionValues":{},
    "size":900,
    "modificationTime":1569177685000,
    "dataChange":true
  }
}

由_delta_log /00000000000000000001.json文件显示,upserts在事务日志中添加了大量记录。查看该文件,内容如下:

{
  "remove":{
    "path":"part-00000-7eaa0d54-4dba-456a-ab80-b17f9aa7b583-c000.snappy.parquet",
    "deletionTimestamp":1569177701037,
    "dataChange":true
  }
}
{
  "add":{
    "path":"part-00000-36aafda3-530d-4bd7-a29b-9c1716f18389-c000.snappy.parquet",
    "partitionValues":{},
    "size":433,
    "modificationTime":1569177698000,
    "dataChange":true
  }
}
{
  "add":{
    "path":"part-00026-fcb37eb4-165f-4402-beb3-82d3d56bfe0c-c000.snappy.parquet",
    "partitionValues":{},
    "size":968,
    "modificationTime":1569177700000,
    "dataChange":true
  }
}
{
  "add":{
    "path":"part-00139-eab3854f-4ed4-4856-8268-c89f0efe977c-c000.snappy.parquet",
    "partitionValues":{},
    "size":1013,
    "modificationTime":1569177700000,
    "dataChange":true
  }
}
{
  "add":{
    "path":"part-00166-0e9cddc8-9104-4c11-8b7f-44a6441a95fb-c000.snappy.parquet",
    "partitionValues":{},
    "size":905,
    "modificationTime":1569177700000,
    "dataChange":true
  }
}
{
  "add":{
    "path":"part-00178-147c78fa-dad2-4a1c-a4c5-65a1a647a41e-c000.snappy.parquet",
    "partitionValues":{},
    "size":1013,
    "modificationTime":1569177701000,
    "dataChange":true
  }
}

下面创建一个小辅助方法,以便轻松检查这些Parquet文件的内容,代码如下:

def displayEventParquetFile(filename: String): Unit = {
  val path = "/data_lake/delta_lake/upsert_event/$filename.snappy.parquet"
  val df = spark.read.parquet(path)
  df.show(false)
}

然后依次查看各个parquet文件的内容。

查看第一个parquet文件内容,代码如下:

displayEventParquetFile("part-00000-36aafda3-530d-4bd7-a29b-9c1716f18389-c000")

得到内容如下:

+----+-------+----+
|date|eventId|data|
+----+-------+----+
+----+-------+----+

查看第二个parquet文件内容,代码如下:

displayEventParquetFile("part-00026-fcb37eb4-165f-4402-beb3-82d3d56bfe0c-c000")

得到内容如下:

+----------+-------+----------------+
|date       |eventId|data              |
+----------+-------+----------------+
|2019-02-05|8       |bond with nephew|
+----------+-------+----------------+

查看第三个parquet文件内容,代码如下:

displayEventParquetFile("part-00139-eab3854f-4ed4-4856-8268-c89f0efe977c-c000")

得到内容如下:

+----------+-------+---------------------+
|date       |eventId|data                    |
+----------+-------+---------------------+
|2019-04-24|9       |speak at spark summit|
+----------+-------+---------------------+

查看第四个parquet文件内容,代码如下:

displayEventParquetFile("part-00166-0e9cddc8-9104-4c11-8b7f-44a6441a95fb-c000")

得到内容如下:

+----------+-------+---------+
|date       |eventId|data      |
+----------+-------+---------+
|2019-01-01|4       |set goals|
+----------+-------+---------+

查看第五个parquet文件内容,代码如下:

displayEventParquetFile("part-00178-147c78fa-dad2-4a1c-a4c5-65a1a647a41e-c000")

得到内容如下:

+----------+-------+--------------------+
|date       |eventId|data                   |
+----------+-------+--------------------+
|2019-08-10|66      |think about my mommy|
+----------+-------+--------------------+

这个更新代码创建了数量惊人的parquet文件。