发布日期:2022-10-26 VIP内容

分区演变示例

本节通过一个应用示例来深入理解Iceberg的分区演变及其底层实现过程。该应用示例模拟了如下的业务场景:

某公司(暂且称为X公司)从2008年开始,开发软件业务,拓展客户和市场。其日志事件数据是按月分区存储在Iceberg数据湖中。随着业务的发展,日志事件信息发生的越来越频繁。公司决定,从2009年开始,将日志事件按天进行分区存储。为此,开发如下代码来完成这个场景。

为了简单起见,本示例使用一个简单的数据集logdata.csv,它模仿X公司开发的某些软件产品的日志表,包含三个字段列,分别为ts、log_id和log_msg。注意,数据中的ts列显示为与UNIX时间戳(以秒为单位)对应的long数据类型。

数据集logdata.csv文件内容如下:

1225526400,1,a
1225699200,2,b
1225785600,3,c
1226476800,4,d
1226908800,5,e
1226995200,6,f
1227513600,7,g
1227772800,8,h
1228032000,9,i
1228118400,10,j
1228377600,11,k
1228809600,12,l
1228982400,13,m
1229673600,14,n
1230019200,15,o
1230278400,16,p
1230451200,17,q
1230624000,18,r
1230710400,19,s
1230796800,20,t
1230969600,21,u
1231747200,22,v
1232352000,23,w	
1232784000,24,x
1233216000,25,y
1233302400,26,z

继续使用HadoopCatalog类型的catalog,基于Hadoop路径。名为hadoop_prod的catalog配置如下:

%spark.conf
spark.sql.extensions            org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hadoop_prod               org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type           hadoop
spark.sql.catalog.hadoop_prod.warehouse     hdfs://localhost:8020/data_lake/iceberg

请按以下步骤操作。

(1) 在my_db命名空间中创建一个名为logtable的Iceberg表,这个表最初是按事件月份进行分区的,代码如下:

// 在名为my_db的名称空间中创建表(logtable)
spark.sql("""
    CREATE TABLE hadoop_prod.my_db.logtable (
        ts         timestamp,
        log_id     bigint,
        log_msg    string
    )
    USING iceberg
    PARTITIONED BY (months(ts))
""")

如果没有my_db命名空间,则先创建它,代码如下:

spark.sql("crete namespace hadoop_prod.my_db")

执行以上代码,然后在终端窗口中查看logtable对应的物理结构,shell命令如下:

$ hdfs dfs -ls -R  /data_lake/iceberg/my_db/logtable

可以看到hadoop_prod.my_db.logtable对应的物理存储结构如图11-39所示。

在每个Iceberg表文件夹中,都有一个元数据文件夹(metadata)和一个数据文件夹(data)。元数据文件夹(metadata)包含关于分区规范、它们的惟一ID的信息,以及使用适当的分区规范ID连接各个数据文件的清单。数据文件夹(data)包含构成整个Iceberg表的所有表数据文件。在上面的图中我们只看到metadata元数据目录,而没有data目录,是因为刚创建的空表中还没有任何数据。另外,v1.metadata.json是元数据文件,而version-hint.text文件中则标记了当前的元数据版本号。可以查看当前的版本信息,代码如下:

// 查看当前的版本提示
spark.read.text("/data_lake/iceberg/my_db/logtable/metadata/version-hint.text").show()

执行上面的代码,输出内容包含下一个快照的ID,如下所示。

+-----+
|value|
+-----+
|     1|
+-----+

再查看一下v1.metadata.json文件的内容,代码如下:

spark
	.read
	.text("/data_lake/iceberg/my_db/logtable/metadata/v1.metadata.json")
	.show(100,false) 

下面显示了其中的部分内容(请注意其中粗体字部分):

...
 "partition-spec" : [ {                                                      
    "name" : "ts_month",                                                      
    "transform" : "month",                                                    
    "source-id" : 1,                                                          
    "field-id" : 1000                                                         
  } ],                                                                        
  "default-spec-id" : 0,                                                      
  "partition-specs" : [ {                                                     
    "spec-id" : 0,                                                            
    "fields" : [ {                                                            
      "name" : "ts_month",                                                    
      "transform" : "month",                                                  
      "source-id" : 1,                                                        
      "field-id" : 1000                                                       
    } ]                                                                       
  } ],                                                                        
  ...                                                                         
  "current-snapshot-id" : -1,                                                 
  "snapshots" : [ ],                                                          
  "snapshot-log" : [ ],                                                       
  "metadata-log" : [ ]   

