发布日期:2022-10-19 VIP内容

配置和使用Catalog

要在Apache Spark中使用Iceberg,首先要配置catalogs(目录)。那么,catalog是什么?

Catalog是Iceberg对表进行管理(create、drop、rename等)的一个组件。通过Iceberg附带的catalog目录,我们可以使用SQL命令管理表并按名称加载表。通过设置spark.sql.catalog下的Spark属性来配置Spark catalogs。Catalog名称在SQL查询中用于标识一个表。

配置Catalog

目前Iceberg支持两种catalog,分别是HiveCatalog和HadoopCatalog。其中HiveCatalog将当前表的元数据文件路径存储在Hive Metastore,因为这个表元数据文件是所有读写Iceberg表的入口,所以每次读写Iceberg表都需要先从Hive Metastore中取出对应的表元数据文件路径,然后再解析这个元数据文件以进行接下来的操作。而HadoopCatalog是将当前表元数据文件路径记录在一个文件目录下,因此不需要连接Hive MetaStore。

Catalog(目录)的创建和命名是通过添加Spark属性spark.sql.catalog.(catalog-name),其值为实现类。目前Iceberg支持两个实现:

(1) org.apache.iceberg.spark.SparkCatalog:支持Hive Metastore或Hadoop目录作为catalog。

(2) org.apache.iceberg.spark.SparkSessionCatalog:将对Iceberg表的支持添加到Spark的内置catalog中,并将非Icegerg表委托给内置catalog。

例如,创建一个名为hive_prod的Iceberg catalog目录,并指定表元数据文件存储在Hive Metastore中(即采用HiveCatalog类型的catalog),这是通过type=hive配置的。代码如下:

spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type = hive
spark.sql.catalog.hive_prod.uri = thrift://metastore-host:port

注意: 基于Hive的catalog只加载Iceberg表。要加载同一Hive Metastore中的非Iceberg表,将实现切换为SparkSessionCatalog。

而下面的配置创建一个名为hadoop_prod的Iceberg catalog目录,并指定表元数据文件存储在HDFS的指定目录中(即采用HadoopCatalog类型的catalog),这是通过type=hadoop配置的:

spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type = hadoop
spark.sql.catalog.hadoop_prod.warehouse = hdfs://localhost:8020/warehouse/path

需要注意的是,HiveCatalog和HadoopCatalog不能混合使用。也就是说,用HiveCatalog创建的表不能用HadoopCatalog正确加载,反之亦然。

那么,在哪里配置Spark catalogs?

如果使用的是Spark shell,那么可以在执行spark-shell命令时进行配置,命令如下:

$ spark-shell --packages org.apache.iceberg:iceberg-spark-runtime-3.1_2.12:0.13.1 \
    --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
    --conf spark.sql.catalog.hadoop_prod = org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.hadoop_prod.type=hadoop \
    --conf spark.sql.catalog.hadoop_prod.warehouse=hdfs://localhost:8020/data_lake/iceberg

上面这个命令创建了一个名为hadoop_prod的catalog,它是基于Hadoop路径的目录。在该catalog下创建的所有数据库和表都将位于该目录下。

如果是在Zeppelin Notebook中,则可以使用%spark.conf来进行配置。例如,我们要配置一个名为hive_prod的catalog,它是基于Hive Metastore的类型。配置命令如下:

%spark.conf
spark.jars.packages         		iceberg-spark-runtime-3.1_2.12:0.13.1
spark.sql.extensions         		org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.catalog.hive_prod   		org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type 	hive
spark.sql.catalog.hive_prod.uri 	thrift://localhost:9083

使用Catalog

在SQL查询中使用catalog名称来标识表。例如,在上一节配置的hadoop_prod这个catalog中有一个名为my_db的数据库,在my_db数据库中有一个名为my_tb的表。那么要查询该表中的数据,可以使用hadoop_prod作为前缀来标识,代码如下:

spark.sql("SELECT * FROM hadoop_prod.my_db.my_tb")

Spark 3会跟踪当前的catalog和数据库,所以在当前catalog和数据库中引用表时,可以省略catalog和数据库名,代码如下:

spark.sql("USE hadoop_prod.my_db")
spark.sql("SELECT * FROM my_tb") 

要查看当前的catalog和数据库,代码如下:

spark.sql("SHOW CURRENT NAMESPACE")

替换Session Catalog

要向Apache Spark的内置目录添加Iceberg表支持,以及配置catalog使用Iceberg的SparkSessionCatalog,配置内容如下:

spark.sql.catalog.hive_prod = org.apache.iceberg.spark.SparkSessionCatalog
spark.sql.catalog.hive_prod.type = hive

Apache Spark内置的catalog支持在Hive Metastore中跟踪现有的v1和v2表。这将配置Spark使用Iceberg的SparkSessionCatalog作为该session catalog的包装器。当表不是Iceberg表时,将使用内置catalog来加载它。

这个配置可以对Iceberg表和非Iceberg表使用相同的Hive Metastore。

运行时配置

可以在运行时对Spark 3读写Iceberg进行动态配置。

1. 读选项配置

Spark read options选项是在配置DataFrameReader时被传递的,代码如下:

// 时间旅行
spark.read
    .option("snapshot-id", 10963874102873L)
.table("hadoop_catalog.my_db.my_table")

可用的读配置选项见下表。

Spark option选项 默认值 描述
snapshot-id (latest) 要读取的表快照的ID
as-of-timestamp (latest) 以毫秒为单位的时间戳;所使用的快照将是此时的当前快照
split-size   重写这个表的read.split.target-size和read.split.metadata-target-size
lookback   重写这个表的read.split.planning-lookback
file-open-cost   重写这个表的read.split.open-file-cost
vectorization-enabled   重写这个表的read.parquet.vectorization.enabled
batch-size   重写这个表的read.parquet.vectorization.batch-size

2. 写选项配置

Spark write options选项在配置DataFrameWriter时被传递,代码如下:

// 用Avro而不是Parquet写
df.write
    .option("write-format", "avro")
    .option("snapshot-property.key", "value")
.insertInto("hadoop_catalog.my_db.my_table")

可用的写配置选项见下表。

Spark option选项 默认值 描述
write-format write.format.default 写操作使用的文件格式:parquet,avro或 orc
target-file-size-bytes As per table property 覆盖这个表的write.target-file-size-bytes
check-nullability true 对字段设置nullable检查
snapshot-property.custom-key null 在快照摘要中添加具有自定义key和相应value的条目
fanout-enabled false 覆盖这个表的write.spark.fanout.enabled
check-ordering true 检查输入模式和表模式是否相同