发布日期:2022-10-24 VIP内容

修改表结构

Iceberg支持使用ALTER TABLE命令对表结构进行修改。在Apache Spark 3中,Iceberg有完整的ALTER TABLE支持,包括:

  • (1) 重命名表。
  • (2) 设置或删除表属性。
  • (3) 添加、删除和重命名列。
  • (4) 添加、删除和重命名嵌套字段。
  • (5) 重新排序顶级列和嵌套的struct字段。
  • (6) 扩大int、float和decimal字段的类型。
  • (7) 使必需列为可选列。

此外,可以使用SQL扩展来添加对分区演化和设置表的写顺序的支持。不过需要特别注意的是,ALTER TABLE不能够修改Hadoop表,也就是说,在HadoopCatalog类型的catalog中创建的表,不能使用ALTER TABLE命令,只有HiveCatalog类型的catalog中创建的表才支持ALTER TABLE。

为了演示ALTER TABLE命令的使用,先配置一个名为hive_prod的HiveCatalog类型的catalog。在Zeppelin Notebook中指定配置(如果是使用spark-shell,请参考11.2.1节),代码如下:

%spark.conf
spark.jars.packages         		iceberg-spark-runtime-3.1_2.12:0.13.1
spark.sql.extensions         		org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hive_prod   		org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type 	hive
spark.sql.catalog.hive_prod.uri 	thrift://localhost:9083

接下来在hive_prod中创建一个名为my_db的命名空间(数据库),代码如下:

// 创建命名空间
spark.sql("CREATE NAMESPACE hive_prod.my_db")

// 查看是否创建成功
spark.sql("show namespaces in hive_prod").show()

执行上面的代码,在输出结果中应当能够看到创建的my_db命名空间。

然后,在hive_prod.my_db命名空间中创建一个名为my_tb的Iceberg表,代码如下:

// 创建iceberg表
spark.sql("""
    CREATE TABLE hive_prod.my_db.my_tb ( 
      id bigint,
      data string
    ) 
    USING iceberg
""")

// 向iceberg表中插入数据
spark.sql("""
    INSERT INTO hive_prod.my_db.my_tb
    VALUES 
    (1, 'a'), 
    (2, 'b'), 
    (3, 'c')
""")

// 查看创建和插入是否成功
spark.sql("select * from hive_prod.my_db.my_tb").show()

执行上面的代码,应该可以看到输出内容如下:

+---+----+
| id|data|
+---+----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+----+

接下来,学习一些常用的ALTER TABLE命令。

(1) 重命名表。

语法:ALTER TABLE ... RENAME TO

下面将表my_tb重命名为new_tb,代码如下:

spark.sql("ALTER TABLE hive_prod.my_db.my_tb RENAME TO hive_prod.my_db.new_tb")

执行以上代码,然后查看my_db中的表名,代码如下:

// 查询hadoop_catalog.my_db数据库中有哪些表
spark.sql("SHOW TABLES IN hive_prod.my_db").show()

执行以上代码,输出结果如下:

+---------+---------+
|namespace|tableName|
+---------+---------+
|    my_db|    new_tb|
+---------+---------+

(2) 设置表属性。

语法:ALTER TABLE ... SET TBLPROPERTIES

例如,将new_tb表的read.split.target-size的属性值设置为256M,代码如下:

// 设置表属性
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb SET TBLPROPERTIES (
        'read.split.target-size'='268435456'
    )
""")

// 查看表的描述
spark.sql("desc formatted hive_prod.my_db.new_tb").show(false)

Iceberg使用表属性来控制表行为。执行上面的代码,输出内容如图11-11所示。

可以看出,表属性设置成功。可以用UNSET删除指定的属性,代码如下:

// 删除属性
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb UNSET TBLPROPERTIES ('read.split.target-size')
""")

// 查看表的描述
spark.sql("desc formatted hive_prod.my_db.new_tb").show(false)

再一次查看表的属性,可以看到属性删除成功,如图11-12所示。

(3) 增加新的属性列。

语法:ALTER TABLE ... ADD COLUMN

要向Iceberg添加一个列,可以使用ALTER TABLE中的ADD COLUMNS子句,代码如下:

// 增加列
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMNS (new_column string comment 'new_column docs')
""")

// 查看表的描述
spark.sql("desc formatted hive_prod.my_db.new_tb").show(false)

可以同时添加多个列,以逗号分隔。执行以上代码,然后再查看表的属性,可以看到表多了一个新的属性列,如图11-13所示。

还可以增加嵌套列。例如,为表new_tb增加了一个struct类型的列point,代码如下:

// 创建一个struct列
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMN point struct<x: double, y: double>
""")

// 查看表的描述
spark.sql("desc formatted hive_prod.my_db.new_tb").show(false)

