分区演变示例
本节通过一个应用示例来深入理解Iceberg的分区演变及其底层实现过程。该应用示例模拟了如下的业务场景:
某公司(暂且称为X公司)从2008年开始,开发软件业务,拓展客户和市场。其日志事件数据是按月分区存储在Iceberg数据湖中。随着业务的发展,日志事件信息发生的越来越频繁。公司决定,从2009年开始,将日志事件按天进行分区存储。为此,开发如下代码来完成这个场景。
为了简单起见,本示例使用一个简单的数据集logdata.csv,它模仿X公司开发的某些软件产品的日志表,包含三个字段列,分别为ts、log_id和log_msg。注意,数据中的ts列显示为与UNIX时间戳(以秒为单位)对应的long数据类型。
数据集logdata.csv文件内容如下:
1225526400,1,a 1225699200,2,b 1225785600,3,c 1226476800,4,d 1226908800,5,e 1226995200,6,f 1227513600,7,g 1227772800,8,h 1228032000,9,i 1228118400,10,j 1228377600,11,k 1228809600,12,l 1228982400,13,m 1229673600,14,n 1230019200,15,o 1230278400,16,p 1230451200,17,q 1230624000,18,r 1230710400,19,s 1230796800,20,t 1230969600,21,u 1231747200,22,v 1232352000,23,w 1232784000,24,x 1233216000,25,y 1233302400,26,z
继续使用HadoopCatalog类型的catalog,基于Hadoop路径。名为hadoop_prod的catalog配置如下:
%spark.conf spark.sql.extensions org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions spark.sql.catalog.hadoop_prod org.apache.iceberg.spark.SparkCatalog spark.sql.catalog.hadoop_prod.type hadoop spark.sql.catalog.hadoop_prod.warehouse hdfs://localhost:8020/data_lake/iceberg
请按以下步骤操作。
(1) 在my_db命名空间中创建一个名为logtable的Iceberg表,这个表最初是按事件月份进行分区的,代码如下:
// 在名为my_db的名称空间中创建表(logtable)
spark.sql("""
CREATE TABLE hadoop_prod.my_db.logtable (
ts timestamp,
log_id bigint,
log_msg string
)
USING iceberg
PARTITIONED BY (months(ts))
""")
如果没有my_db命名空间,则先创建它,代码如下:
spark.sql("crete namespace hadoop_prod.my_db")
执行以上代码,然后在终端窗口中查看logtable对应的物理结构,shell命令如下:
$ hdfs dfs -ls -R /data_lake/iceberg/my_db/logtable
可以看到hadoop_prod.my_db.logtable对应的物理存储结构如图11-39所示。
在每个Iceberg表文件夹中,都有一个元数据文件夹(metadata)和一个数据文件夹(data)。元数据文件夹(metadata)包含关于分区规范、它们的惟一ID的信息,以及使用适当的分区规范ID连接各个数据文件的清单。数据文件夹(data)包含构成整个Iceberg表的所有表数据文件。在上面的图中我们只看到metadata元数据目录,而没有data目录,是因为刚创建的空表中还没有任何数据。另外,v1.metadata.json是元数据文件,而version-hint.text文件中则标记了当前的元数据版本号。可以查看当前的版本信息,代码如下:
// 查看当前的版本提示
spark.read.text("/data_lake/iceberg/my_db/logtable/metadata/version-hint.text").show()
执行上面的代码,输出内容包含下一个快照的ID,如下所示。
+-----+ |value| +-----+ | 1| +-----+
再查看一下v1.metadata.json文件的内容,代码如下:
spark
.read
.text("/data_lake/iceberg/my_db/logtable/metadata/v1.metadata.json")
.show(100,false)
下面显示了其中的部分内容(请注意其中粗体字部分):
...
"partition-spec" : [ {
"name" : "ts_month",
"transform" : "month",
"source-id" : 1,
"field-id" : 1000
} ],
"default-spec-id" : 0,
"partition-specs" : [ {
"spec-id" : 0,
"fields" : [ {
"name" : "ts_month",
"transform" : "month",
"source-id" : 1,
"field-id" : 1000
} ]
} ],
...
"current-snapshot-id" : -1,
"snapshots" : [ ],
"snapshot-log" : [ ],
"metadata-log" : [ ]
可以看出,当前的分区规范指定是通过month转换按月分区的。另外当前的快照ID是-1。
(2) 然后将数据添加到表中。在下面的命令中,只添加时间戳为2009年1月1日之前的数据,模拟了示例中的场景。当写入表时,确保在分区列上排序数据(如下所示),代码如下:
// 首先读取数据源到DataFrame中
import org.apache.spark.sql.types._
// 数据文件路径
val filePath = "/data/data_lake/iceberg/logdata.csv"
// 指定schema中的字段及类型
val fields = Array(
StructField("ts", LongType, true),
StructField("log_id", IntegerType, true),
StructField("log_msg", StringType, true)
)
// 创建schema
val schema = StructType(fields)
// 加载数据到DataFrame中
val data = spark
.read
.option("delimiter", ",")
.option("header", "false")
.option("charset", "UTF8")
.schema(schema)
.csv(filePath)
// data.printSchema()
// data.show()
// 将ts字段转为时间戳,并选择2009年1月1日之前的数据,写入iceberg表中
data.where(col("ts").lt(1230768000L))
.select(
col("ts").cast(DataTypes.TimestampType),
col("log_id"),
col("log_msg"))
.sort(col("ts"))
.repartition(1)
.write
.format("iceberg")
.mode("overwrite")
.save("hadoop_prod.my_db.logtable")
以上命令执行完毕后,在终端窗口中执行以下shell命令查看logtable对应的物理结构:
$ hdfs dfs -ls -R /data_lake/iceberg/my_db/logtable
可以看到现在hadoop_prod.my_db.logtable对应的物理存储结构如图11-40所示。
从上图中可以看出,现在多了一个data文件夹,其中包含了两个分区文件夹ts_month=2008-11和ts_month=2008_12,在这两个分区文件夹下是新增加的parquet格式的数据文件。在metadata元数据文件夹下,多了几个新的文件,其中/metadata/v2.metadata.json文件是当前的元数据文件,它包含了对/metadata/snap-*.avro文件的引用(该文件是清单列表文件),而/metadata/61224930-*.avro是清单文件。
注意: 在每个Iceberg表文件夹中,都有一个元数据文件夹(metadat)和一个数据文件夹(data)。元数据文件夹包含关于分区规范、它们的惟一ID的信息,以及使用适当的分区规范ID连接各个数据文件的清单。数据文件夹包含构成整个Iceberg表的所有表数据文件。当向带有分区的表写入数据时,Iceberg会在数据文件夹中创建多个文件夹。每个分区都用分区描述和值命名。例如,一个以time为标题并以month为分区的列将有文件夹time_month=2008-11、time_month=2008-12,等等。我们将在下面的例子中看到这一点。在多个列上分区的数据创建多个分层的文件夹,每个顶级文件夹包含一个或多个用于每个二级分区值的子文件夹。
查看元数据文件v2.metadata.json,其中部分内容如下:
...
"current-snapshot-id" : 6579228024748445391,
"snapshots" : {
"snapshot-id" : 6579228024748445391,
"timestamp-ms" : 1647324680804,
"summary" :
"operation" : "append",
"spark.app.id" : "app-20220315124150-0001",
...
},
"manifest-list" : "hdfs://192.168.190.133:8020/data_lake/iceberg/my_db/logtable/metadata/snap-6579228024748445391-1-61224930-1e9e-424f-b089-4f0ddbdf0cd7.avro",|
"schema-id" : 0
} ],
"snapshot-log" : {
"timestamp-ms" : 1647324680804,
"snapshot-id" : 6579228024748445391
} ],
"metadata-log" : {
"timestamp-ms" : 1647324555320,
"metadata-file" : "hdfs://192.168.190.133:8020/data_lake/iceberg/my_db/logtable/metadata/v1.metadata.json"
}
这时查看表中的数据,代码如下:
spark.table("hadoop_prod.my_db.logtable").show()
可以看到数据表中仅包含了2008年的数据,内容如下:
+-------------------+------+-------+ | ts|log_id|log_msg| +-------------------+------+-------+ |2008-11-01 16:00:00| 1| a| |2008-11-03 16:00:00| 2| b| |2008-11-04 16:00:00| 3| c| |2008-11-12 16:00:00| 4| d| |2008-11-17 16:00:00| 5| e| |2008-11-18 16:00:00| 6| f| |2008-11-24 16:00:00| 7| g| |2008-11-27 16:00:00| 8| h| |2008-11-30 16:00:00| 9| i| |2008-12-01 16:00:00| 10| j| |2008-12-04 16:00:00| 11| k| |2008-12-09 16:00:00| 12| l| |2008-12-11 16:00:00| 13| m| |2008-12-19 16:00:00| 14| n| |2008-12-23 16:00:00| 15| o| |2008-12-26 16:00:00| 16| p| |2008-12-28 16:00:00| 17| q| |2008-12-30 16:00:00| 18| r| |2008-12-31 16:00:00| 19| s| +-------------------+------+-------+
下面应用条件查询,只查询2008年12月份的数据(注意,between...and...包含的范围)。代码如下:
val sql_select = """
select ts, log_id, log_msg
from hadoop_prod.my_db.logtable
where ts between '2008-12-01' and '2009-01-01'
order by ts
"""
spark.sql(sql_select).show()
执行以上代码,输出内容如下:
+-------------------+------+-------+ | ts|log_id|log_msg| +-------------------+------+-------+ |2008-12-01 16:00:00| 10| j| |2008-12-04 16:00:00| 11| k| |2008-12-09 16:00:00| 12| l| |2008-12-11 16:00:00| 13| m| |2008-12-19 16:00:00| 14| n| |2008-12-23 16:00:00| 15| o| |2008-12-26 16:00:00| 16| p| |2008-12-28 16:00:00| 17| q| |2008-12-30 16:00:00| 18| r| |2008-12-31 16:00:00| 19| s| +-------------------+------+-------+
(3) 修改表的分区规范,将按月分区修改为按天分区。代码如下:
spark.sql("ALTER TABLE hadoop_prod.my_db.logtable ADD PARTITION FIELD days(ts)")
(4) 手动向表中添加新的日志记录。在这个写操作中,只添加在2009年1月1日或之后发生的日志事件数据。代码如下:
data.where(col("ts").gt(1230768000L))
.select(
col("ts").cast(DataTypes.TimestampType),
col("log_id"),
col("log_msg")
)
.sort(col("ts"))
.writeTo("hadoop_prod.my_db.logtable")
.overwritePartitions()
(5) 正如在代码中看到的,在Iceberg中进行分区演变后,不需要重写整个表。这时如果查看logtable的data文件夹,将看到Iceberg已经根据分区值对数据文件进行了组织—2009年1月1日之前的时间戳是按month组织的;该日期和之后的时间戳按day进行组织,命令如下:
$ hdfs dfs -ls -R /data_lake/iceberg/my_db/logtable/data
结果如图11-41所示。
该公司现在想要查询在跨2008年12月份和2009年1月份期间发生的所有日志事件。查询将跨越多个分区布局,但仍然无缝地工作,而不需要用户指定任何额外的信息或知道任何关于表的分区,代码如下:
spark.sql("""
SELECT *
FROM hadoop_prod.my_db.logtable
WHERE ts > '2008-12-14' AND ts < '2009-1-14' """).show()
查询的结果如下:
+-------------------+------+-------+ | ts|log_id|log_msg| +-------------------+------+-------+ |2008-12-19 16:00:00| 14| n| |2008-12-23 16:00:00| 15| o| |2008-12-26 16:00:00| 16| p| |2008-12-28 16:00:00| 17| q| |2008-12-30 16:00:00| 18| r| |2008-12-31 16:00:00| 19| s| |2009-01-01 16:00:00| 20| t| |2009-01-03 16:00:00| 21| u| |2009-01-12 16:00:00| 22| v| +-------------------+------+-------+
总的来说,Iceberg为分区和分区演变提供了大量的功能。所执行的大多数分区测试的工作情况与Iceberg文档中所声明的完全一样。