发布日期:2022-10-25 VIP内容

Iceberg架构

有了前面对Apache Iceberg的初步了解,接下来深入了解一下它的架构体系。

Iceberg表的结构有三层,分别是:

  • (1) Iceberg Catalog(目录层)。
  • (2) metadata layer(元数据层),包含:元数据文件(metadata file);清单列表(manifest list);清单文件(manifest file)。
  • (3) data layer(数据层)。

下面是一张Iceberg表结构的结构图,如图11-26所示:

Iceberg Catalog

读取Iceberg表的第一步是找到该表当前元数据指针的位置。在这个可以找到当前元数据指针的当前位置的中心位置,就是Iceberg catalog。

Iceberg Catalog的主要需求是它必须支持更新当前元数据指针的原子操作(例如,HDFS, Hive Metastore, Nessie)。这就是允许Iceberg表上的事务是原子的并提供正确性保证的原因。

在catalog目录中,每个表都有一个指向该表当前元数据文件的引用或指针。例如,在图11-27中,有两个元数据文件。目录中表的当前元数据指针的值是右侧元数据文件的位置。

这些数据的形式取决于使用的是什么Iceberg目录:

  • (1) 使用HDFS作为catalog目录,在表的元数据文件夹中有一个名为version-hint.text的文件,其内容是当前元数据文件的版本号。
  • (2) 使用Hive metastore作为catalog目录,metastore中的表条目有一个表属性,用来存储当前元数据文件的位置。
  • (3) 以Nessie为catalog目录,Nessie存储了该表当前元数据文件的位置。

因此,当一个SELECT查询读取一个Iceberg表时,查询引擎首先进入Iceberg Catalog,然后检索它要读取的表的当前元数据文件所在位置的条目,然后打开该文件。

元数据文件

Iceberg使用JSON文件跟踪表元数据。对表的每次更改都会生成一个新的元数据文件(metadata file),以提供原子性。默认情况下,旧的元数据文件将作为历史记录保存。如图11-28所示。

顾名思义,元数据文件存储关于表的元数据。这包括关于表的模式、分区信息、快照以及哪个快照是当前快照的信息。图11-28中的例子是为了演示的目的而删节的,一个元数据文件完整内容的例子,内容如下(v3.metadata.json):

{
    "format-version" : 1,
    "table-uuid" : "4b96b6e8-9838-48df-a111-ec1ff6422816",
    "location" : "/home/hadoop/warehouse/db2/part_table2",
    "last-updated-ms" : 1611694436618,
    "last-column-id" : 3,
    "schema" : {
        "type" : "struct",
        "fields" : [ {
            "id" : 1,
            "name" : "id",
            "required" : true,
            "type" : "int"
        }, {
            "id" : 2,
            "name" : "ts",
            "required" : false,
            "type" : "timestamptz"
        }, {
            "id" : 3,
            "name" : "message",
            "required" : false,
            "type" : "string"
        } ]
    },
    "partition-spec" : [ {
        "name" : "ts_hour",
        "transform" : "hour",
        "source-id" : 2,
        "field-id" : 1000
    } ],
    "default-spec-id" : 0,
    "partition-specs" : [ {
        "spec-id" : 0,
        "fields" : [ {
            "name" : "ts_hour",
            "transform" : "hour",
            "source-id" : 2,
            "field-id" : 1000
        } ]
    } ],
    "default-sort-order-id" : 0,
    "sort-orders" : [ {
        "order-id" : 0,
        "fields" : [ ]
    } ],
    "properties" : {
        "owner" : "hadoop"
    },
    "current-snapshot-id" : 1257424822184505371,
    "snapshots" : [ {
        "snapshot-id" : 8271497753230544300,
        "timestamp-ms" : 1611694406483,
        "summary" : {
            "operation" : "append",
            "spark.app.id" : "application_1611687743277_0002",
            "added-data-files" : "1",
            "added-records" : "1",
            "added-files-size" : "960",
            "changed-partition-count" : "1",
            "total-records" : "1",
            "total-data-files" : "1",
            "total-delete-files" : "0",
            "total-position-deletes" : "0",
            "total-equality-deletes" : "0"
        },
        "manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-8271497753230544300-1-d8a778f9-ad19-4e9c-88ff-28f49ec939fa.avro"
    }, 
    {
        "snapshot-id" : 1257424822184505371,
        "parent-snapshot-id" : 8271497753230544300,
        "timestamp-ms" : 1611694436618,
        "summary" : {
            "operation" : "append",
            "spark.app.id" : "application_1611687743277_0002",
            "added-data-files" : "1",
            "added-records" : "1",
            "added-files-size" : "973",
            "changed-partition-count" : "1",
            "total-records" : "2",
            "total-data-files" : "2",
            "total-delete-files" : "0",
            "total-position-deletes" : "0",
            "total-equality-deletes" : "0"
        },
        "manifest-list" : "/home/hadoop/warehouse/db2/part_table2/metadata/snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro"
    } ],
    "snapshot-log" : [ {
        "timestamp-ms" : 1611694406483,
        "snapshot-id" : 8271497753230544300
    }, 
    {
        "timestamp-ms" : 1611694436618,
        "snapshot-id" : 1257424822184505371
    } ],
    "metadata-log" : [ {
        "timestamp-ms" : 1611694097253,
        "metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v1.metadata.json"
    }, 
    {
        "timestamp-ms" : 1611694406483,
        "metadata-file" : "/home/hadoop/warehouse/db2/part_table2/metadata/v2.metadata.json"
    } ]
}

当一个SELECT查询读取一个Iceberg表,并在从catalog目录中的表条目获取其位置后打开其当前元数据文件时,查询引擎然后读取current-snapshot-id的值。然后,它使用这个值在snapshots数组中查找快照的条目,然后检索该快照的manifest-list条目的值,并打开位置指向的清单列表(manifest list)。

