合并更新(upsert)
这一节,将解释如何更新表列,以及如何使用merge命令执行upsert操作。
Delta Lake存储使用parquet文件,而parquet文件是不可变的,不支持更新。Delta Lake提供了merge语句来提供类似于更新的接口,但实际上,这些并不是真正的更新,不会改变底层文件。Delta Lake只是在重写整个Parquet文件。这将使大型数据集上的upsert或update列语句非常慢。
1.merge示例
下面通过一个示例来理解和掌握使用merge命令的方法。
首先准备一个样本events_data.csv,其内容如下:
eventType,websitePage click,homepage clck,about page mouseOver,logo
读取该数据文件,并写出到Delta Lake中,代码如下:
// 数据源文件
val path = "/data/data_lake/delta_lake/event_data.csv"
// delta路径(HDFS)
val outputPath = "/data_lake/delta_lake/events/"
spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(path)
.repartition(1)
.write
.format("delta")
.save(outputPath)
看一下_delta_log/ 0000000000000000000000.json事务日志文件中存储了什么内容。执行以下hdfs shell命令:
$ hdfs dfs -cat /data_lake/delta_lake/events/_delta_log/00000000000000000000.json
可以看到事务日志文件中有如下一行json文本形式的事务日志(这里为了便于阅读,进行了格式化展示):
...
{
"add":{
"path":"part-00000-f960ca7c-eff0-40d0-b753-1f99ea4ffb9f-c000.snappy.parquet",
"partitionValues":{},
"size":673,
"modificationTime":1569079218000,
"dataChange":true
}
}
下面查看一下数据湖的内容,代码如下:
// 查看delta lake的内容
spark.read.format("delta").load(outputPath).show()
df.show()
执行以上代码,得到的输出结果如下:
+---------+-----------+ |eventType|websitePage| +---------+-----------+ | click| homepage| | clck| about page| |mouseOver| logo| +---------+-----------+
注意,第二行数据在eventType字段中有一个输入错误。它说的是“clck”而不是“click”。那么怎么来修改这个错误呢?修改错误的代码如下:
val deltaTable = DeltaTable.forPath(spark, outputPath)
deltaTable.updateExpr(
"eventType = 'clck'",
Map("eventType" -> "'click'")
)
或者使用另一种形式,代码如下:
deltaTable.update(
col("eventType") === "clck",
Map("eventType" -> lit("click"))
)
执行上面的代码,然后可以再次检查Delta Lake的内容,确认拼写错误是否已经修正。代码如下:
spark.read.format("delta").load(outputPath).show()
执行以上代码,得到如下的输出结果:
+---------+-----------+ |eventType|websitePage| +---------+-----------+ | click| homepage| | click| about page| |mouseOver| logo| +---------+-----------+
可以看到,错误的拼写已经得到修正。
查看此时文件系统中的所有文件,命令如下:
$ hdfs dfs -ls -R /data_lake/delta_lake/events/
输出内容目录如下图所示。
如前所述,parquet文件是不可变的,那么Delta是怎样做到修复拼写错误呢? 查看一下delta中的日志文件_delta_log/ 00000000000000000001.json,发现其内容如下:
{
"remove":{
"path":"part-00000-267cec2a-a90f-432b-98fd-0c67eee4a666-c000.snappy.parquet",
"deletionTimestamp":1569079467662,
"dataChange":true
}
}
{
"add":{
"path":"part-00000-7493fe0e-1469-46f6-aac9-6ca37be2139e-c000.snappy.parquet",
"partitionValues":{},
"size":694,
"modificationTime":1569079467000,
"dataChange":true
}
}
可以看出,合并命令是将一个新文件写入了文件系统,对旧文件执行了remove操作。可以分别检查一下delta中两个parquet文件的内容。先查看日期最近的parquet文件,代码如下:
val path = "/data_lake/delta_lake/events/part-00000-7493fe0e-1469-46f6-aac9-6ca37be2139e-c000.snappy.parquet" spark.read.parquet(path).show()
执行上面的代码,输出内容如下:
+---------+-----------+ |eventType|websitePage| +---------+-----------+ | click| homepage| | click| about page| |mouseOver| logo| +---------+-----------+
可以看到,其中是修改过后的数据。再看日期较早的parquet文件,代码如下:
val path = "/data_lake/delta_lake/events/part-00000-267cec2a-a90f-432b-98fd-0c67eee4a666-c000.snappy.parquet" spark.read.parquet(path).show()
执行上面的代码,输出内容如下:
+---------+-----------+ |eventType|websitePage| +---------+-----------+ | click| homepage| | clck| about page| |mouseOver| logo| +---------+-----------+
因此,合并命令将所有数据写入一个全新的文件中。但是,这样方式写出所有数据将使merge运行要慢得多。
2.upsert示例
首先准备一个原始数据集original_data.csv,建立另一个小Delta Lake。
original_data.csv文件内容如下:
date,eventId,data 2019-01-01,4,take nap 2019-02-05,8,play smash brothers 2019-04-24,9,speak at spark summit
将上面的数据构建为Delta Lake,代码如下:
val path = "/data/data_lake/delta_lake/original_data.csv"
// delta lake path (hdfs)
val outputPath = "/data_lake/delta_lake/upsert_event/"
spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(path)
.repartition(1)
.write
.format("delta")
.save(outputPath)
看看这个Delta Lake的初始状态,代码如下:
spark.read.format("delta").load(outputPath).show(false)
执行上面这行代码,得到的输出内容如下:
+----------+-------+---------------------+ |date |eventId|data | +----------+-------+---------------------+ |2019-01-01|4 |take nap | |2019-02-05|8 |play smash brothers | |2019-04-24|9 |speak at spark summit| +----------+-------+---------------------+
接下来用另一种更“友好”的表达来更新Delta Lake。首先准备这个新的数据friendly_data.csv。
friendly_data.csv文件内容如下:
date,eventId,data 2019-01-01,4,set goals 2019-02-05,8,bond with nephew 2019-08-10,66,think about my mommy
在上面的新数据集中,第4和第8个事件的描述进行了修改。而第66号事件是新增加的。下面执行upsert操作,代码如下:
val updatesPath = "/data/data_lake/delta_lake/friendly_data.csv"
// 新数据(包含修改的数据和新增的数据)
val updatesDF = spark.read
.option("header", "true")
.option("charset", "UTF8")
.csv(updatesPath)
import io.delta.tables._
DeltaTable.forPath(spark, outputPath) // outputPath是原始的delta lake path
.as("events")
.merge(updatesDF.as("updates"), "events.eventId = updates.eventId")
.whenMatched.updateExpr(Map("data" -> "updates.data"))
.whenNotMatched.insertExpr(Map(
"date" -> "updates.date",
"eventId" -> "updates.eventId",
"data" -> "updates.data"
)
)
.execute()
执行上面的代码,然后查看一下upsert之后Delta Lake的内容,代码如下:
spark.read.format("delta").load(outputPath).show(false)
执行上面这行代码,输出内容如下:
+----------+-------+---------------------+ |date |eventId|data | +----------+-------+---------------------+ |2019-08-10|66 |think about my mommy | |2019-04-24|9 |speak at spark summit| |2019-02-05|8 |bond with nephew | |2019-01-01|4 |set goals | +----------+-------+---------------------+
从结果可以看出,将新数据合并到了旧数据中。其中4和8的data值被修改了,而66是新增加的事件,事件9没有变化。
3.upsert的事务日志
在_delta_log /00000000000000000000.json文件中包含单个项,用于添加的单个Parquet文件。查看该文件,内容如下:
{
"add":{
"path":"part-00000-7eaa0d54-4dba-456a-ab80-b17f9aa7b583-c000.snappy.parquet",
"partitionValues":{},
"size":900,
"modificationTime":1569177685000,
"dataChange":true
}
}
由_delta_log /00000000000000000001.json文件显示,upserts在事务日志中添加了大量记录。查看该文件,内容如下:
{
"remove":{
"path":"part-00000-7eaa0d54-4dba-456a-ab80-b17f9aa7b583-c000.snappy.parquet",
"deletionTimestamp":1569177701037,
"dataChange":true
}
}
{
"add":{
"path":"part-00000-36aafda3-530d-4bd7-a29b-9c1716f18389-c000.snappy.parquet",
"partitionValues":{},
"size":433,
"modificationTime":1569177698000,
"dataChange":true
}
}
{
"add":{
"path":"part-00026-fcb37eb4-165f-4402-beb3-82d3d56bfe0c-c000.snappy.parquet",
"partitionValues":{},
"size":968,
"modificationTime":1569177700000,
"dataChange":true
}
}
{
"add":{
"path":"part-00139-eab3854f-4ed4-4856-8268-c89f0efe977c-c000.snappy.parquet",
"partitionValues":{},
"size":1013,
"modificationTime":1569177700000,
"dataChange":true
}
}
{
"add":{
"path":"part-00166-0e9cddc8-9104-4c11-8b7f-44a6441a95fb-c000.snappy.parquet",
"partitionValues":{},
"size":905,
"modificationTime":1569177700000,
"dataChange":true
}
}
{
"add":{
"path":"part-00178-147c78fa-dad2-4a1c-a4c5-65a1a647a41e-c000.snappy.parquet",
"partitionValues":{},
"size":1013,
"modificationTime":1569177701000,
"dataChange":true
}
}
下面创建一个小辅助方法,以便轻松检查这些Parquet文件的内容,代码如下:
def displayEventParquetFile(filename: String): Unit = {
val path = "/data_lake/delta_lake/upsert_event/$filename.snappy.parquet"
val df = spark.read.parquet(path)
df.show(false)
}
然后依次查看各个parquet文件的内容。
查看第一个parquet文件内容,代码如下:
displayEventParquetFile("part-00000-36aafda3-530d-4bd7-a29b-9c1716f18389-c000")
得到内容如下:
+----+-------+----+ |date|eventId|data| +----+-------+----+ +----+-------+----+
查看第二个parquet文件内容,代码如下:
displayEventParquetFile("part-00026-fcb37eb4-165f-4402-beb3-82d3d56bfe0c-c000")
得到内容如下:
+----------+-------+----------------+ |date |eventId|data | +----------+-------+----------------+ |2019-02-05|8 |bond with nephew| +----------+-------+----------------+
查看第三个parquet文件内容,代码如下:
displayEventParquetFile("part-00139-eab3854f-4ed4-4856-8268-c89f0efe977c-c000")
得到内容如下:
+----------+-------+---------------------+ |date |eventId|data | +----------+-------+---------------------+ |2019-04-24|9 |speak at spark summit| +----------+-------+---------------------+
查看第四个parquet文件内容,代码如下:
displayEventParquetFile("part-00166-0e9cddc8-9104-4c11-8b7f-44a6441a95fb-c000")
得到内容如下:
+----------+-------+---------+ |date |eventId|data | +----------+-------+---------+ |2019-01-01|4 |set goals| +----------+-------+---------+
查看第五个parquet文件内容,代码如下:
displayEventParquetFile("part-00178-147c78fa-dad2-4a1c-a4c5-65a1a647a41e-c000")
得到内容如下:
+----------+-------+--------------------+ |date |eventId|data | +----------+-------+--------------------+ |2019-08-10|66 |think about my mommy| +----------+-------+--------------------+
这个更新代码创建了数量惊人的parquet文件。