增量更新与合并更新
Iceberg不但支持增量更新,并且也支持合并更新。Apache Spark 3增加了对MERGE INTO查询的支持,可以表示行级更新。通过重写包含在overwrite提交中需要更新的行的数据文件,Iceberg支持MERGE INTO。
本节通过一个示例来演示Iceberg表中的增量更新实现和合并更新实现。
1.配置Catalog
要支持增量更新和合并更新,需要配置spark.sql.extensions扩展支持,并指定Catalog。下面以Zeppelin Notebook中为例,配置它并配置基于Hadoop路径的Catalog,代码如下:
%spark.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.hadoop_prod org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hadoop_prod.type hadoop spark.sql.catalog.hadoop_prod.warehouse hdfs://xueai8:8020/data_lake/iceberg
2.初始数据湖表
首先将一个原始商品数据集写入到Iceberg数据湖中,创建一个原始的Iceber表。这个原始商品数据集文件phones.csv的内容如下:
id,price,stock 小米,1299.00,100 苹果,3299.00,300 华为,2299.00,200
读取这个CSV文件到DataFrame,然后写入到一个Iceberg表中,代码如下:
// 使用Spark SQL API读取这个CSV文件到一个DataFrame中
val path = "/data/data_lake/iceberg/phones.csv"
val df = spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(path)
// 查看模式和内容
df.printSchema()
df.show
执行以上代码,输出内容如下:
root |-- id: string (nullable = true) |-- price: string (nullable = true) |-- stock: string (nullable = true) +----+-------+-----+ | id| price|stock| +----+-------+-----+ | 小米|1299.00| 100| | 苹果|3299.00| 300| | 华为|2299.00| 200| +----+-------+-----+
然后将该DataFrame作为一个Iceberg数据湖写出,代码如下:
df
.repartition(1)
.writeTo("hadoop_prod.my_db.phones")
.create() // 创建iceberg表
// 查看写入是否成功
spark.table("hadoop_prod.my_db.phones").show()
执行以上代码,应该可以看到输出的表数据,内容如下:
+----+-------+-----+ | id| price|stock| +----+-------+-----+ | 小米|1299.00| 100| | 苹果|3299.00| 300| | 华为|2299.00| 200| +----+-------+-----+
查看hadoop_prod.my_db.phones表的物理存储结构,在终端命令行下执行下面的命令:
$ hdfs dfs -ls -R /data_lake/iceberg/my_db/phones
可以看到目录结构如图11-35所示。
3.增量更新
Apache Iceberg支持增量更新。接下来向hadoop_prod.my_db.phones表插入一条新的数据记录,代码如下:
// 增加一条记录
spark.sql("insert into hadoop_prod.my_db.phones values('Nokia',699.00,350)")
// 查看这时表中的数据
spark.table("hadoop_prod.my_db.phones").show()
执行以上代码,输出结果如下:
+------+-------+-----+ | id| price|stock| +------+-------+-----+ | 小米|1299.00| 100| | 苹果|3299.00| 300| | 华为|2299.00| 200| | Nokia| 699.00| 350| +------+-------+-----+
也可以批量增加数据。例如有一个新的数据文件append_phones.csv,该数据文件内容如下(注意其包含的数据与phones.csv并无任何重复):
id,price,stock oppo,1688.00,230 noov,499.00,310
将数据文件追加到hadoop_prod.my_db.phones表中,代码如下:
// 增量数据文件
val appendData = "/data/data_lake/iceberg/append_phones.csv"
// 追加到表中
spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(appendData)
.writeTo("hadoop_prod.my_db.phones")
.append()
// 查看此时表中的数据
spark.table("hadoop_prod.my_db.phones").show()
执行以上代码,输出结果如下:
+-----+-------+-----+ | id| price|stock| +-----+-------+-----+ | oppo|1688.00| 230| | noov| 499.00| 310| |Nokia| 699.00| 350| | 小米|1299.00| 100| | 苹果|3299.00| 300| | 华为|2299.00| 200| +-----+-------+-----+
4.合并更新(MERGE INTO / UPSERT)
随着Apache Spark 3.0的发布,Apache Iceberg通过MERGE INTO查询支持upsert。它们使用直接的“写时复制(copy-on-write)”方法,即需要更新记录的文件会立即被重写。
MERGE INTO使用一组更新数据(称为source)来更新一个表(称为target表)。使用类似于连接条件的ON子句可以找到target表中某一行的更新。MERGE INTO的语法如下:
MERGE INTO prod.db.target t -- target表 USING (SELECT ...) s -- source更新 ON t.id = s.id -- 用于查找目标行更新的条件 WHEN ... -- 更新
对目标表中行的更新使用WHEN MATCHED…THEN…列出。多个MATCHED子句可以添加条件,以确定何时应用每个匹配。使用第一个匹配的表达式。模板代码如下:
WHEN MATCHED AND s.op = 'delete' THEN DELETE WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0 WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1
不匹配的source行(更新)可以插入。模板代码如下:
WHEN NOT MATCHED THEN INSERT *
插入还支持其他条件。模板代码如下:
WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)
源数据(source)中只有一条记录可以更新目标表中的任何给定行,否则将抛出错误。
假设现在有一个包含一组更新数据的文件merge_phones.csv,其中既包含有对表my_db.phones中部分数据的修改(例如修改价格price、库存stock等),也包含有表my_db.phones中未曾存储过的数据(在新增加的数据),内容如下:
id,price,stock 小米,1199.00,100 苹果,2799.00,150 vivo,2199.00,120
现在希望将这个merge_phones.csv文件中的数据与my_db.phones表中原有数据进行合并:如果商品ID已经在表中,则更新该商品价格和库存数量;如果表中还没有那个商品的记录,就插入这个新商品的记录到表中。
首先将这个新数据文件加载到DataFrame中,再写入到Iceberg数据湖的new_phones表中,代码如下:
// 新的数据文件
val mergeData = "/data/data_lake/iceberg/merge_phones.csv"
// 读取到DataFrame中
val mergeDF = spark
.read
.option("header", "true")
.option("charset", "UTF8")
.csv(mergeData)
// mergeDF.show()
// 写入到Iceberg中
mergeDF
.repartition(1)
.writeTo("hadoop_prod.my_db.new_phones")
.create()
// 查看写入是否成功
spark.table("hadoop_prod.my_db.new_phones").show()
执行以上代码,输出结果如下所示。
+----+-------+-----+ | id| price|stock| +----+-------+-----+ | 小米|1199.00| 100| | 苹果|2799.00| 150| |vivo|2199.00| 120| +----+-------+-----+
从输出结果可以看出,因为小米和苹果产品在源表中有,所以对这两个产品的价格和库存数量做了修改;而vivo在源表中是没有的,所以执行插入操作。 现在有了一个源表new_phones和一个目标表phones。接下来,将源表合并到目标表中,执行merge into操作,代码如下:
// merge into / upsert
spark.sql( """
merge into hadoop_prod.my_db.phones p
using (select * from hadoop_prod.my_db.new_phones) n
on p.id=n.id
when matched then update set p.price=n.price,p.stock=n.stock
when not matched then insert *
""")
// 查看合并结果
spark.table("hadoop_prod.my_db.phones").show()
执行以上合并更新代码后,再次查询表中数据,得到的输出内容如下:
+-----+-------+-----+ | id| price|stock| +-----+-------+-----+ | oppo|1688.00| 230| | noov| 499.00| 310| | vivo|2199.00| 120| | 华为|2299.00| 200| | 小米|1199.00| 100| | 苹果|2799.00| 150| |Nokia| 699.00| 350| +-----+-------+-----+
从上面的输出结果可以看出,vivo所在行是新增的商品记录,而小米和苹果所在行的两个商品价格已得到调整。
5.增量读取
要增量地读取追加的数据,可以使用以下两种方式:
- (1) start-snapshot-id:用于增量扫描的起始快照ID(不包含)。
- (2) end-snapshot-id:用于增量扫描的结束快照ID(包含)。这是可选的。忽略它将默认为当前快照。
首先查看所有的快照信息,代码如下:
spark.sql("select * from hadoop_prod.my_db.phones.snapshots")
.select("committed_at","snapshot_id","parent_id","operation")
.show(false)
执行以上代码,输出结果如下:
+-----------------------+-------------------+-------------------+---------+ |committed_at |snapshot_id |parent_id |operation| +-----------------------+-------------------+-------------------+---------+ |2022-03-14 19:54:35.235|4093447139497540820|null |append | |2022-03-14 20:02:29.811|3260015930261212891|4093447139497540820|append | |2022-03-14 20:02:36.021|3128063198822559538|3260015930261212891|append | |2022-03-14 20:08:16.768|4514244045809687626|3128063198822559538|append | +-----------------------+-------------------+-------------------+---------+
下面获取start-snapshot-id (3128063198822559538L)之后到 end-snapshot-id (4514244045809687626L)的增量数据,代码如下:
spark.read
.format("iceberg")
.option("start-snapshot-id", "3128063198822559538")
.option("end-snapshot-id", "4514244045809687626")
.load("hadoop_prod.my_db.phones")
.show()
执行以上代码,可以得到如下的输出内容:
+----+-------+-----+ | id| price|stock| +----+-------+-----+ |oppo|1688.00| 230| |noov| 499.00| 310| +----+-------+-----+
注意: 当前仅从append追加操作获取数据。 不支持replace、overwrite、delete操作。Spark的SQL语法不支持增量读。