发布日期:2022-10-20 VIP内容

基本的CRUD操作

Apache Iceberg作为分析数据集的开放表格式,提供了很好的用户易用性。Iceberg支持SQL或DataFrame API的方式来读写数据湖表。

要在Apache Spark中创建Iceberg表,使用spark-shell或spark.sql(...)来运行CREATE TABLE命令。Apache Spark 3.0可以使用USING Iceberg子句在任何Iceberg catalog中创建表,Iceberg将把Spark中的列类型转换为相应的Iceberg类型。

要创建Iceberg表,有两种catalog目录选项:

  • (1) HadoopCatalog:支持存储在HDFS或本地文件系统中的表。
  • (2) HiveCatalog:使用Hive Metastore来通过存储对最新元数据文件的引用来跟踪Iceberg表。

在11.2.1节,已经创建了HadoopCatalog类型的hadoop_catalog和HiveCatalog类型的hive_prod。接下来,在hadoop_prod目录中创建数据库和表,代码如下:

// hadoop_prod是前面定义的基于路径的catalog
spark.sql("""
    CREATE TABLE hadoop_prod.my_db.my_tb1 ( 
      id bigint,
      data string
    ) 
    USING iceberg
""")

查看表是否创建成功,代码如下:

spark.sql("SHOW TABLES IN hadoop_prod.my_db").show()

一旦表被创建,就可以使用INSERT INTO来插入数据,代码如下:

spark.sql("""
    INSERT INTO hadoop_prod.my_db.my_tb1 
    VALUES 
    (1, 'a'), 
    (2, 'b'), 
    (3, 'c')
""")

查询Iceberg表中的插入的数据,代码如下:

spark.sql("select * from hadoop_prod.my_db.my_tb1").show()

执行以上代码,可以看到查询结果如下:

+---+----+
| id|data|
+---+----+
|  1|    a|
|  2|    b|
|  3|    c|
+---+----+

Apache Spark 3.1增加了对UPDATE查询的支持,该查询可以更新表中匹配的行。UPDATE查询接受过滤器来匹配要更新的行。例如,将my_tb1表中id为1这条记录的data字段值修改为b,代码如下:

spark.sql("""
   update hadoop_prod.my_db.my_tb1
   set data="b"
   where id=1
""")

然后查询更新后的数据,代码如下:

spark.sql("select * from hadoop_prod.my_db.my_tb1").show()

执行以上代码,查询结果如下:

+---+----+
| id|data|
+---+----+
|  1|    b|
|  2|    b|
|  3|    c|
+---+----+

也可以在Iceberg表上执行聚合计算。例如,按data字段分组计数,代码如下:

spark.sql("""
    select data, count(1) as cnt
    from hadoop_prod.my_db.my_tb1
    group by data
    order by cnt desc
""").show()

执行以上代码,输出结果如下:

+----+---+
|data|cnt|
+----+---+
|   b|   2|
|   c|   1|
+----+---+

Iceberg支持使用新的v2 DataFrameAPI来读写DataFrame。在下面的代码中,有一个新的数据集newData,将其内容增量写入上面的hadoop_prod.my_db.my_tb1表中,然后再一次查询表数据。代码如下:

// 有一个新的DataFrame                
val newData = Seq((4,"d"), (5,"e")).toDF("id", "data")
newData.show()

// 将其数据追加到hadoop_prod.my_db.my_tb1表中
newData.writeTo("hadoop_prod.my_db.my_tb1").append()

// 查询此时的表数据
spark.table("hadoop_prod.my_db.my_tb1").show()

执行以上代码,输出查询结果如下:

+---+----+
| id|data|
+---+----+
|  1|    b|
|  2|    b|
|  3|    c|
|  4|    d|
|  5|    e|
+---+----+

Spark 3增加了对DELETE FROM查询的支持,可以从表中删除数据。DELETE查询接受一个过滤器来匹配要删除的行。例如,从my_tb1表中删除id为1的数据,代码如下:

spark.sql("DELETE FROM hadoop_prod.my_db.my_tb1 WHERE id=1") 

// 查询此时的表数据
spark.table("hadoop_prod.my_db.my_tb1").show()

执行以上代码,输出查询结果如下:

+---+----+
| id|data|
+---+----+
|  2|    b|
|  3|    c|
|  4|    d|
|  5|    e|
+---+----+

1.多catalog支持

从上面的代码中可以看到,当查询Iceberg表时,既可以使用spark.sql(...),也可以使用spark.table(...)。实际上,在Spark 3中,Iceberg 0.13.1都为DataFrameReader增加了多catalog支持。

可以通过Spark的DataFrameReader接口加载路径path和表名。表的加载方式取决于标识符的指定方式。当使用spark.read.format("iceberg").path(table)或spark.table(table)时,table变量可以有以下几种形式:

  • (1) file:/path/to/table:加载一个给定路径的HadoopTable。
  • (2) tablename:加载currentCatalog.currentNamespace.tablename。
  • (3) catalog.tablename:从指定的catalog加载tablename。
  • (4) namespace.tablename:从当前catalog加载namespace.tablename。
  • (5) catalog.namespace.tablename:从指定的catalog加载namespace.tablename。
  • (6) namespace1.namespace2.tablename:从当前catalog加载namespace1.namespace2. tablename。

上面的列表是按先后顺序排列的。例如:匹配的catalog将优先于任何名称空间解析。

2.类型适配

Spark和Iceberg支持不同的类型集。Iceberg会自动进行类型转换,但不是针对所有组合,因此在设计表中的列类型之前,可能希望了解Iceberg中的类型转换。

1)从Spark类型到Iceberg类型

此类型转换表描述了如何将Spark类型转换为Iceberg类型。这个转换既适用于创建Iceberg表,也适用于通过Spark写入Iceberg表,见下表。

2)Iceberg类型到Spark类型

此类型转换表描述了如何将Iceberg类型转换为Spark类型。转换应用于通过Spark从Iceberg表读取数据,见下表。