发布日期:2022-10-19 VIP内容

增量更新与时间旅行

Delta Lake支持增量更新。Delta Lake使用事务日志存储数据湖元数据,通过事务日志,用户可以在给定的时间点进行时间旅行和数据探索。下面通过一个示例来演示Delta Lake中的时间旅行特性。

首先,从下面这个CSV文件创建一个Delta Lake。people.csv文件内容如下:

first_name,last_name,country
miguel,cordoba,colombia
luisa,gomez,colombia
li,li,china
wang,wei,china
hans,meyer,germany
mia,schmidt,germany

接下来,使用Spark SQL API读取这个CSV文件到一个DataFrame中,然后将该DataFrame作为一个Delta数据湖写出。代码如下:

val path = "/data/data_lake/delta_lake/people.csv"
val outputPath = "/data_lake/delta_lake/person_delta_lake"
spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(path)
  .repartition(1)
  .write
  .format("delta")
  .mode(SaveMode.Overwrite)
  .save(outputPath)

执行上面的代码后,再执行以下HDFS shell命令查看,命令如下:

$ hdfs dfs -ls -R /data_lake/delta_lake/person_delta_lake

得到/data_lake/delta_lake/person_delta_lake目录内容,如下图所示。

数据以parquet格式保存在文件中,元数据以_delta_log/00000000000000000000.json格式保存在文件中。JSON文件包含关于写事务、数据模式以及添加了什么文件的信息。打开这个JSON文件,其内容如下:(为了便于阅读,这里对一行JSON进行了格式化展示)

{
  "commitInfo":{
    "timestamp":1565119301357,
    "operation":"WRITE",
    "operationParameters":{
      "mode":"Overwrite",
      "partitionBy":"[]"
    }
  }
}
{
  "protocol":{
    "minReaderVersion":1,
    "minWriterVersion":2
  }
}
{
  "metaData":{
    "id":"a3ca108e-3ba1-49dc-99a0-c9d29c8f1aec",
    "format":{
      "provider":"parquet",
      "options":{}
},
"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"first_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"last_name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"country\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
    "partitionColumns":[],
    "configuration":{},
    "createdTime":1565119298882
  }
}
{
  "add":{
"path":"part-00000-78f9c583-ea60-4962-af99-895f453dce23-c000.snappy.parquet",
    "partitionValues":{},
    "size":939,
    "modificationTime":1565119299000,
    "dataChange":true
  }
}

1.增量更新

下面使用一些纽约市出租车数据来构建并增量更新一个Delta数据湖。首先构建一个初始的Delta数据湖,代码如下:

import org.apache.spark.sql.SaveMode

val outputPath = "/data_lake/delta_lake/incremental_delta_lake/"
val p1 = "/data/data_lake/delta_lake/taxi_data/taxi1.csv"

// 将文件读取到DataFrame中,然后写出到delta lake中
spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(p1)
  .repartition(1)
  .write
  .format("delta")
  .mode(SaveMode.Overwrite)
  .save(outputPath)

这段代码会创建一个parquet文件和一个_delta_log/000000000000000000.json文件。查看文件系统结构,命令如下:

$ hdfs dfs -ls -R /data_lake/delta_lake/incremental_delta_lake/

可以看到这时的文件目录,如下图所示。

接下来让检查Delta数据湖的内容,代码如下:

spark
  .read
  .format("delta")
  .load(outputPath)
  .select("passenger_count", "fare_amount")
  .show()

执行以上代码,输出结果如下:

+---------------+-----------+
|passenger_count|fare_amount|
+---------------+-----------+
|                 2|           83|
|                 1|        14.5|
|                 1|            8|
|                 1|            6|
|                 1|            6|
+---------------+-----------+

可以看到,在第一次加载后该Delta Lake包含5行数据。

接下来,将另一个文件加载到同一个Delta Lake中,使用SaveMode.Append模式。代码如下:

val p2 = "/data/data_lake/delta_lake/taxi_data/taxi2.csv"

// 将数据读取到DataFrame中,再写入(覆盖)Delta Lake
spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(p2)
  .repartition(1)
  .write
  .format("delta")
  .mode(SaveMode.Append)
  .save(outputPath) 