查看表的属性,如图11-14所示。

对于嵌套的列,在引用时应该使用完整的列名来标识。例如,为point增加一个嵌套的子列z,代码如下:

// 向该struct添加一个字段
spark.sql("ALTER TABLE hive_prod.my_db.new_tb ADD COLUMN point.z double")

查看表的属性,如图11-15所示。

为表new_tb增加新的数组列、map列以及如何进行修改这些嵌套列,代码如下:

// 创建一个嵌套的struct数组列
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMN points2 array<struct<x: double, y: double>>
""")

// 为数组列中嵌套的struct增加字段。使用关键字'element'以访问数组的元素列。
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMN points2.element.z double
""")

// 创建一个map列,key和value都是struct
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMN points3 map<struct<x: int>, struct<a: int>>
""")

// 为map的struct值增加一个字段。使用关键字'value'以访问map的值列。
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMN points3.value.b int 
""") 

// 查看表的描述信息
spark.sql("desc formatted hive_prod.my_db.new_tb").show(false) 

可以看到修改以后表的结构和属性如图11-16所示。

在Spark 2.4.4及以后版本中,可以通过添加FIRST或AFTER子句在任何位置添加列。但改变列的位置,有一个隐含的要求,即所有位置改变的字段类型要兼容。代码如下:

// 在new_column后增加一列other_column
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMN other_column string AFTER new_column
""") 

上面这句执行失败。它在new_column后增加other_column,会造成other_column和后续的point、points2、points3不兼容。

如果是增加的字段类型与之前的类型兼容,则一切正常,代码如下:

// 在嵌套的point列中增加一个子列q,放在第一个位置
spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb
    ADD COLUMN point.w double FIRST 
""") 

// 查看表结构描述信息
spark.sql("desc hive_prod.my_db.new_tb").show(false) 

执行以后,可以看到修改以后表的结构和属性如图11-17所示。

(4) 重命名列。

语法:ALTER TABLE ... RENAME COLUMN

Iceberg允许对任何字段进行重命名。要重命名一个字段,使用RENAME COLUMN。例如,对字段和嵌套字段的重命名,代码如下:

spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb RENAME COLUMN data TO payload
""") 

spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb RENAME COLUMN point.z TO zip
""") 

// 查看表结构描述信息
spark.sql("desc hive_prod.my_db.new_tb").show(false)  

执行以后,可以看到修改以后表的结构和属性如图11-18所示。

注意,嵌套的重命名命令只重命名叶子字段。上面的命令将location.lat重命名为location.latitude。

(5) 修改列定义。

语法:ALTER TABLE ... ALTER COLUMN

Alter column可用于扩展类型、使字段可选、设置注释和重新排序字段。

如果修改是安全的,Iceberg允许改变列类型。安全的列类型修改,代码如下:

int => bigint
float => double
decimal(P,S) => decimal(P2,S) ,当P2 > P (规模不能改变)

例如,为payload字段添加设置注释,代码如下:

spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb ALTER COLUMN payload comment '原字段名为data'
""") 

// 查看表结构描述信息
spark.sql("desc hive_prod.my_db.new_tb").show(false)  

执行以后,可以看到修改以后表的结构和属性如图11-19所示。

在Iceberg中,允许使用FIRST和AFTER子句对结构中的顶级列或列进行重新排序,代码如下:

spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb ALTER COLUMN payload AFTER new_column
""") 

spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb ALTER COLUMN point.w AFTER x
""")  

// 查看表结构描述信息
spark.sql("desc hive_prod.my_db.new_tb").show(false)

执行以后,可以看到字段顺序进行了调整,如图11-20所示。

注意: 改变列的位置操作,有一个隐含的要求,即所有位置改变的字段类型要兼容,否则执行失败。

注意: ALTER COLUMN不用于更新结构(struct)类型。使用ADD COLUMN和DROP COLUMN来添加或删除结构字段。

可以使用DROP NOT NULL将不允许为空的列更改为允许为空。例如,设置id列允许为空,代码如下:

spark.sql("""
    ALTER TABLE hive_prod.my_db.new_tb ALTER COLUMN id DROP NOT NULL 
""")  

注意: 目前Iceberg还不支持将允许为空的列更改为不允许为空。

(6) 删除列。

语法:ALTER TABLE ... DROP COLUMN。

要删除列,使用ALTER TABLE ... DROP COLUMN。例如,删除my_tb表中嵌套的point.w列,代码如下:

spark.sql("ALTER TABLE hive_prod.my_db.new_tb DROP COLUMN point.w")  

执行以上代码,可以看到表结构如图11-21所示:

注意: 同样地,删除某个列时,要保证其后面的列在各自的位置上与现有列兼容。如果不兼容,则删除失败。