清单列表(manifest list)

清单列表( manifest list)是清单文件( manifest files)的列表。清单列表包含了组成该快照的每个清单文件的信息,例如清单文件的位置、作为其中一部分添加的快照、关于它所属分区的信息以及它所跟踪的数据文件的分区列的下界和上界。如图11-29所示。

一个清单列表文件的完整内容如下(为了友好显示,将avro格式的文件转为转为JSON格式显示,snap-1257424822184505371-1-eab8490b-8d16-4eb1-ba9e-0dede788ff08.avro):

{
    "manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/eab8490b-8d16-4eb1-ba9e-0dede788ff08-m0.avro",
    "manifest_length": 4884,
    "partition_spec_id": 0,
    "added_snapshot_id": {
        "long": 1257424822184505300
    },
    "added_data_files_count": {
        "int": 1
    },
    "existing_data_files_count": {
        "int": 0
    },
    "deleted_data_files_count": {
        "int": 0
    },
    "partitions": {
        "array": [ {
            "contains_null": false,
            "lower_bound": {
                "bytes": "¹Ô\\u0006\\u0000"
            },
            "upper_bound": {
                "bytes": "¹Ô\\u0006\\u0000"
            }
        } ]
    },
    "added_rows_count": {
        "long": 1
    },
    "existing_rows_count": {
        "long": 0
    },
    "deleted_rows_count": {
        "long": 0
    }
}
{
    "manifest_path": "/home/hadoop/warehouse/db2/part_table2/metadata/d8a778f9-ad19-4e9c-88ff-28f49ec939fa-m0.avro",
    "manifest_length": 4884,
    "partition_spec_id": 0,
    "added_snapshot_id": {
        "long": 8271497753230544000
    },
    "added_data_files_count": {
        "int": 1
    },
    "existing_data_files_count": {
        "int": 0
    },
    "deleted_data_files_count": {
        "int": 0
    },
    "partitions": {
        "array": [ {
            "contains_null": false,
            "lower_bound": {
                "bytes": "¸Ô\\u0006\\u0000"
            },
            "upper_bound": {
                "bytes": "¸Ô\\u0006\\u0000"
            }
        } ]
    },
    "added_rows_count": {
        "long": 1
    },
    "existing_rows_count": {
        "long": 0
    },
    "deleted_rows_count": {
        "long": 0
    }
}

当SELECT查询读取Iceberg表并在从元数据文件中获取快照的位置后为快照打开清单列表时,查询引擎然后读取manifest-path条目的值,并打开清单文件。它还可以在这个阶段进行一些优化,比如使用行计数或使用分区信息过滤数据。

清单文件(manifest file)

清单文件跟踪数据文件以及关于每个文件的附加细节和统计信息,从而实现文件级跟踪数据。每个清单文件都跟踪数据文件的一个子集,以实现并行性和大规模重用效率。如图11-30所示。

清单文件包含许多有用的信息,用于在从这些数据文件中读取数据时提高效率和性能,比如关于分区成员、记录计数以及列的下限和上限的详细信息。这些统计数据是在写入操作期间为每个清单的数据文件子集写入的,因此比Hive中的统计数据更可能存在、更准确、更及时。Iceberg是文件格式不可知的,因此清单文件还指定了数据文件的文件格式,比如Parquet、ORC或Avro。

一个清单文件的完整内容如下(为了可读性,转换为JSON格式显示):

{
    "status": 1,
    "snapshot_id": {
        "long": 1257424822184505300
    },
    "data_file": {
        "file_path": "/home/hadoop/warehouse/db2/part_table2/data/ts_hour=2021-01-26-01/00000-6-7c6cf3c0-8090-4f15-a4cc-3a3a562eed7b-00001.parquet",
        "file_format": "PARQUET",
        "partition": {
            "ts_hour": {
                "int": 447673
            }
        },
        "record_count": 1,
        "file_size_in_bytes": 973,
        "block_size_in_bytes": 67108864,
        "column_sizes": {
            "array": [ {
                "key": 1,
                "value": 47
            },
            {
                "key": 2,
                "value": 57
            },
            {
                "key": 3,
                "value": 60
            } ]
        },
        "value_counts": {
            "array": [ {
                "key": 1,
                "value": 1
            },
            {
                "key": 2,
                "value": 1
            },
            {
                "key": 3,
                "value": 1
            } ]
        },
        "null_value_counts": {
            "array": [ {
                "key": 1,
                "value": 0
            },
            {
                "key": 2,
                "value": 0
            },
            {
                "key": 3,
                "value": 0
            } ]
        },
        "lower_bounds": {
            "array": [ {
                "key": 1,
                "value": "\\u0002\\u0000\\u0000\\u0000"
            },
            {
                "key": 2,
                "value": "\\u0000„ ,ù\\u0005\\u0000"
            },
            {
                "key": 3,
                "value": "test message 2"
            } ]
        },
        "upper_bounds": {
            "array": [ {
                "key": 1,
                "value": "\\u0002\\u0000\\u0000\\u0000"
            },
            {
                "key": 2,
                "value": "\\u0000„ ,ù\\u0005\\u0000"
            },
            {
                "key": 3,
                "value": "test message 2"
            } ]
        },
        "key_metadata": null,
        "split_offsets": {
            "array": [
                4
            ]
        }
    }
}

当SELECT查询读取Iceberg表并在从清单列表中获取清单文件的位置后打开一个清单文件时,查询引擎然后读取每个data-file对象的file-path条目的值,并打开该数据文件。它还可以在这个阶段进行一些优化,比如使用行计数或使用分区或列统计信息过滤数据。