发布日期:2022-10-24 VIP内容

数据覆盖

INSERT OVERWRITE可以用查询的结果替换表中的数据。OVERWRITE是Iceberg表的原子操作。要被INSERT OVERWRITE替换的分区取决于Spark的分区覆盖模式和表的分区。建议使用MERGE INTO而不是INSERT OVERWRITE,因为Iceberg可以仅重写受影响的数据文件,并且具有更容易理解的行为,而且如果表的分区发生变化,被动态重写覆盖的数据可能会发生变化。

Spark的默认覆盖模式是静态的(static),但是在写Iceberg表时建议使用动态覆盖模式。静态覆盖模式通过将PARTITION子句转换为过滤器来确定要覆盖表中的哪些分区,但是PARTITION子句只能引用表的列。

动态覆盖模式是通过设置spark.sql.sources.partitionOverwriteMode=dynamic来配置的。

为了演示动态和静态覆盖的行为,考虑一个由DDL定义的logs表,代码如下:

CREATE TABLE hadoop_prod.my_app.logs (
    uuid string NOT NULL,
    level string NOT NULL,
    ts timestamp NOT NULL,
    message string
)
USING iceberg
PARTITIONED BY (level, hours(ts))

1.动态覆盖

当Spark的覆盖模式是动态的时,包含SELECT查询产生的行的分区将被替换。

例如,查询从示例的logs表中删除重复的日志事件,代码如下:

INSERT OVERWRITE hadoop_prod.my_app.logs
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE cast(ts as date) = '2020-07-01'
GROUP BY uuid

在dynamic模式下,这将用SELECT结果中的行替换任何分区。因为所有行的日期都限制在7月1日,所以只替换当天的hours。

2.静态覆盖

当Spark的覆盖模式为静态时,PARTITION子句被转换为用于从表中删除的过滤器。如果省略了PARTITION子句,所有的分区都将被替换。

由于在上面的查询中没有PARTITION子句,当以静态模式运行时,它将删除表中所有现有的行,但只写入7月1日的日志。

要覆盖已加载的分区,添加一个与SELECT查询过滤器对齐的PARTITION子句,代码如下:

INSERT OVERWRITE hadoop_prod.my_app.logs
PARTITION (level = 'INFO')
SELECT uuid, first(level), first(ts), first(message)
FROM prod.my_app.logs
WHERE level = 'INFO'
GROUP BY uuid

注意,这种模式不能像动态示例查询那样替换小时分区,因为PARTITION子句只能引用表中的列,而不能引用隐藏分区。