可以看出,当前的分区规范指定是通过month转换按月分区的。另外当前的快照ID是-1。

(2) 然后将数据添加到表中。在下面的命令中,只添加时间戳为2009年1月1日之前的数据,模拟了示例中的场景。当写入表时,确保在分区列上排序数据(如下所示),代码如下:

// 首先读取数据源到DataFrame中
import org.apache.spark.sql.types._

// 数据文件路径
val filePath = "/data/data_lake/iceberg/logdata.csv"

// 指定schema中的字段及类型
val fields = Array(
                    StructField("ts", LongType, true),
                    StructField("log_id", IntegerType, true),
                    StructField("log_msg", StringType, true)
                )
// 创建schema
val schema = StructType(fields)

// 加载数据到DataFrame中
val data = spark
    .read
    .option("delimiter", ",")
    .option("header", "false")
    .option("charset", "UTF8")
    .schema(schema)
    .csv(filePath)

// data.printSchema()
// data.show()

// 将ts字段转为时间戳,并选择2009年1月1日之前的数据,写入iceberg表中
data.where(col("ts").lt(1230768000L))
    .select(
        col("ts").cast(DataTypes.TimestampType), 
        col("log_id"), 
        col("log_msg"))
    .sort(col("ts"))
    .repartition(1)
    .write
    .format("iceberg")
    .mode("overwrite")
    .save("hadoop_prod.my_db.logtable")

以上命令执行完毕后,在终端窗口中执行以下shell命令查看logtable对应的物理结构:

$ hdfs dfs -ls -R  /data_lake/iceberg/my_db/logtable

可以看到现在hadoop_prod.my_db.logtable对应的物理存储结构如图11-40所示。

从上图中可以看出,现在多了一个data文件夹,其中包含了两个分区文件夹ts_month=2008-11和ts_month=2008_12,在这两个分区文件夹下是新增加的parquet格式的数据文件。在metadata元数据文件夹下,多了几个新的文件,其中/metadata/v2.metadata.json文件是当前的元数据文件,它包含了对/metadata/snap-*.avro文件的引用(该文件是清单列表文件),而/metadata/61224930-*.avro是清单文件。

注意: 在每个Iceberg表文件夹中,都有一个元数据文件夹(metadat)和一个数据文件夹(data)。元数据文件夹包含关于分区规范、它们的惟一ID的信息,以及使用适当的分区规范ID连接各个数据文件的清单。数据文件夹包含构成整个Iceberg表的所有表数据文件。当向带有分区的表写入数据时,Iceberg会在数据文件夹中创建多个文件夹。每个分区都用分区描述和值命名。例如,一个以time为标题并以month为分区的列将有文件夹time_month=2008-11、time_month=2008-12,等等。我们将在下面的例子中看到这一点。在多个列上分区的数据创建多个分层的文件夹,每个顶级文件夹包含一个或多个用于每个二级分区值的子文件夹。

查看元数据文件v2.metadata.json,其中部分内容如下:

 ...
  "current-snapshot-id" : 6579228024748445391,        
  "snapshots" :  {                                                                                                   
    "snapshot-id" : 6579228024748445391,                                                                                                                           
    "timestamp-ms" : 1647324680804,                                                                                                                                
    "summary" :                                                                                                                                                   
      "operation" : "append",                                                                                                                                      
      "spark.app.id" : "app-20220315124150-0001",                                                                                                                  
      ...                                                                                                                             
    },                                                                                                                                                             
    "manifest-list" : "hdfs://192.168.190.133:8020/data_lake/iceberg/my_db/logtable/metadata/snap-6579228024748445391-1-61224930-1e9e-424f-b089-4f0ddbdf0cd7.avro",|
    "schema-id" : 0                                                                                                                                                
  } ],                                                                                                                                                             
  "snapshot-log" :  {                                                                                                   
    "timestamp-ms" : 1647324680804,                                                                                                                                
    "snapshot-id" : 6579228024748445391                                                                                                                            
  } ],                                                                                                                                                             
  "metadata-log" :  {                                  
    "timestamp-ms" : 1647324555320,                                                                                                                                
    "metadata-file" : "hdfs://192.168.190.133:8020/data_lake/iceberg/my_db/logtable/metadata/v1.metadata.json"                                                     
  } 

这时查看表中的数据,代码如下:

spark.table("hadoop_prod.my_db.logtable").show()

