流读取和流写入
Apache Iceberg使用Apache Spark的DataSourceV2 API实现数据源和catalog目录实现。Spark DSv2是一个不断发展的API,在Spark版本中提供了不同级别的支持。
从Spark 3.0开始,支持DataFrame读写。
流读取
可以使用快照历史记录来支持从Iceberg表中作为流源读取数据。Iceberg支持处理spark结构化流作业中的增量数据,这些作业从历史时间戳开始,模板代码如下:
val df = spark.readStream
.format("iceberg")
.option("stream-from-timestamp", Long.toString(streamStartTimestamp))
.load("database.table_name")
Iceberg仅支持从append快照读取数据,而overwrite快照不能够被处理,会引起异常。类似地,默认情况下delete快照会导致异常,但是通过设置stream-skip-delete-snapshot=true可以忽略delete。
流写入
使用Iceberg表作为Spark结构化流的Data Sink(数据接收器)。Spark为所有的sink提供了queryId和epochId,必须保证所有的写操作都是幂等的。Spark可能会尝试多次提交同一个批处理。因此,用户需要知道每个查询最新提交的epochId。一种方法是在快照摘要中持久化queryId和epochId。在写操作上,可以简单地遍历快照并检查给定查询最新提交的epochId,以使写操作幂等。
要将流查询的值写入Iceberg表,使用DataStreamWriter,模板代码如下:
val tableIdentifier: String = ...
data.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", tableIdentifier)
.option("checkpointLocation", checkpointPath)
.start()
其中tableIdentifier可以是:
- (1) HDFS表的完全限定路径,例如hdfs://nn:8020/path/to/table。
- (2) 如果表由catalog跟踪,则为表名,如database.table_name。
Iceberg不支持“continuous processing”,因为它不提供“commit”输出的接口。
Iceberg支持append和complete的输出模式:
- (1) append:将每个微批的行追加到表中。
- (2) complete:每个微批替换表内容。
Iceberg表应该在开始流查询之前创建。
1.对分区表进行写入
在对分区表进行写操作之前,Iceberg要求根据每个任务(Spark分区)的分区规范对数据进行排序。对于批处理查询,鼓励执行显式排序来满足需求,但是这种方法会带来额外的延迟,因为重分区和排序被认为是流工作负载的繁重操作。为了避免额外的延迟,可以启用fanout writer(扇出写入器)来消除这一要求。模板代码如下:
val tableIdentifier: String = ...
data.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", tableIdentifier)
.option("fanout-enabled", "true")
.option("checkpointLocation", checkpointPath)
.start()
Fanout writer按每个分区值打开文件,直到写任务完成才关闭这些文件。不鼓励批处理查询使用此功能,因为对输出行进行显式排序对于批处理工作负载来说并不昂贵。
2.读写表示例
本示例是从上游Kafka中读取数据,写入Iceberg表,打包放到PBLP集群上通过spark-submit提交执行。
(1) 通过Kafka脚本创建测试使用的主题并准备测试数据。
使用SSH方式登录到Kafka集群,并切换到Kafka的安装目录,命令如下:
$ cd ~/bigdata/kafka-4.1.2
创建名为iceberg_test的Kafka主题,命令如下:
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --topic iceberg_test --partitions 1 --replication-factor 1 --create
生产测试数据,命令如下:
$ kafka-console-producer.sh --broker-list localhost:9092 --topic iceberg_test
(2) 通过Spark SQL创建测试使用的数据库iceberg_db和表iceberg_table。
(3) 编写Spark处理逻辑,代码如下:
def main(args: Array[String]): Unit = {
// 配置使用数据湖元数据
val sparkConf = new SparkConf()
sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.type", "org.apache.iceberg.spark.SparkCatalog")
sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")
val spark = SparkSession
.builder()
.config(sparkConf)
.appName("StructuredSinkIceberg")
.getOrCreate()
val checkpointPath = "/tmp/iceberg_table_checkpoint"
val bootstrapServers = "localhost:9092"
val topic = "iceberg_test"
// 从上游Kafka读取数据
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("subscribe", topic)
.load()
import spark.implicits._
val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)].toDF("id", "data")
// 流式写入Iceberg表
val query = resDF.writeStream
.format("iceberg")
.outputMode("append")
.trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
.option("path", "dlf_catalog.iceberg_db.iceberg_table")
.option("checkpointLocation", checkpointPath)
.start()
query.awaitTermination()
}