发布日期:2022-10-19 VIP内容

压缩小文件

数据湖可以积累很多小文件,特别是当执行增量更新时。文件小,读取速度就会变慢。可以用Spark压缩Delta Lake中的小文件。通过压缩将小文件合并到大文件中是保持快速读取的一种重要的数据湖维护技术。

下面创建一个包含1000个文件的Delta数据湖,然后将文件夹压缩为只包含10个文件。首先构造一个DataFrame,将其重分区为1000个分区,这样在存入Delta Lake时就会生成1000个小的存储文件,代码如下:

// 先构造一个DataFrame
val df = spark.range(0, 10000).toDF("number")

// 创建具有1000个文件的Delta Lake
df.repartition(1000)
  .write
  .format("delta")
  .save("/data_lake/delta_lake/compact")

查看_delta_log下相应的日志文件00000000000000000000.json,命令如下:

$ hdfs dfs -cat /data_lake/delta_lake/compact/_delta_log/00000000000000000000.json

会看到其中包含有1000行类似下面这样的json行(为了便于阅读,这里格式化显示其中一行),内容如下:

{
    "add":{
        "path":"part-00000-05e5bda3-f20f-4452-b10a-c675f7557b1d-c000.snappy.parquet",
        "partitionValues":{},
        "size":525,
        "modificationTime":1622099342864,
        "dataChange":true
    }
}

从Delta Lake中把这些数据读取出来,然后重分区为10个分区,再使用overwrite模式写入相同的delta表,这样将把数据压缩到只包含10个文件,代码如下:

val data = spark.read.format("delta").load("/delta_data_lake/compact")

data.repartition(10)
    .write
    .format("delta")
    .mode("overwrite")
    .save("/data_lake/delta_lake/compact")

现在/data_lake/delta_lake/compact目录下包含1010个文件,其中1000个原始未压缩的文件以及10个压缩过的文件。

这时再查看/data_lake/delta_lake/compact/_delta_log/00000000000000000001.json,会看到其中包含有10行类似下面这样的json行:

{
    "add":{
        "path":"part-00000-9f8f50a7-b31b-49f9-bdfc-6e6072002c2e-c000.snappy.parquet",
        "partitionValues":{},
        "size":4526,
        "modificationTime":1622100154614,
        "dataChange":true
    }
}

另外还包含有1000行类似下面这样的json行:

{
    "remove":{
        "path":"part-00097-47fcbc63-44e5-42f3-924d-47eefad90a0f-c000.snappy.parquet",
        "deletionTimestamp":1622100157546,
        "dataChange":true
    }
}

该JSON文件中的remove部分表明当运行vacuum()命令时,part-00097-......-c000.snappy.parquet文件会被删除。

Delta Lake包括一个删除旧版本数据的vacuum()命令。可以运行vacuum()命令删除旧的数据文件,这样就不必存储未压缩的数据。代码如下:

import io.delta.tables._
import org.apache.spark.sql.functions._

val deltaTable = DeltaTable.forPath("/data_lake/delta_lake/compact")
deltaTable.vacuum(0.000001)

这时再查看/data_lake/delta_lake/compact目录,可以看到只剩下10个parquet文件了,如下图所示。

当设置保留的时间小于168 hours时,需要如下设置:

spark.databricks.delta.retentionDurationCheck.enabled = false

这里将保留时间设置为0.000001小时,这样就可以立即运行这个vacuum()命令。在执行vacuum()命令后,就无法访问Delta Lake的0版本了,否则会出现错误。例如,访问Delta Lake的0版本,代码如下:

val df = spark.read
	.format("delta")
	.option("versionAsOf", 0)
	.load("/data_lake/delta_lake/compact")

df.show()

运行上面的代码,会出现错误信息,内容如下:

...: java.io.FileNotFoundException: File does not exist: hdfs://cda:8020/delta_data_lake/compact/part-00766-e857aad6-839d-4042-b3ed-7c81d7ef72bb-c000.snappy.parquet
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
...

最后,还有两个小问题需要理解。

1)关于dataChange=false

Delta Lake事务协议能够将事务日志中的项标记为dataChange=false,表示它们只重新安排已经是表的一部分的数据。这非常强大,因为它允许执行压缩和其他读性能优化,而不会破坏使用Delta表作为流源的能力。用户应该在用于重写的DataFrame写入器中使用选项配置它。

Delta Lake目前在数据压缩时设置dataChange=true,这对下游流消费者来说是一个重大改变。当文件被压缩时,用户可以选择设置dataChange=false,这时Delta Lake将被更新,这样压缩对于下游流客户来说不是一个中断操作。

2)合并分区的Delta Lake

假设数据存储在/some/path/data文件夹中,并按year字段进行分区。进一步假设2019这个目录中包含5个文件,现在希望将其压缩为一个文件。代码如下:

val table = "/some/path/data"
val partition = "year = '2019'"
val numFiles = 1
spark.read
  .format("delta")
  .load(table)
  .where(partition)
  .repartition(numFiles)
  .write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", partition)
  .save(table)