这段代码创建了一个parquet文件和一个_delta_log/ 0000000000000001.json文件。文件系统目录incremental_data_lake现在包含的文件如下图所示。

再一次加载该Delta Lake。在加载文件后,现在Delta Lake包含10行数据,代码如下:

spark
  .read
  .format("delta")
  .load(outputPath)
  .select("passenger_count", "fare_amount")
  .show()

执行以上代码,得到的输出内容如下:

+---------------+-----------+
|passenger_count|fare_amount|
+---------------+-----------+
|                 2|          83|
|                 1|        14.5|
|                 1|           8|
|                 1|           6|
|                 1|           6|
|                 1|           7|
|                 1|           8|
|                 1|         5.5|
|                 1|          52|
|                 6|         7.5|
+---------------+-----------+

2.时间旅行

Delta提供了进行时间旅行的支持。下面编写一个查询来检查第一次数据加载之后(第二次数据写入之前)的Delta数据湖,代码如下:

spark
  .read
  .format("delta")
  .option("versionAsOf", 0)        // 指定要查询的版本
  .load(outputPath)
  .select("passenger_count", "fare_amount")
  .show()

执行以上代码,得到的输出内容如下:

+---------------+-----------+
|passenger_count|fare_amount|
+---------------+-----------+
|                2|           83|
|                1|         14.5|
|                1|            8|
|                1|            6|
|                1|            6|
+---------------+-----------+

在上面的代码中,使用了option("versionAsOf",0)选项配置,用来告诉Delta只抓取_delta_log/00000000000000000000.json中的文件,而忽略_delta_log/00000000000000000001. json中的文件。

假设用户正在数据湖上训练一个机器学习模型,并希望在实验时保持数据不变。Delta Lake的时间旅行特性让用户在训练模型时很容易使用单一版本的数据。

也可以轻松访问Delta Lake事务日志的完整历史,代码如下:

import io.delta.tables._

val lakePath = outputPath
val deltaTable = DeltaTable.forPath(spark, lakePath)
val fullHistoryDF = deltaTable.history()
fullHistoryDF.show(false) 

执行以上代码,得到的输出内容如下图所示。

为了更清楚地看明白版本的变化,只选取version、timestamp和operationParameters三个字段,代码如下:

fullHistoryDF.select("version","timestamp","operationParameters").show(false) 

执行以上代码,得到的输出内容如下:

+-------+-----------------------+--------------------------------------+
|version|timestamp                 |operationParameters                      |
+-------+-----------------------+--------------------------------------+
|1       |2022-03-03 14:58:24.588|{mode -> Append, partitionBy -> []}   |
|0       |2022-03-03 14:52:32.843|{mode -> Overwrite, partitionBy -> []}|
+-------+-----------------------+--------------------------------------+

查看Delta history历史表的模式,代码如下:

fullHistoryDF.printSchema()

得到的输出内容如下:

root
 |-- version: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- userId: string (nullable = true)
 |-- userName: string (nullable = true)
 |-- operation: string (nullable = true)
 |-- operationParameters: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- job: struct (nullable = true)
 |    |-- jobId: string (nullable = true)
 |    |-- jobName: string (nullable = true)
 |    |-- runId: string (nullable = true)
 |    |-- jobOwnerId: string (nullable = true)
 |    |-- triggerType: string (nullable = true)
 |-- notebook: struct (nullable = true)
 |    |-- notebookId: string (nullable = true)
 |-- clusterId: string (nullable = true)
 |-- readVersion: long (nullable = true)
 |-- isolationLevel: string (nullable = true)
 |-- isBlindAppend: boolean (nullable = true)

因此,除了可以通过版本号获取Delta表特定版本,还可以通过时间戳进行时间旅行。代码如下:

spark
  .read
  .format("delta")
  .option("timestampAsOf", "2022-03-03 14:53:00") // 请修改为自己的版本时间
  .load(lakePath)
  .select("passenger_count", "fare_amount")
  .show()

这与抓取的Delta表的第一个版本(即version等于0)是一样的,内容如下:

+---------------+-----------+
|passenger_count|fare_amount|
+---------------+-----------+
|                 2|          83|
|                 1|        14.5|
|                 1|           8|
|                 1|           6|
|                 1|           6|
+---------------+-----------+