表流处理读写
Delta Lake通过readStream和writeStream与Spark结构化流深度集成。Delta Lake克服了许多通常与流系统和文件相关的限制,包括:
- (1) 使用多个流(或并发批处理作业)维护exactly-once的处理。即使有其他流或批查询并发地运行在该表上,Delta Lake事务日志也能保证精确一次性处理。
- (2) 当使用文件作为流的源时,有效地发现哪些文件是新的。
对于表上的许多Delta Lake操作,可以在创建新的SparkSession时通过设置配置来支持与Apache Spark DataSourceV2和Catalog API(自3.0以来)的集成。
1.Delta表作为数据源
当将Delta表加载为流源并在流查询中使用它时,查询将处理表中出现的所有数据以及在流启动后到达的任何新数据。示例代码如下:
spark.readStream.format("delta").load("/delta/events")
import io.delta.implicits._
spark.readStream.delta("/delta/events") // 简写
1)限制输入的速度
以下选项可用于控制微批:
- (1) maxFilesPerTrigger:每个微批处理中需要考虑多少个新文件。默认值是1000。
- (2) maxBytesPerTrigger:每个微批处理的数据量。此选项设置一个“软最大值”,意味着一批大约处理这个数量的数据,并可能处理超过这个限制。如果为流指定Trigger.Once,则该选项将被忽略。这在默认情况下没有设置。
如果将maxBytesPerTrigger与maxFilesPerTrigger结合使用,则微批处理数据直到达到maxFilesPerTrigger或maxBytesPerTrigger限制为止。
2)忽略update和delete
结构化流不处理非追加的输入,并在用作源的表上发生任何修改时抛出异常。处理不能自动向下游传播的更改有两种主要策略:
- (1) 可以删除输出和检查点,然后从头重新启动流。
- (2) 可以设置这两个选项。ignoreDeletes:忽略在分区边界上删除数据的事务。ignoreChanges:如果由于数据更改操作(如UPDATE、MERGE INTO、DELETE(分区内)或OVERWRITE)而不得不重写源表中的文件,则重新处理更新。未更改的行仍然可能被发出,因此下游消费者应该能够处理重复的副本。删除不会向下游传播。ignoreChanges包容ignoreDeletes。因此,如果使用ignoreChanges,流将不会被对源表的删除或更新所中断。
例如,假设有一个表user_events,其中包含date、user_email和action列,按date分区。将user_events表作为流源,由于GDPR的关系,需要从该表中删除数据。
当在分区边界处删除时(即WHERE位于分区列上),文件已经按值分段,因此删除只是从元数据中删除这些文件。因此,如果只是想从一些分区中删除数据,代码如下:
spark.readStream.format("delta")
.option("ignoreDeletes", "true")
.load("/delta/user_events")
然而,如果必须删除基于user_email的数据,则代码如下:
spark.readStream.format("delta")
.option("ignoreChanges", "true")
.load("/delta/user_events")
如果使用update语句更新user_email,则包含问题user_email的文件将被重写。当使用ignoreChanges时,新记录将与同一文件中所有其他未更改的记录一起向下传播。计算逻辑应该能够处理这些传入的重复记录。
3)指定初始位置
可以使用以下选项指定Delta Lake流源的起始点,而无需处理整个表。
- (1) startingVersion:开始的Delta Lake版本。从这个版本(包括在内)开始的所有表更改都将被流源读取。提交的版本号可以从DESCRIBE HISTORY命令输出的version列中获取。若要只返回最新的更改,指定该选项值为latest。
- (2) startingTimestamp:开始的时间戳。在这个时间戳(包括在内)或之后提交的所有表更改都将被流源读取。其值可以是以下两种类型之一:一个时间戳字符串,例如,“2019-01-01T00:00:00.000Z”;一个日期字符串,例如,“2019-01-01”。
不能同时设置这两个选项,只能使用其中一种。它们只在启动一个新的流查询时生效。如果一个流查询已经启动,并且进度已经记录在检查点中,那么这些选项将被忽略。
需要注意的是,虽然可以从指定的版本或时间戳启动流源,但流源的模式始终是Delta表的最新模式。必须确保在指定的版本或时间戳之后对Delta表没有不兼容的模式更改。否则,当使用不正确的模式读取数据时,流源可能会返回不正确的结果。
例如,假设有一个表user_events。如果想读取自版本5以来的更改,代码如下:
spark.readStream.format("delta")
.option("startingVersion", "5")
.load("/delta/user_events")
如果想读取2018-10-18以来的更改,代码如下:
spark.readStream.format("delta")
.option("startingTimestamp", "2018-10-18")
.load("/delta/user_events")
2.Delta表作为数据接收器
还可以使用结构化流将数据写入Delta表。事务日志使Delta Lake能够保证exactly-once处理,即使有其他流或批查询同时对表运行。示例代码如下:
val streamingDf = spark
.readStream
.format("rate")
.option("rowsPerSecond","10")
.load()
val stream = streamingDf
.select($"value" as "number")
.writeStream.format("delta")
.option("checkpointLocation", "/tmp/checkpoint")
.start("file:///home/hduser/data/delta-table/demo")
stream.awaitTermination()
stream.stop()
将流数据写入Delta表时,有两种模式。
1)Append模式
默认情况下,流以追加(append)模式运行,该模式将新记录添加到表中,代码如下:
// path方法
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation","/delta/events/_checkpoints/etl-from-json")
.start("/delta/events")
# 简写
import io.delta.implicits._
events.writeStream
.outputMode("append")
.option("checkpointLocation","/delta/events/_checkpoints/etl-from-json")
.delta("/delta/events")
// 表方法
events.writeStream
.outputMode("append")
.option("checkpointLocation","/delta/events/_checkpoints/etl-from-json")
.table("events")
2)Complete模式
也可以使用结构化流来替换每批处理的整个表。一个应用场景是是使用聚合计算汇总信息,代码如下:
spark.readStream
.format("delta")
.load("/delta/events")
.groupBy("customerId")
.count()
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation","/delta/eventsByCustomer/_checkpoints/streaming-agg")
.start("/delta/eventsByCustomer")
上面的示例不断更新一个表,该表包含按客户聚合的事件数。
对于延迟要求更宽松的应用程序,可以使用一次性触发器节省计算资源。使用它们根据给定的时间表更新汇总聚合表,只处理自上次更新以来到达的新数据。