文件移除
Delta Lake进行了版本控制,因此可以轻松地恢复或访问到数据的旧版本。因此,在某些情况下,Delta lake需要存储多个版本的数据才能实现回滚功能。
存储相同数据的多个版本可能成本很高,因此Delta Lake包含了一个vacuum命令,可以删除数据的旧版本(任何比指定的保留期限早的数据)。下面解释如何使用vacuum命令及其适用场景。
假设要使用下面的people.csv文件来构造一个Delta Lake表,文件内容如下:
first_name,last_name,country miguel,cordoba,colombia luisa,gomez,colombia li,li,china wang,wei,china hans,meyer,germany mia,schmidt,germany
假设将该数据文件存储到HDFS的/data/data_lake/delta_lake/目录下。
首先加载people.csv文件内容到DataFrame中,代码如下:
val path = "/data/data_lake/delta_lake/people.csv"
val df = spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(path)
df.printSchema()
df.show()
执行以上代码,输出内容如下:
root |-- first_name: string (nullable = true) |-- last_name: string (nullable = true) |-- country: string (nullable = true) +----------+---------+--------+ |first_name|last_name| country| +----------+---------+--------+ | miguel| cordoba|colombia| | luisa| gomez| colombia| | li| li| china| | wang| wei| china| | hans| meyer| germany| | mia| schmidt| germany| +----------+---------+--------+
然后将上面的DataFrame写入到一个Delta Lake表中,代码如下:
val outputPath = "/data_lake/delta_lake/vacuum_example"
df
.repartition(1)
.write
.format("delta")
.save(outputPath)
查看该Delta Lake的内容,代码如下:
val path = "/data_lake/delta_lake/vacuum_example/"
val df = spark.read.format("delta").load(path)
df.show()
执行以上代码,输出结果如下:
+----------+---------+--------+ |first_name|last_name| country| +----------+---------+--------+ | miguel| cordoba|colombia| | luisa| gomez|colombia| | li| li| china| | wang| wei| china| | hans| meyer| germany| | mia| schmidt| germany| +----------+---------+--------+
下面用另一个new_people.csv文件覆盖Delta Lake中的数据。new_people.csv文件内容如下:
first_name,last_name,country li,li,china aa,li,china zhang,zhang,china
覆盖Delta Lake表的代码如下:
val path = "/data/data_lake/delta_lake/new_people.csv"
val outputPath = "/data_lake/delta_lake/vacuum_example/"
spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(path)
.repartition(1)
.write
.format("delta")
.mode("overwrite")
.save(outputPath)
然后查看一下覆盖后的数据,代码如下:
spark.read.format("delta").load(outputPath).show()
执行以上代码,输出覆盖后的数据,内容如下:
+----------+---------+-------+ |first_name|last_name|country| +----------+---------+-------+ | li| li| china| | aa| li| china| | zhang| zhang| china| +----------+---------+-------+
在终端窗口中,执行命令来查看当前的文件系统,命令如下:
$ hdfs dfs -ls - /data_lake/delta_lake/vacuum_example
下图是第二次写入后HDFS上显示的文件系统的目录结构和内容:
可以看到,虽然第一次写入的数据不再读取到DataFrame中,但它仍然存储在文件系统中。所以可以回滚到数据的旧版本。
例如,要显示版本0时的Delta Lake内容,代码如下:
val df = spark.read
.format("delta")
.option("versionAsOf", 0)
.load(outputPath)
df.show()
执行以上代码,输出内容如下:
+----------+---------+--------+ |first_name|last_name| country| +----------+---------+--------+ | miguel| cordoba|colombia| | luisa| gomez|colombia| | li| li| china| | wang| wei| china| | hans| meyer| germany| | mia| schmidt| germany| +----------+---------+--------+
运行vacuum命令并验证文件系统中是否删除了文件,代码如下:
import io.delta.tables._ val deltaTable = DeltaTable.forPath(spark, outputPath) deltaTable.vacuum(0.000001)
将保留时间设置为0.000001小时,这样就可以立即运行这个vacuum命令。
注意: 当设置保留的时间小于168 hours时,需要设置spark.databricks.delta.retentionDurationCheck.enabled = false。
运行vacuum()命令后,在终端窗口中,执行命令来查看当前的文件系统,命令如下:
$ hdfs dfs -ls -R /data_lake/delta_lake/vacuum_example
这时文件系统的结构和内容如下图所示:
可以看到,有一个parquet数据文件(即旧版本的数据文件)已经被删除掉了。可以通过查看00000000000000000001.json这个文件,以了解Delta如何知道要删除哪些文件。查看文件的命令如下:
$ hdfs dfs -cat /data_lake/delta_lake/vacuum_example/_delta_log/00000000000000000001.json
可以得到00000000000000000001.json的内容如下(下面对显示格式进行了美化,以方便查看):
{
"commitInfo":{
"timestamp":1646118315485,
"operation":"WRITE",
"operationParameters":{
"mode":"Overwrite",
"partitionBy":"[]"
},
"readVersion":0,
"isBlindAppend":false,
"operationMetrics":{
"numFiles":"1",
"numOutputBytes":"925",
"numOutputRows":"3"
}
}
}
{
"add":{
"path":"part-00000-36e8dca8-1378-4de5-97f4-4d4304de8dcb-c000.snappy.parquet",
"partitionValues":{},
"size":925,
"modificationTime":1646118314217,
"dataChange":true
}
}
{
"remove":{
"path":"part-00000-9d6a938e-f928-41a6-a129-482c5e36fe48-c000.snappy.parquet",
"deletionTimestamp":1646118315484,
"dataChange":true,
"extendedFileMetadata":true,
"partitionValues":{},
"size":973
}
}
上面JSON文件的remove部分表明part-00000-9d6a938e--....fe48-c000.snappy.parquet可以在执行vacuum()命令时删除。在执行vacuum()命令后,就无法访问Delta Lake的0版本了。如果强行访问versionAsOf为0的版本,则会抛出异常。例如,读取前面outputPath指定的路径下的文件,用versionAsOf指定0版本,代码如下:
val df = spark.read
.format("delta")
.option("versionAsOf", 0)
.load(outputPath)
df.show()
这时候访问0版本,会抛出以下错误信息:
... java.io.FileNotFoundException: File does not exist: hdfs://xueai8:8020/data_lake/delta_lake/vacuum_example/part-00000-9d6a938e-f928-41a6-a129-482c5e36fe48-c000.snappy.parquet It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved. ...
对于旧数据,保存期系统默认为7天。因此执行deltable .vacuum()不会马上生效,除非等待7天(168个小时)才能运行该命令。需要设置特殊的命令来调用保留时间小于7天的vacuum()方法,否则会抛出错误信息。需要修改Spark配置,以允许小于168小时的保留时间。可以静态设置也可以动态设置spark.databricks.delta.retentionDurationCheck.enabled配置参数的值为false,代码如下:
val spark: SparkSession = {
SparkSession
.builder()
.master("local")
.appName("spark session")
.config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
.getOrCreate()
}
如果是使用Zeppelin Notebook,则可以在Spark解释器中添加配置如下:
spark.databricks.delta.retentionDurationCheck.enabled false
请注意,只有事务日志中有remove项时,执行vacuum()方法才会执行删除。如果事务日志(json文件中)就没有remove项,那么即使调用了vacuum()方法,也什么都不会做。
在下面这个示例中,仅添加数据到Delta Lake,这样的话,运行vacumm()命令将什么也不做。
dogs1.csv文件内容如下:
first_name,breed fido,lab spot,bulldog dogs2.csv: first_name,breed fido,beagle lou,pug
将dog1.csv和dogs2.csv写入Delta Lake文件,代码如下:
val df = spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv("/dog_data/dogs1.csv")
df
.repartition(1)
.write
.format("delta")
.save("/data_lake/delta_lake/vacuum_example2")
val df2 = spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv("/dog_data/dogs2.csv")
df2
.repartition(1)
.write
.format("delta")
.mode(SaveMode.Append)
.save("/data_lake/delta_lake/vacuum_example2")
在这两个文件被写入后,Delta Lake所包含的内容如下:
+----------+-------+ |first_name| breed| +----------+-------+ | fido| lab| | spot|bulldog| | fido| beagle| | lou| pug| +----------+-------+
查看对应的文件系统,目录结构如下:
vacuum_example2/
_delta_log/
00000000000000000000.json
00000000000000000001.json
part-00000-57db2297-9aaf-44b6-b940-48c504c510d1-c000.snappy.parquet
part-00000-6574b35c-677b-4423-95ae-993638f222cf-c000.snappy.parquet
其中0000000000000000.json文件的内容如下:
{
"add":{
"path":"part-00000-57db2297-9aaf-44b6-b940-48c504c510d1-c000.snappy.parquet",
"partitionValues":{},
"size":606,
"modificationTime":1568685380000,
"dataChange":true
}
}
下面是0000000000000001.json文件的内容:
{
"add":{
"path":"part-00000-6574b35c-677b-4423-95ae-993638f222cf-c000.snappy.parquet",
"partitionValues":{},
"size":600,
"modificationTime":1568685386000,
"dataChange":true
}
}
所有JSON文件都不包含任何remove行,因此vacuum()方法不会删除任何文件。所以当执行下面这段代码后没有改变任何东西:
import io.delta.tables._ val path = "/delta_lake_data/vacuum_example2" val deltaTable = DeltaTable.forPath(spark, path) deltaTable.vacuum(0.000001)
由上面的内容可知,如果想节省数据存储成本,可以使用vacuum()从Delta Lake中删除文件。
在运行overwirte操作后,经常会有重复的文件。在_delta_log/下的JSON文件中,任何比指定的保留时间更早且被标记为remove的文件都将在执行vacuum()方法时被删除。