基本的CRUD操作
Apache Iceberg作为分析数据集的开放表格式,提供了很好的用户易用性。Iceberg支持SQL或DataFrame API的方式来读写数据湖表。
要在Apache Spark中创建Iceberg表,使用spark-shell或spark.sql(...)来运行CREATE TABLE命令。Apache Spark 3.0可以使用USING Iceberg子句在任何Iceberg catalog中创建表,Iceberg将把Spark中的列类型转换为相应的Iceberg类型。
要创建Iceberg表,有两种catalog目录选项:
- (1) HadoopCatalog:支持存储在HDFS或本地文件系统中的表。
- (2) HiveCatalog:使用Hive Metastore来通过存储对最新元数据文件的引用来跟踪Iceberg表。
在11.2.1节,已经创建了HadoopCatalog类型的hadoop_catalog和HiveCatalog类型的hive_prod。接下来,在hadoop_prod目录中创建数据库和表,代码如下:
// hadoop_prod是前面定义的基于路径的catalog
spark.sql("""
CREATE TABLE hadoop_prod.my_db.my_tb1 (
id bigint,
data string
)
USING iceberg
""")
查看表是否创建成功,代码如下:
spark.sql("SHOW TABLES IN hadoop_prod.my_db").show()
一旦表被创建,就可以使用INSERT INTO来插入数据,代码如下:
spark.sql("""
INSERT INTO hadoop_prod.my_db.my_tb1
VALUES
(1, 'a'),
(2, 'b'),
(3, 'c')
""")
查询Iceberg表中的插入的数据,代码如下:
spark.sql("select * from hadoop_prod.my_db.my_tb1").show()
执行以上代码,可以看到查询结果如下:
+---+----+ | id|data| +---+----+ | 1| a| | 2| b| | 3| c| +---+----+
Apache Spark 3.1增加了对UPDATE查询的支持,该查询可以更新表中匹配的行。UPDATE查询接受过滤器来匹配要更新的行。例如,将my_tb1表中id为1这条记录的data字段值修改为b,代码如下:
spark.sql("""
update hadoop_prod.my_db.my_tb1
set data="b"
where id=1
""")
然后查询更新后的数据,代码如下:
spark.sql("select * from hadoop_prod.my_db.my_tb1").show()
执行以上代码,查询结果如下:
+---+----+ | id|data| +---+----+ | 1| b| | 2| b| | 3| c| +---+----+
也可以在Iceberg表上执行聚合计算。例如,按data字段分组计数,代码如下:
spark.sql("""
select data, count(1) as cnt
from hadoop_prod.my_db.my_tb1
group by data
order by cnt desc
""").show()
执行以上代码,输出结果如下:
+----+---+ |data|cnt| +----+---+ | b| 2| | c| 1| +----+---+
Iceberg支持使用新的v2 DataFrameAPI来读写DataFrame。在下面的代码中,有一个新的数据集newData,将其内容增量写入上面的hadoop_prod.my_db.my_tb1表中,然后再一次查询表数据。代码如下:
// 有一个新的DataFrame
val newData = Seq((4,"d"), (5,"e")).toDF("id", "data")
newData.show()
// 将其数据追加到hadoop_prod.my_db.my_tb1表中
newData.writeTo("hadoop_prod.my_db.my_tb1").append()
// 查询此时的表数据
spark.table("hadoop_prod.my_db.my_tb1").show()
执行以上代码,输出查询结果如下:
+---+----+ | id|data| +---+----+ | 1| b| | 2| b| | 3| c| | 4| d| | 5| e| +---+----+
Spark 3增加了对DELETE FROM查询的支持,可以从表中删除数据。DELETE查询接受一个过滤器来匹配要删除的行。例如,从my_tb1表中删除id为1的数据,代码如下:
spark.sql("DELETE FROM hadoop_prod.my_db.my_tb1 WHERE id=1")
// 查询此时的表数据
spark.table("hadoop_prod.my_db.my_tb1").show()
执行以上代码,输出查询结果如下:
+---+----+ | id|data| +---+----+ | 2| b| | 3| c| | 4| d| | 5| e| +---+----+
1.多catalog支持
从上面的代码中可以看到,当查询Iceberg表时,既可以使用spark.sql(...),也可以使用spark.table(...)。实际上,在Spark 3中,Iceberg 0.13.1都为DataFrameReader增加了多catalog支持。
可以通过Spark的DataFrameReader接口加载路径path和表名。表的加载方式取决于标识符的指定方式。当使用spark.read.format("iceberg").path(table)或spark.table(table)时,table变量可以有以下几种形式:
- (1) file:/path/to/table:加载一个给定路径的HadoopTable。
- (2) tablename:加载currentCatalog.currentNamespace.tablename。
- (3) catalog.tablename:从指定的catalog加载tablename。
- (4) namespace.tablename:从当前catalog加载namespace.tablename。
- (5) catalog.namespace.tablename:从指定的catalog加载namespace.tablename。
- (6) namespace1.namespace2.tablename:从当前catalog加载namespace1.namespace2. tablename。
上面的列表是按先后顺序排列的。例如:匹配的catalog将优先于任何名称空间解析。
2.类型适配
Spark和Iceberg支持不同的类型集。Iceberg会自动进行类型转换,但不是针对所有组合,因此在设计表中的列类型之前,可能希望了解Iceberg中的类型转换。
1)从Spark类型到Iceberg类型
此类型转换表描述了如何将Spark类型转换为Iceberg类型。这个转换既适用于创建Iceberg表,也适用于通过Spark写入Iceberg表,见下表。
2)Iceberg类型到Spark类型
此类型转换表描述了如何将Iceberg类型转换为Spark类型。转换应用于通过Spark从Iceberg表读取数据,见下表。