使用存储过程维护表
要在Spark中使用Iceberg,首先需要配置Spark catalog。只有在Spark 3.x中使用Iceberg SQL扩展时,存储过程才可用。
1.用法
可以通过CALL从任何配置的Iceberg catalog中使用存储过程。所有存储过程都位于命名空间system中。CALL支持通过名称或位置传递参数,推荐通过名称传递参数。不支持混合位置参数和命名参数。
(1) 命名参数。
所有过程参数都被命名。当通过名称传递参数时,参数可以按任意顺序传递,任何可选参数都可以省略,代码如下:
CALL catalog_name.system.procedure_name(arg_name_2 => arg_2, arg_name_1 => arg_1)
(2) 位置参数。
当按位置传递参数时,如果结尾参数是可选的,则只能省略它们,代码如下:
CALL catalog_name.system.procedure_name(arg_1, arg_2, ... arg_n)
2.快照管理
(1) rollback_to_snapshot()。
将表回滚到指定的快照ID。
要回滚到特定的时间,使用rollback_to_timestamp()方法。此过程将使所有引用受影响表的缓存Spark计划失效。
例如,将表db.sample回滚到快照ID 1,执行语句如下:
CALL catalog_name.system.rollback_to_snapshot('db.sample', 1)
(2) rollback_to_timestamp()。
将表回滚到某个时间点的当前快照。此过程将使所有引用受影响表的缓存Spark计划失效。例如,将db.sample回滚到一天前,执行语句如下:
CALL catalog_name.system.rollback_to_timestamp('db.sample', date_sub(current_date(), 1))
(3) set_current_snapshot()。
设置表的当前快照ID。与rollback不同,快照不需要是当前表状态的祖先。此过程将使所有引用受影响表的缓存Spark计划失效。
例如,为db.sample设置当前快照为1,代码如下:
CALL catalog_name.system.set_current_snapshot('db.sample', 1)
(4) cherrypick_snapshot()。
cherry-pick从快照更改到当前表状态。在不修改或删除原始快照的情况下,cherry-picking从现有快照创建一个新的快照。
只有append和动态覆盖快照可以被cherry-picked。此过程将使所有引用受影响表的缓存Spark计划失效。例如,cherry-pick快照1,代码如下:
CALL catalog_name.system.cherrypick_snapshot('my_table', 1)
使用命名参数cherry-pick快照1,代码如下:
CALL catalog_name.system.cherrypick_snapshot(snapshot_id => 1, table => 'my_table' )
3.元数据管理
可以使用Iceberg存储过程执行许多维护操作。
(1) expire_snapshots()
在Iceberg中,每次写/更新/删除/upsert/压缩都会生成一个新快照,同时保留旧数据和元数据,以便快照隔离和时间旅行。expire_snapshots()过程可以用来删除不再需要的旧快照及其文件。
此过程将删除仅旧快照所需要的旧快照和数据文件。这意味着expire_snapshots()将不会删除未过期快照所需要的文件。
例如,删除超过10天的快照,但保留最近的100个快照,代码如下:
CALL hive_prod.system.expire_snapshots('db.sample', date_sub(current_date(), 10), 100)
删除所有比当前时间戳老的快照,但保留最后5个快照,代码如下:
CALL hive_prod.system.expire_snapshots(table => 'db.sample', older_than => now(), retain_last => 5)
(2) remove_orphan_files()
用于删除Iceberg表的任何元数据文件中没有引用的文件,因此可以认为是“孤立的”文件。
例如,通过在这个表上执行remove_orphan_files命令,列出所有可能需要删除的文件,而不是实际删除它们,代码如下:
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', dry_run => true)
删除表db.sample所不知道的tablelocation/data文件夹中的任何文件,代码如下:
CALL catalog_name.system.remove_orphan_files(table => 'db.sample', location => 'tablelocation/data')
(3) rewrite_manifests()
重写表清单(manifest)以优化扫描计划。
清单中的数据文件是按照分区规范中的字段进行排序的。此过程使用Spark作业并行运行。此过程将使所有引用受影响表的缓存Spark计划失效。
例如,重写表db.sample中的清单文件并将其与表分区对齐,代码如下:
CALL catalog_name.system.rewrite_manifests('db.sample')
重写表db.sample中的清单,并禁用Spark缓存的使用。这样做可以避免执行器的内存问题,代码如下:
CALL catalog_name.system.rewrite_manifests('db.sample', false)
4.表迁移
snapshot()和migrate()过程有助于测试和迁移现有的Hive或Spark表到Iceberg。
(1) snapshot()
在不更改源表的情况下,创建用于测试的表的轻量级临时副本。
可以对新创建的表进行更改或写入,而不会影响源表,但是快照使用原始表的数据文件。当在快照上运行插入或覆盖时,新文件被放置在快照表的位置,而不是原始表的位置。
测试完快照表后,运行DROP TABLE清理快照表。
例如,在catalog的default位置为db.snap创建一个孤立的Iceberg表,它引用表db.sample,命名为db.snap,代码如下:
CALL catalog_name.system.snapshot('db.sample', 'db.snap')
迁移一个引用表db.sample的孤立的Iceberg表db.snap,在一个手动指定的位置/tmp/temptable/,代码如下:
CALL catalog_name.system.snapshot('db.sample', 'db.snap', '/tmp/temptable/')
(2) migrate()
用装载源数据文件的Iceberg表替换表。表模式、分区、属性和位置将从源表复制。
如果任何表分区使用不支持的格式,迁移将失败。支持的格式有Avro、Parquet和ORC。现有数据文件被添加到Iceberg表的元数据中,可以使用从原始表模式创建的name-to-id映射来读取。
为了在测试时保持原始表不变,可以使用snapshot创建共享源数据文件和模式的新临时表。例如,将Spark的default catalog中的db.sample表迁移到一个Iceberg表,并添加一个属性foo,设置为bar。代码如下:
CALL catalog_name.system.migrate('spark_catalog.db.sample', map('foo', 'bar'))
将当前catalog中的db.sample迁移到一个Iceberg表,而不添加任何其他属性。代码如下:
CALL catalog_name.system.migrate('db.sample')