可以看到数据表中仅包含了2008年的数据,内容如下:

+-------------------+------+-------+
|                    ts|log_id|log_msg|
+-------------------+------+-------+
|2008-11-01 16:00:00|      1|       a|
|2008-11-03 16:00:00|      2|       b|
|2008-11-04 16:00:00|      3|       c|
|2008-11-12 16:00:00|      4|       d|
|2008-11-17 16:00:00|      5|       e|
|2008-11-18 16:00:00|      6|       f|
|2008-11-24 16:00:00|      7|       g|
|2008-11-27 16:00:00|      8|       h|
|2008-11-30 16:00:00|      9|       i|
|2008-12-01 16:00:00|     10|       j|
|2008-12-04 16:00:00|     11|       k|
|2008-12-09 16:00:00|     12|       l|
|2008-12-11 16:00:00|     13|       m|
|2008-12-19 16:00:00|     14|       n|
|2008-12-23 16:00:00|     15|       o|
|2008-12-26 16:00:00|     16|       p|
|2008-12-28 16:00:00|     17|       q|
|2008-12-30 16:00:00|     18|       r|
|2008-12-31 16:00:00|     19|       s|
+-------------------+------+-------+

下面应用条件查询,只查询2008年12月份的数据(注意,between...and...包含的范围)。代码如下:

val sql_select = """
        select ts, log_id, log_msg
        from hadoop_prod.my_db.logtable
        where ts between '2008-12-01' and '2009-01-01'
        order by ts
    """
spark.sql(sql_select).show()

执行以上代码,输出内容如下:

+-------------------+------+-------+
|                    ts|log_id|log_msg|
+-------------------+------+-------+
|2008-12-01 16:00:00|     10|       j|
|2008-12-04 16:00:00|     11|       k|
|2008-12-09 16:00:00|     12|       l|
|2008-12-11 16:00:00|     13|       m|
|2008-12-19 16:00:00|     14|       n|
|2008-12-23 16:00:00|     15|       o|
|2008-12-26 16:00:00|     16|       p|
|2008-12-28 16:00:00|     17|       q|
|2008-12-30 16:00:00|     18|       r|
|2008-12-31 16:00:00|     19|       s|
+-------------------+------+-------+

(3) 修改表的分区规范,将按月分区修改为按天分区。代码如下:

spark.sql("ALTER TABLE hadoop_prod.my_db.logtable ADD PARTITION FIELD days(ts)")

(4) 手动向表中添加新的日志记录。在这个写操作中,只添加在2009年1月1日或之后发生的日志事件数据。代码如下:

data.where(col("ts").gt(1230768000L))
    .select(
        col("ts").cast(DataTypes.TimestampType), 
        col("log_id"), 
        col("log_msg")
    )
    .sort(col("ts"))
    .writeTo("hadoop_prod.my_db.logtable")
    .overwritePartitions()

(5) 正如在代码中看到的,在Iceberg中进行分区演变后,不需要重写整个表。这时如果查看logtable的data文件夹,将看到Iceberg已经根据分区值对数据文件进行了组织—2009年1月1日之前的时间戳是按month组织的;该日期和之后的时间戳按day进行组织,命令如下:

$ hdfs dfs -ls -R /data_lake/iceberg/my_db/logtable/data

结果如图11-41所示。

该公司现在想要查询在跨2008年12月份和2009年1月份期间发生的所有日志事件。查询将跨越多个分区布局,但仍然无缝地工作,而不需要用户指定任何额外的信息或知道任何关于表的分区,代码如下:

spark.sql("""
    SELECT * 
    FROM hadoop_prod.my_db.logtable 
    WHERE ts > '2008-12-14' AND ts < '2009-1-14' """).show() 

查询的结果如下:

+-------------------+------+-------+
|                    ts|log_id|log_msg|
+-------------------+------+-------+
|2008-12-19 16:00:00|     14|       n|
|2008-12-23 16:00:00|     15|       o|
|2008-12-26 16:00:00|     16|       p|
|2008-12-28 16:00:00|     17|       q|
|2008-12-30 16:00:00|     18|       r|
|2008-12-31 16:00:00|     19|       s|
|2009-01-01 16:00:00|     20|       t|
|2009-01-03 16:00:00|     21|       u|
|2009-01-12 16:00:00|     22|       v|
+-------------------+------+-------+

总的来说,Iceberg为分区和分区演变提供了大量的功能。所执行的大多数分区测试的工作情况与Iceberg文档中所声明的完全一样。