发布日期:2022-10-25 VIP内容

增量更新与合并更新

Iceberg不但支持增量更新,并且也支持合并更新。Apache Spark 3增加了对MERGE INTO查询的支持,可以表示行级更新。通过重写包含在overwrite提交中需要更新的行的数据文件,Iceberg支持MERGE INTO。

本节通过一个示例来演示Iceberg表中的增量更新实现和合并更新实现。

1.配置Catalog

要支持增量更新和合并更新,需要配置spark.sql.extensions扩展支持,并指定Catalog。下面以Zeppelin Notebook中为例,配置它并配置基于Hadoop路径的Catalog,代码如下:

%spark.conf
spark.sql.extensions                      org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hadoop_prod             org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type        hadoop
spark.sql.catalog.hadoop_prod.warehouse   hdfs://xueai8:8020/data_lake/iceberg

2.初始数据湖表

首先将一个原始商品数据集写入到Iceberg数据湖中,创建一个原始的Iceber表。这个原始商品数据集文件phones.csv的内容如下:

id,price,stock
小米,1299.00,100
苹果,3299.00,300
华为,2299.00,200

读取这个CSV文件到DataFrame,然后写入到一个Iceberg表中,代码如下:

// 使用Spark SQL API读取这个CSV文件到一个DataFrame中
val path = "/data/data_lake/iceberg/phones.csv"
val df = spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(path)

// 查看模式和内容
df.printSchema()
df.show

执行以上代码,输出内容如下:

root
 |-- id: string (nullable = true)
 |-- price: string (nullable = true)
 |-- stock: string (nullable = true)

+----+-------+-----+
|  id|   price|stock|
+----+-------+-----+
| 小米|1299.00|  100|
| 苹果|3299.00|  300|
| 华为|2299.00|  200|
+----+-------+-----+

然后将该DataFrame作为一个Iceberg数据湖写出,代码如下:

df
  .repartition(1)
  .writeTo("hadoop_prod.my_db.phones")
  .create()      // 创建iceberg表

// 查看写入是否成功
spark.table("hadoop_prod.my_db.phones").show()

执行以上代码,应该可以看到输出的表数据,内容如下:

+----+-------+-----+
|  id|   price|stock|
+----+-------+-----+
| 小米|1299.00|  100|
| 苹果|3299.00|  300|
| 华为|2299.00|  200|
+----+-------+-----+

查看hadoop_prod.my_db.phones表的物理存储结构,在终端命令行下执行下面的命令:

$ hdfs dfs -ls -R /data_lake/iceberg/my_db/phones

可以看到目录结构如图11-35所示。

3.增量更新

Apache Iceberg支持增量更新。接下来向hadoop_prod.my_db.phones表插入一条新的数据记录,代码如下:

// 增加一条记录
spark.sql("insert into hadoop_prod.my_db.phones values('Nokia',699.00,350)")

// 查看这时表中的数据
spark.table("hadoop_prod.my_db.phones").show()

执行以上代码,输出结果如下:

+------+-------+-----+
|    id|   price|stock|
+------+-------+-----+
|   小米|1299.00|  100|
|   苹果|3299.00|  300|
|   华为|2299.00|  200|
| Nokia| 699.00|  350|
+------+-------+-----+

也可以批量增加数据。例如有一个新的数据文件append_phones.csv,该数据文件内容如下(注意其包含的数据与phones.csv并无任何重复):

id,price,stock
oppo,1688.00,230
noov,499.00,310

将数据文件追加到hadoop_prod.my_db.phones表中,代码如下:

// 增量数据文件
val appendData = "/data/data_lake/iceberg/append_phones.csv"

// 追加到表中
spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(appendData)
  .writeTo("hadoop_prod.my_db.phones")
  .append()

// 查看此时表中的数据
spark.table("hadoop_prod.my_db.phones").show()

执行以上代码,输出结果如下:

+-----+-------+-----+
|   id|  price|stock|
+-----+-------+-----+
| oppo|1688.00|  230|
| noov| 499.00|  310|
|Nokia| 699.00|  350|
|  小米|1299.00|  100|
|  苹果|3299.00|  300|
|  华为|2299.00|  200|
+-----+-------+-----+

4.合并更新(MERGE INTO / UPSERT)

随着Apache Spark 3.0的发布,Apache Iceberg通过MERGE INTO查询支持upsert。它们使用直接的“写时复制(copy-on-write)”方法,即需要更新记录的文件会立即被重写。

MERGE INTO使用一组更新数据(称为source)来更新一个表(称为target表)。使用类似于连接条件的ON子句可以找到target表中某一行的更新。MERGE INTO的语法如下:

MERGE INTO prod.db.target t   	-- target表
USING (SELECT ...) s          	-- source更新
ON t.id = s.id                		-- 用于查找目标行更新的条件
WHEN ...                      	-- 更新

对目标表中行的更新使用WHEN MATCHED…THEN…列出。多个MATCHED子句可以添加条件,以确定何时应用每个匹配。使用第一个匹配的表达式。模板代码如下:

