维护流表
流查询可以快速创建新的表版本,这将创建大量的表元数据来跟踪这些版本。强烈建议通过调优提交速率、旧快照过期和自动清理元数据文件来维护元数据。
(1) 调优提交速率。
高提交率会产生大量的数据文件、清单和快照,这会导致表难以维护。鼓励至少1分钟的触发间隔,并在需要时增加间隔。代码如下:
import org.apache.spark.sql.streaming.Trigger
// 默认触发器 (尽可能快地运行微批处理)
df.writeStream
.format("console")
.start()
// ProcessingTime trigger,使用处理时间,指定两秒钟的微批间隔
df.writeStream
.format("console")
.trigger(Trigger.ProcessingTime("2 seconds"))
.start()
// 一次性触发器
df.writeStream
.format("console")
.trigger(Trigger.Once())
.start()
(2) 过期旧快照。
每个写入表的微批都会生成一个新的快照,在表元数据中跟踪该快照直到过期,从而删除不再需要的元数据和任何数据文件。快照随着频繁提交而迅速累积,因此强烈建议定期维护由流查询写的表。
维护操作需要Table实例,使用Java API。例如,将把时间超过1天的快照设为过期,代码如下:
Table table = ...
long tsToExpire = System.currentTimeMillis() - (1000*60*60*24); // 1天
table.expireSnapshots()
.expireOlderThan(tsToExpire)
.commit();
还有一个Spark Action可以并行运行大表的表过期,模板代码如下:
Table table = ...
SparkActions
.get()
.expireSnapshots(table)
.expireOlderThan(tsToExpire)
.execute();
过期的旧快照会从元数据中删除它们,因此它们不可以再用于时间旅行查询。
(3) 清理元数据文件。
Iceberg使用JSON文件跟踪表元数据。对表的每次更改都会生成一个新的元数据文件,以提供原子性。默认情况下,旧的元数据文件将作为历史记录保存。频繁提交的表,比如由流作业编写的表,可能需要定期清理元数据文件。
如果需要自动清理元数据文件,请在表属性中设置write.metadata.delete-after-commit.enabled=true。这将保留一些元数据文件(最高可达write.metadata.previous-versions-max),并在每个新创建的元数据文件后删除最旧的元数据文件。这些属性包括:
- 属性write.metadata.delete-after-commit.enabled:每次表提交后是否删除旧的元数据文件。赋值为true表示自动清理元数据文件。
- 属性write.metadata.previous-versions-max:要保存的旧元数据文件的数量。
(4) 压缩数据文件。
Iceberg在一个表中跟踪每个数据文件。数据文件越多会导致在清单文件中存储的元数据越多,而较小的数据文件导致不必要的元数据量和文件打开成本的低效率查询。
微批处理中写入的数据量通常很小,这可能导致表元数据跟踪大量小文件。将小文件压缩为大文件可以减少表所需的元数据,并提高查询效率。Iceberg可以使用Spark和rewriteDataFiles()操作并行压缩数据文件。这将把小文件组合成更大的文件,以减少元数据开销和运行时文件打开成本。模板代码如下:
Table table = ...
SparkActions
.get()
.rewriteDataFiles(table)
.filter(Expressions.equal("date", "2020-08-18"))
.option("target-file-size-bytes", Long.toString(500 * 1024 * 1024)) // 500 MB
.execute();
可以通过files元数据表来检查数据文件大小和确定何时压缩分区。
(5) 重写清单。
Iceberg在其清单列表和清单文件中使用元数据来加快查询规划,并删除不必要的数据文件。元数据树的作用是作为表数据的索引。
元数据树中的清单将按照添加顺序自动压缩,这使得在写入模式与读取过滤器对齐时查询速度更快。例如,在数据到达时写入按小时分区的数据是与时间范围查询过滤器对齐的。
当表的写模式与查询模式不一致时,可以重写元数据,使用rewriteManifests()或rewriteManfests()动作(用于使用Spark进行并行重写)将数据文件重新组合到清单中。
为了优化流工作负载上的写延迟,Iceberg可以使用不自动压缩清单的“fast”append来写新快照。这可能导致大量的小清单文件。可以重写清单以优化查询和压缩。Spark Action使用rewriteManifestsAction()进行并行重写元数据,以将数据文件重新组合到清单中。
下面重写了小清单文件,并根据第一个分区字段对数据文件进行分组,代码如下:
Table table = ...
SparkActions
.get()
.rewriteManifests(table)
.rewriteIf(file -> file.length() < 10 * 1024 * 1024) // 10 MB
.execute();