增量更新与时间旅行
Delta Lake支持增量更新。Delta Lake使用事务日志存储数据湖元数据,通过事务日志,用户可以在给定的时间点进行时间旅行和数据探索。下面通过一个示例来演示Delta Lake中的时间旅行特性。
首先,从下面这个CSV文件创建一个Delta Lake。people.csv文件内容如下:
first_name,last_name,country miguel,cordoba,colombia luisa,gomez,colombia li,li,china wang,wei,china hans,meyer,germany mia,schmidt,germany
接下来,使用Spark SQL API读取这个CSV文件到一个DataFrame中,然后将该DataFrame作为一个Delta数据湖写出。代码如下:
val path = "/data/data_lake/delta_lake/people.csv"
val outputPath = "/data_lake/delta_lake/person_delta_lake"
spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(path)
.repartition(1)
.write
.format("delta")
.mode(SaveMode.Overwrite)
.save(outputPath)
执行上面的代码后,再执行以下HDFS shell命令查看,命令如下:
$ hdfs dfs -ls -R /data_lake/delta_lake/person_delta_lake
得到/data_lake/delta_lake/person_delta_lake目录内容,如下图所示。
数据以parquet格式保存在文件中,元数据以_delta_log/00000000000000000000.json格式保存在文件中。JSON文件包含关于写事务、数据模式以及添加了什么文件的信息。打开这个JSON文件,其内容如下:(为了便于阅读,这里对一行JSON进行了格式化展示)
{
"commitInfo":{
"timestamp":1565119301357,
"operation":"WRITE",
"operationParameters":{
"mode":"Overwrite",
"partitionBy":"[]"
}
}
}
{
"protocol":{
"minReaderVersion":1,
"minWriterVersion":2
}
}
{
"metaData":{
"id":"a3ca108e-3ba1-49dc-99a0-c9d29c8f1aec",
"format":{
"provider":"parquet",
"options":{}
},
"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"country\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns":[],
"configuration":{},
"createdTime":1565119298882
}
}
{
"add":{
"path":"part-00000-78f9c583-ea60-4962-af99-895f453dce23-c000.snappy.parquet",
"partitionValues":{},
"size":939,
"modificationTime":1565119299000,
"dataChange":true
}
}
1.增量更新
下面使用一些纽约市出租车数据来构建并增量更新一个Delta数据湖。首先构建一个初始的Delta数据湖,代码如下:
import org.apache.spark.sql.SaveMode
val outputPath = "/data_lake/delta_lake/incremental_delta_lake/"
val p1 = "/data/data_lake/delta_lake/taxi_data/taxi1.csv"
// 将文件读取到DataFrame中,然后写出到delta lake中
spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(p1)
.repartition(1)
.write
.format("delta")
.mode(SaveMode.Overwrite)
.save(outputPath)
这段代码会创建一个parquet文件和一个_delta_log/000000000000000000.json文件。查看文件系统结构,命令如下:
$ hdfs dfs -ls -R /data_lake/delta_lake/incremental_delta_lake/
可以看到这时的文件目录,如下图所示。
接下来让检查Delta数据湖的内容,代码如下:
spark
.read
.format("delta")
.load(outputPath)
.select("passenger_count", "fare_amount")
.show()
执行以上代码,输出结果如下:
+---------------+-----------+ |passenger_count|fare_amount| +---------------+-----------+ | 2| 83| | 1| 14.5| | 1| 8| | 1| 6| | 1| 6| +---------------+-----------+
可以看到,在第一次加载后该Delta Lake包含5行数据。
接下来,将另一个文件加载到同一个Delta Lake中,使用SaveMode.Append模式。代码如下:
val p2 = "/data/data_lake/delta_lake/taxi_data/taxi2.csv"
// 将数据读取到DataFrame中,再写入(覆盖)Delta Lake
spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(p2)
.repartition(1)
.write
.format("delta")
.mode(SaveMode.Append)
.save(outputPath)
这段代码创建了一个parquet文件和一个_delta_log/ 0000000000000001.json文件。文件系统目录incremental_data_lake现在包含的文件如下图所示。
再一次加载该Delta Lake。在加载文件后,现在Delta Lake包含10行数据,代码如下:
spark
.read
.format("delta")
.load(outputPath)
.select("passenger_count", "fare_amount")
.show()
执行以上代码,得到的输出内容如下:
+---------------+-----------+ |passenger_count|fare_amount| +---------------+-----------+ | 2| 83| | 1| 14.5| | 1| 8| | 1| 6| | 1| 6| | 1| 7| | 1| 8| | 1| 5.5| | 1| 52| | 6| 7.5| +---------------+-----------+
2.时间旅行
Delta提供了进行时间旅行的支持。下面编写一个查询来检查第一次数据加载之后(第二次数据写入之前)的Delta数据湖,代码如下:
spark
.read
.format("delta")
.option("versionAsOf", 0) // 指定要查询的版本
.load(outputPath)
.select("passenger_count", "fare_amount")
.show()
执行以上代码,得到的输出内容如下:
+---------------+-----------+ |passenger_count|fare_amount| +---------------+-----------+ | 2| 83| | 1| 14.5| | 1| 8| | 1| 6| | 1| 6| +---------------+-----------+
在上面的代码中,使用了option("versionAsOf",0)选项配置,用来告诉Delta只抓取_delta_log/00000000000000000000.json中的文件,而忽略_delta_log/00000000000000000001. json中的文件。
假设用户正在数据湖上训练一个机器学习模型,并希望在实验时保持数据不变。Delta Lake的时间旅行特性让用户在训练模型时很容易使用单一版本的数据。
也可以轻松访问Delta Lake事务日志的完整历史,代码如下:
import io.delta.tables._ val lakePath = outputPath val deltaTable = DeltaTable.forPath(spark, lakePath) val fullHistoryDF = deltaTable.history() fullHistoryDF.show(false)
执行以上代码,得到的输出内容如下图所示。
为了更清楚地看明白版本的变化,只选取version、timestamp和operationParameters三个字段,代码如下:
fullHistoryDF.select("version","timestamp","operationParameters").show(false)
执行以上代码,得到的输出内容如下:
+-------+-----------------------+--------------------------------------+
|version|timestamp |operationParameters |
+-------+-----------------------+--------------------------------------+
|1 |2022-03-03 14:58:24.588|{mode -> Append, partitionBy -> []} |
|0 |2022-03-03 14:52:32.843|{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+--------------------------------------+
查看Delta history历史表的模式,代码如下:
fullHistoryDF.printSchema()
得到的输出内容如下:
root |-- version: long (nullable = true) |-- timestamp: timestamp (nullable = true) |-- userId: string (nullable = true) |-- userName: string (nullable = true) |-- operation: string (nullable = true) |-- operationParameters: map (nullable = true) | |-- key: string | |-- value: string (valueContainsNull = true) |-- job: struct (nullable = true) | |-- jobId: string (nullable = true) | |-- jobName: string (nullable = true) | |-- runId: string (nullable = true) | |-- jobOwnerId: string (nullable = true) | |-- triggerType: string (nullable = true) |-- notebook: struct (nullable = true) | |-- notebookId: string (nullable = true) |-- clusterId: string (nullable = true) |-- readVersion: long (nullable = true) |-- isolationLevel: string (nullable = true) |-- isBlindAppend: boolean (nullable = true)
因此,除了可以通过版本号获取Delta表特定版本,还可以通过时间戳进行时间旅行。代码如下:
spark
.read
.format("delta")
.option("timestampAsOf", "2022-03-03 14:53:00") // 请修改为自己的版本时间
.load(lakePath)
.select("passenger_count", "fare_amount")
.show()
这与抓取的Delta表的第一个版本(即version等于0)是一样的,内容如下:
+---------------+-----------+ |passenger_count|fare_amount| +---------------+-----------+ | 2| 83| | 1| 14.5| | 1| 8| | 1| 6| | 1| 6| +---------------+-----------+