发布日期:2022-10-20 VIP内容

创建和删除表

上一节使用CREATE TABLE语句创建了Iceberg表。除此之外,Iceberg也支持CTAS和RTAS方法来创建表。下面分别来了解这两种方法。

1.CTAS(create table ... as select)

在使用SparkCatalog时,Iceberg支持CTAS作为原子操作。在使用SparkSessionCatalog时CTAS虽然是受支持的,但不是原子的。

下面应用CTAS创建一个新的Iceberg表my_tb2,其内容来自my_tb1中所有id为偶数的行。代码如下:

// CTAS
spark.sql("""
    CREATE TABLE hadoop_prod.my_db.my_tb2
    USING iceberg
    AS  select * 
        from hadoop_prod.my_db.my_tb1
        where id%2==0
""")

// 查询新表的数据
spark.table("hadoop_prod.my_db.my_tb2").show()

执行上面的代码,查询结果如下:

+---+----+
| id|data|
+---+----+
|  2|    b|
|  4|    d|
+---+----+

2.RTAS(replace table ... as select)

当使用SparkCatalog时,Iceberg支持RTAS作为原子操作。在使用SparkSessionCatalog时RTAS虽然是受支持的,但不是原子的。原子表替换使用SELECT查询的结果创建一个新的快照,但保留表历史。

下面应用RTAS使用新的内容来替换my_tb2表中原有的内容,代码如下:

// RTAS
spark.sql("""
    REPLACE TABLE hadoop_prod.my_db.my_tb2
    USING iceberg
    AS select * 
       from hadoop_prod.my_db.my_tb1
       where id%2==1
""")

// 查询结果
spark.table("hadoop_prod.my_db.my_tb2").show()

执行上面的代码,查询结果如下:

+---+----+
| id|data|
+---+----+
|  5|    e|
|  3|    c|
+---+----+

下面的代码应用RTAS来创建(如果目标表不存在)或替换(如果目标表存在)my_tb3表中原有的内容,其内容来自my_tb1中所有id为奇数的行,代码如下:

// create or replace table ... as select
spark.sql("""
    CREATE OR REPLACE TABLE hadoop_prod.my_db.my_tb3
    USING iceberg
    AS select * 
       from hadoop_prod.my_db.my_tb1
       where id%2==1
""")

// 查询结果
spark.table("hadoop_prod.my_db.my_tb3").show()

执行上面的代码,查询结果如下:

+---+----+
| id|data|
+---+----+
|  3|    c|
|  5|    e|
+---+----+

请注意,使用RTAS,模式和分区规范将被替换。为了避免修改表的模式和分区,请使用INSERT OVERWRITE而不要使用REPLACE TABLE。REPLACE TABLE命令中的新表属性将与任何现有的表属性合并。如果更改了现有的表属性,则会更新它们,否则它们就会被保存下来。

表创建命令,包括CTAS和RTAS,支持所有Spark create子句,包括:

  • (1) PARTITION BY (partition-expressions):配置分区。
  • (2) LOCATION '(fully-qualified-uri)' :设置表位置。
  • (3) COMMENT 'table documentation' :设置一个表的描述信息。
  • (4) TBLPROPERTIES ('key'='value', ...):设置表配置信息。

表创建命令也可以使用USING子句设置默认格式。这只支持SparkCatalog,因为Spark对内置catalog的USING子句处理方式不同。

3.删除Iceberg表

如果想要删除一张Iceberg表,可以使用drop table命令,代码如下:

// 删除表my_tb3
spark.sql("drop table hadoop_prod.my_db.my_tb3") 

// 查看my_db中有哪些表
spark.sql("show tables in hadoop_prod.my_db").show()

执行上面的代码,查询结果如下:

+---------+---------+
|namespace|tableName|
+---------+---------+
|    my_db|    my_tb1|
|    my_db|    my_tb2|
+---------+---------+