发布日期:2022-10-26 VIP内容

使用存储过程维护表

要在Spark中使用Iceberg,首先需要配置Spark catalog。只有在Spark 3.x中使用Iceberg SQL扩展时,存储过程才可用。

1.用法

可以通过CALL从任何配置的Iceberg catalog中使用存储过程。所有存储过程都位于命名空间system中。CALL支持通过名称或位置传递参数,推荐通过名称传递参数。不支持混合位置参数和命名参数。

(1) 命名参数。

所有过程参数都被命名。当通过名称传递参数时,参数可以按任意顺序传递,任何可选参数都可以省略,代码如下:

CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)

(2) 位置参数。

当按位置传递参数时,如果结尾参数是可选的,则只能省略它们,代码如下:

CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)

2.快照管理

(1) rollback_to_snapshot()。

将表回滚到指定的快照ID。

要回滚到特定的时间,使用rollback_to_timestamp()方法。此过程将使所有引用受影响表的缓存Spark计划失效。

例如,将表db.sample回滚到快照ID 1,执行语句如下:

CALL catalog_name.system.rollback_to_snapshot('db.sample', 1)

(2) rollback_to_timestamp()。

将表回滚到某个时间点的当前快照。此过程将使所有引用受影响表的缓存Spark计划失效。例如,将db.sample回滚到一天前,执行语句如下:

CALL catalog_name.system.rollback_to_timestamp('db.sample', date_sub(current_date(), 1))

(3) set_current_snapshot()。

设置表的当前快照ID。与rollback不同,快照不需要是当前表状态的祖先。此过程将使所有引用受影响表的缓存Spark计划失效。

例如,为db.sample设置当前快照为1,代码如下:

CALL catalog_name.system.set_current_snapshot('db.sample', 1)

(4) cherrypick_snapshot()。

cherry-pick从快照更改到当前表状态。在不修改或删除原始快照的情况下,cherry-picking从现有快照创建一个新的快照。

只有append和动态覆盖快照可以被cherry-picked。此过程将使所有引用受影响表的缓存Spark计划失效。例如,cherry-pick快照1,代码如下:

CALL catalog_name.system.cherrypick_snapshot('my_table', 1)

使用命名参数cherry-pick快照1,代码如下:

CALL catalog_name.system.cherrypick_snapshot(snapshot_id => 1, table => 'my_table' )

3.元数据管理

可以使用Iceberg存储过程执行许多维护操作。

(1) expire_snapshots()

在Iceberg中,每次写/更新/删除/upsert/压缩都会生成一个新快照,同时保留旧数据和元数据,以便快照隔离和时间旅行。expire_snapshots()过程可以用来删除不再需要的旧快照及其文件。

此过程将删除仅旧快照所需要的旧快照和数据文件。这意味着expire_snapshots()将不会删除未过期快照所需要的文件。

例如,删除超过10天的快照,但保留最近的100个快照,代码如下:

CALL hive_prod.system.expire_snapshots('db.sample', date_sub(current_date(), 10), 100)

删除所有比当前时间戳老的快照,但保留最后5个快照,代码如下:

CALL hive_prod.system.expire_snapshots(table => 'db.sample', older_than => now(), retain_last => 5)

(2) remove_orphan_files()

用于删除Iceberg表的任何元数据文件中没有引用的文件,因此可以认为是“孤立的”文件。

例如,通过在这个表上执行remove_orphan_files命令,列出所有可能需要删除的文件,而不是实际删除它们,代码如下:

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true)

删除表db.sample所不知道的tablelocation/data文件夹中的任何文件,代码如下:

CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')

(3) rewrite_manifests()

重写表清单(manifest)以优化扫描计划。

清单中的数据文件是按照分区规范中的字段进行排序的。此过程使用Spark作业并行运行。此过程将使所有引用受影响表的缓存Spark计划失效。

例如,重写表db.sample中的清单文件并将其与表分区对齐,代码如下:

CALL catalog_name.system.rewrite_manifests('db.sample')

重写表db.sample中的清单,并禁用Spark缓存的使用。这样做可以避免执行器的内存问题,代码如下:

CALL catalog_name.system.rewrite_manifests('db.sample', false)

4.表迁移

snapshot()和migrate()过程有助于测试和迁移现有的Hive或Spark表到Iceberg。

(1) snapshot()

在不更改源表的情况下,创建用于测试的表的轻量级临时副本。

可以对新创建的表进行更改或写入,而不会影响源表,但是快照使用原始表的数据文件。当在快照上运行插入或覆盖时,新文件被放置在快照表的位置,而不是原始表的位置。

测试完快照表后,运行DROP TABLE清理快照表。

例如,在catalog的default位置为db.snap创建一个孤立的Iceberg表,它引用表db.sample,命名为db.snap,代码如下:

CALL catalog_name.system.snapshot('db.sample', 'db.snap')

迁移一个引用表db.sample的孤立的Iceberg表db.snap,在一个手动指定的位置/tmp/temptable/,代码如下:

CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/')

(2) migrate()

用装载源数据文件的Iceberg表替换表。表模式、分区、属性和位置将从源表复制。

如果任何表分区使用不支持的格式,迁移将失败。支持的格式有Avro、Parquet和ORC。现有数据文件被添加到Iceberg表的元数据中,可以使用从原始表模式创建的name-to-id映射来读取。

为了在测试时保持原始表不变,可以使用snapshot创建共享源数据文件和模式的新临时表。例如,将Spark的default catalog中的db.sample表迁移到一个Iceberg表,并添加一个属性foo,设置为bar。代码如下:

CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'))

将当前catalog中的db.sample迁移到一个Iceberg表,而不添加任何其他属性。代码如下:

CALL catalog_name.system.migrate('db.sample')