处理重复流数据

当数据源多次发送相同的数据时,实时流数据中的数据就会产生重复。在流处理中,由于流数据的无界性,去除重复数据是一种非常具有挑战性的任务。

不过,PySpark结构化流使得流应用程序能够轻松地执行数据去重,因此这些应用程序可以通过在到达时删除重复的数据来保证精确一次处理。结构化流所提供的数据去重特性可以与水印一起工作,也可以不使用水印。不过,需要注意的一点是,在执行数据去重时,如果没有指定水印的话,在流应用程序的整个生命周期中,结构化流需要维护的状态将无限增长,这可能会导致内存不足的问题。使用水印,比水印更老的数据会被自动删除,以避免重复的可能。

在结构化流中执行重复数据删除操作的API很简单,它只有一个输入参数,该输入参数是用来惟一标识每一行的列名的列表。这些列的值将被用于执行重复检测,并且结构化流将把它们存储为中间状态。

下面我们通过一个示例来演示这个API的使用。

【示例】处理重复到达的移动电话操作事件数据。

请按以下步骤操作。

1)准备数据

在本示例中,我们使用文件数据源,代表移动电话操作事件数据存储在两个JSON格式的文件中。每个事件由三个字段组成:

  • id:表示手机的唯一ID,字符串类型。
  • action:表示用户所采取的操作。该操作的可能值是"open"或"close"。
  • ts:表示用户action发生时的时间戳。这是事件时间(event time)。

这两个数据文件flie1.json和file2.json均位于PBLP平台的如下位置:~/data/spark/mobile4/。

观察file1.json文件上面的数据,会发现每一行都是唯一的id和ts列。

观察file2.json文件上面的数据,前两行是file1.json中前两行的重复,第三行是唯一的,第四行也是唯一的,但延迟到达(所以在我们后面的代码中应该不被处理)。

为了模拟数据流的行为,我们将把这两个JSON文件复制到项目的“src/main/resources/mobile4”目录下。

2)编写流处理代码,在代码中我们基于id列进行分组count聚合。id和ts列共同定义为key。

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

# 创建SparkSession实例
spark = SparkSession \
        .builder \
        .appName("late data demo") \
        .getOrCreate()

# 设置shuffle后的分区数为2(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",2)

# 设置日志级别
# spark.sparkContext.setLogLevel("WARN") 

# 为手机事件数据创建一个schema
fields = [
      StructField("id", StringType(), nullable = False),
      StructField("action", StringType(), nullable = False),
      StructField("ts", TimestampType(), nullable = False)
]
mobileDataSchema = StructType(fields)

# 指定监听的文件目录
dataPath = "/data/spark/stream/mobile4"

# 读取指定目录下的源数据文件,一次一个
mobileDF = spark.readStream \
      .option("maxFilesPerTrigger", 1) \
      .option("mode","failFast") \
      .schema(mobileDataSchema) \
      .json(dataPath)
   
# 添加水印,去重,分组聚合操作
windowCountDupDF = mobileDF \
      .withWatermark("ts", "10 minutes") \
      .dropDuplicates(["id", "ts"]) \
      .groupBy("id") \
      .count()

# 将结果输出到控制台
windowCountDupDF.writeStream \
      .format("console") \
      .option("truncate", "false") \
      .outputMode("update") \
      .start()

# 等待流程序执行结束(作为作业文件提交时启用)
# query.awaitTermination()

4)执行上面的程序代码。

当读取到file1.json源数据文件时,输出结果如下所示:

+------+-----+
|id    |count|
+------+-----+
|phone3|1    |
|phone1|1    |
|phone2|1    |
+------+-----+

当读取到file2.json源数据文件时,输出结果如下所示:

+------+-----+
|id    |count|
+------+-----+
|phone4|1    |
+------+-----+

如预期所料,当读取到file2.json源数据文件时,输出结果中只有一行显示在控制台中。原因是前两行是file1.json中前两行的重复,因此它们被过滤掉了(去重)。最后一行的时间戳是10:10:00,这被认为是迟到数据,因为时间戳比10分钟的水印阈值更迟。因此,最后一行没有被处理,也被删除掉了。


《Flink原理深入与编程实战》