WHEN MATCHED AND s.op = 'delete' THEN DELETE
WHEN MATCHED AND t.count IS NULL AND s.op = 'increment' THEN UPDATE SET t.count = 0
WHEN MATCHED AND s.op = 'increment' THEN UPDATE SET t.count = t.count + 1

不匹配的source行(更新)可以插入。模板代码如下:

WHEN NOT MATCHED THEN INSERT *

插入还支持其他条件。模板代码如下:

WHEN NOT MATCHED AND s.event_time > still_valid_threshold THEN INSERT (id, count) VALUES (s.id, 1)

源数据(source)中只有一条记录可以更新目标表中的任何给定行,否则将抛出错误。

假设现在有一个包含一组更新数据的文件merge_phones.csv,其中既包含有对表my_db.phones中部分数据的修改(例如修改价格price、库存stock等),也包含有表my_db.phones中未曾存储过的数据(在新增加的数据),内容如下:

id,price,stock
小米,1199.00,100
苹果,2799.00,150
vivo,2199.00,120

现在希望将这个merge_phones.csv文件中的数据与my_db.phones表中原有数据进行合并:如果商品ID已经在表中,则更新该商品价格和库存数量;如果表中还没有那个商品的记录,就插入这个新商品的记录到表中。

首先将这个新数据文件加载到DataFrame中,再写入到Iceberg数据湖的new_phones表中,代码如下:

// 新的数据文件
val mergeData = "/data/data_lake/iceberg/merge_phones.csv"

// 读取到DataFrame中
val mergeDF = spark
  .read
  .option("header", "true")
  .option("charset", "UTF8")
  .csv(mergeData)
// mergeDF.show()

// 写入到Iceberg中
mergeDF
  .repartition(1)
  .writeTo("hadoop_prod.my_db.new_phones")
  .create()

// 查看写入是否成功
spark.table("hadoop_prod.my_db.new_phones").show()

执行以上代码,输出结果如下所示。

+----+-------+-----+
|  id|   price|stock|
+----+-------+-----+
| 小米|1199.00|  100|
| 苹果|2799.00|  150|
|vivo|2199.00|  120|
+----+-------+-----+

从输出结果可以看出,因为小米和苹果产品在源表中有,所以对这两个产品的价格和库存数量做了修改;而vivo在源表中是没有的,所以执行插入操作。 现在有了一个源表new_phones和一个目标表phones。接下来,将源表合并到目标表中,执行merge into操作,代码如下:

// merge into / upsert
spark.sql( """
    merge into hadoop_prod.my_db.phones p 
    using (select * from hadoop_prod.my_db.new_phones) n
    on p.id=n.id
    when matched then update set p.price=n.price,p.stock=n.stock
    when not matched then insert *
""")

// 查看合并结果
spark.table("hadoop_prod.my_db.phones").show()

执行以上合并更新代码后,再次查询表中数据,得到的输出内容如下:

+-----+-------+-----+
|   id|   price|stock|
+-----+-------+-----+
| oppo|1688.00|  230|
| noov| 499.00|  310|
| vivo|2199.00|  120|
|  华为|2299.00|  200|
|  小米|1199.00|  100|
|  苹果|2799.00|  150|
|Nokia| 699.00|  350|
+-----+-------+-----+

从上面的输出结果可以看出,vivo所在行是新增的商品记录,而小米和苹果所在行的两个商品价格已得到调整。

5.增量读取

要增量地读取追加的数据,可以使用以下两种方式:

  • (1) start-snapshot-id:用于增量扫描的起始快照ID(不包含)。
  • (2) end-snapshot-id:用于增量扫描的结束快照ID(包含)。这是可选的。忽略它将默认为当前快照。

首先查看所有的快照信息,代码如下:

spark.sql("select * from hadoop_prod.my_db.phones.snapshots")
     .select("committed_at","snapshot_id","parent_id","operation")
     .show(false)

执行以上代码,输出结果如下:

+-----------------------+-------------------+-------------------+---------+
|committed_at             |snapshot_id          |parent_id         |operation|
+-----------------------+-------------------+-------------------+---------+
|2022-03-14 19:54:35.235|4093447139497540820|null                  |append   |
|2022-03-14 20:02:29.811|3260015930261212891|4093447139497540820|append   |
|2022-03-14 20:02:36.021|3128063198822559538|3260015930261212891|append   |
|2022-03-14 20:08:16.768|4514244045809687626|3128063198822559538|append   |
+-----------------------+-------------------+-------------------+---------+

下面获取start-snapshot-id (3128063198822559538L)之后到 end-snapshot-id (4514244045809687626L)的增量数据,代码如下:

spark.read
  .format("iceberg")
  .option("start-snapshot-id", "3128063198822559538")
  .option("end-snapshot-id", "4514244045809687626")
  .load("hadoop_prod.my_db.phones")
  .show()

执行以上代码,可以得到如下的输出内容:

+----+-------+-----+
|  id|  price|stock|
+----+-------+-----+
|oppo|1688.00|  230|
|noov| 499.00|  310|
+----+-------+-----+

注意: 当前仅从append追加操作获取数据。 不支持replace、overwrite、delete操作。Spark的SQL语法不支持增量读。