发布日期:2022-10-27 VIP内容

流读取和流写入

Apache Iceberg使用Apache Spark的DataSourceV2 API实现数据源和catalog目录实现。Spark DSv2是一个不断发展的API,在Spark版本中提供了不同级别的支持。

从Spark 3.0开始,支持DataFrame读写。

流读取

可以使用快照历史记录来支持从Iceberg表中作为流源读取数据。Iceberg支持处理spark结构化流作业中的增量数据,这些作业从历史时间戳开始,模板代码如下:

val df = spark.readStream
    .format("iceberg")
    .option("stream-from-timestamp", Long.toString(streamStartTimestamp))
    .load("database.table_name")

Iceberg仅支持从append快照读取数据,而overwrite快照不能够被处理,会引起异常。类似地,默认情况下delete快照会导致异常,但是通过设置stream-skip-delete-snapshot=true可以忽略delete。

流写入

使用Iceberg表作为Spark结构化流的Data Sink(数据接收器)。Spark为所有的sink提供了queryId和epochId,必须保证所有的写操作都是幂等的。Spark可能会尝试多次提交同一个批处理。因此,用户需要知道每个查询最新提交的epochId。一种方法是在快照摘要中持久化queryId和epochId。在写操作上,可以简单地遍历快照并检查给定查询最新提交的epochId,以使写操作幂等。

要将流查询的值写入Iceberg表,使用DataStreamWriter,模板代码如下:

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("checkpointLocation", checkpointPath)
    .start()

其中tableIdentifier可以是:

  • (1) HDFS表的完全限定路径,例如hdfs://nn:8020/path/to/table。
  • (2) 如果表由catalog跟踪,则为表名,如database.table_name。

Iceberg不支持“continuous processing”,因为它不提供“commit”输出的接口。

Iceberg支持append和complete的输出模式:

  • (1) append:将每个微批的行追加到表中。
  • (2) complete:每个微批替换表内容。

Iceberg表应该在开始流查询之前创建。

1.对分区表进行写入

在对分区表进行写操作之前,Iceberg要求根据每个任务(Spark分区)的分区规范对数据进行排序。对于批处理查询,鼓励执行显式排序来满足需求,但是这种方法会带来额外的延迟,因为重分区和排序被认为是流工作负载的繁重操作。为了避免额外的延迟,可以启用fanout writer(扇出写入器)来消除这一要求。模板代码如下:

val tableIdentifier: String = ...
data.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", tableIdentifier)
    .option("fanout-enabled", "true")
    .option("checkpointLocation", checkpointPath)
    .start()

Fanout writer按每个分区值打开文件,直到写任务完成才关闭这些文件。不鼓励批处理查询使用此功能,因为对输出行进行显式排序对于批处理工作负载来说并不昂贵。

2.读写表示例

本示例是从上游Kafka中读取数据,写入Iceberg表,打包放到PBLP集群上通过spark-submit提交执行。

(1) 通过Kafka脚本创建测试使用的主题并准备测试数据。

使用SSH方式登录到Kafka集群,并切换到Kafka的安装目录,命令如下:

$ cd ~/bigdata/kafka-4.1.2

创建名为iceberg_test的Kafka主题,命令如下:

$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --topic iceberg_test --partitions 1 --replication-factor 1 --create

生产测试数据,命令如下:

$ kafka-console-producer.sh --broker-list localhost:9092 --topic iceberg_test

(2) 通过Spark SQL创建测试使用的数据库iceberg_db和表iceberg_table。

(3) 编写Spark处理逻辑,代码如下:

def main(args: Array[String]): Unit = {

  // 配置使用数据湖元数据
  val sparkConf = new SparkConf()
  sparkConf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  sparkConf.set("spark.sql.catalog.dlf_catalog", "org.apache.iceberg.spark.SparkCatalog")
  sparkConf.set("spark.sql.catalog.dlf_catalog.type", "org.apache.iceberg.spark.SparkCatalog")
  sparkConf.set("spark.sql.catalog.dlf_catalog.warehouse", "<yourOSSWarehousePath>")

  val spark = SparkSession
    .builder()
    .config(sparkConf)
    .appName("StructuredSinkIceberg")
    .getOrCreate()

  val checkpointPath = "/tmp/iceberg_table_checkpoint"
  val bootstrapServers = "localhost:9092"
  val topic = "iceberg_test"

  // 从上游Kafka读取数据
  val df = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", bootstrapServers)
    .option("subscribe", topic)
    .load()

  import spark.implicits._
  val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
    .as[(String, String)].toDF("id", "data")

  // 流式写入Iceberg表
  val query = resDF.writeStream
    .format("iceberg")
    .outputMode("append")
    .trigger(Trigger.ProcessingTime(1, TimeUnit.MINUTES))
    .option("path", "dlf_catalog.iceberg_db.iceberg_table")
    .option("checkpointLocation", checkpointPath)
    .start()

  query.awaitTermination()
}