复杂模式识别-状态处理

如前所述,按key或事件窗口聚合的中间状态由结构化流自动维护。然而,并不是所有的event-time处理都可以通过简单地在一个或多个列上聚合,并且在没有窗口的情况下得到满足。

例如,在IOT实时温度监控程序中,当看到三个连续的温度读数超过100度时,需要发出一个警报。再例如,关于维护用户会话,其中每个会话的长度不是由固定的时间决定的,而是由用户的活动和缺乏活动决定的。要解决这两个示例和类似的应用场景,就需要能够在每组数据上应用任意处理逻辑,以控制每组数据的窗口长度,并在触发器点上保持任意状态。这就需要应用结构化流的任意状态处理。

结构化流为流应用程序提供了一种回调机制来执行任意的有状态处理,并且它将负责确保中间状态的维护和以容错的方式存储。

下面我们将通过一个示例来演示结构化流中的任意状态处理。

在下面这个示例程序中,我们从数据中心计算机机架温度数据中提取模式,并维护中间状态下每个机架的状态。每当遇到三个连续的100度及以上的温度时,机架状态将升级到warning(警告)级别。这个例子将使用mapGroupsWithState API。

【示例】数据中心计算机机架温度数据复杂事件模式识别和处理。

在本例中,我们感兴趣的模式是从同一个机架上采集到连续三个温度读数在100度或以上、并且两个连续高温读数之间的时间差必须在60秒内。当检测到这种模式时,该特定机架的状态将升级为warning(警告)状态。如果下一个进入的温度读数低于100度阈值,那么机架状态就会降级为正常值。

请按以下步骤操作。

1)准备数据

在本示例中,我们使用文件数据源,事件数据存储在JSON格式的文件中。每个事件由三个字段组成:

  • rack:表示机架的唯一ID,字符串类型。
  • temperature:表示采集到的温度值,double类型。
  • ts:表示事件发生时的时间戳。这是事件时间(event time)。

这里提供了三个数据文件。其中file1.json的内容显示机架rack1的温度,在100度上下交替变化。文件file2.json显示rack2的温度,连续升温。在文件file3.json中,rack3也在升温,但温度读数超过一分钟的距离。这三个数据文件均位于PBLP平台的如下位置:~/data/spark/iot3/。

为了模拟数据流的行为,我们将把这三个JSON文件复制到项目的“src/data/iot3”目录下。

2)首先导入项目所依赖的包。

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}
import org.apache.spark.sql.types._

3)接下来,准备两个case class。对于这个用例,机架温度输入数据由类RackInfo来表示(输入事件I),中间状态和输出都由一个名为RackState的类表示(任意状态S)。代码如下。

  // 用于表示输入数据的case class
  case class RackInfo(rack:String, temperature:Double, ts:java.sql.Timestamp)

  // 用于表示中间状态和输出数据的case class
  // 注意,构造函数参数被定义为可修改的所以我们可以更新它们
  // lastTS变量用于比较以前和当前温度读数之间的时间
  case class RackState(var rackId: String,
                       var highTempCount: Int,
                       var status: String,
                       var lastTS: java.sql.Timestamp)

4)然后定义两个函数。第一个被称为updateRackState,它包含了模式检测的核心逻辑,用来检测在60秒内产生三个连续超过100度的高温读数,在每个group上执行。第二个函数叫做updateAcrossAllRackStatus,它是一个回调函数,它将被传递到mapGroupsWithState API。这个函数确保根据事件时间的顺序来处理机架温度读数。

实现代码如下。

  // 包含用于检测上述温度模式并更新中间状态的主要逻辑
  def updateRackState(rackState: RackState, rackInfo: RackInfo) : RackState = {

    // 设置条件来决定是否更新机架状态
    val lastTS = Option(rackState.lastTS).getOrElse(rackInfo.ts)
    val withinTimeThreshold = (rackInfo.ts.getTime - lastTS.getTime) <= 60000
    val meetCondition = if (rackState.highTempCount < 1) true else withinTimeThreshold
    val greaterThanEqualTo100 = rackInfo.temperature >= 100.0

    // 判断匹配条件
    (greaterThanEqualTo100, meetCondition)  match {
      case (true, true) => {      // 如果两个条件都满足了
        rackState.highTempCount = rackState.highTempCount + 1     // 则发现一个高温
        rackState.status = if (rackState.highTempCount >= 3) "Warning" else "Normal"
      }
      case _ => {              // 如果两个条件都不满足,或之一不满足
        rackState.highTempCount = 0         // 则清零,重新开始统计
        rackState.status = "Normal"
      }
    }

    rackState.lastTS = rackInfo.ts  // 更新状态时间为最后一个事件的时间
    rackState
  }

  // 回调函数,提供mapGroupsWithState API
  // GroupState[S]是一个由结构化流提供的包装器,并在内部用于跨执行管理状态S
  // 在该函数中,GroupState提供了对状态的突变访问,以及检查和设置超时的能力。
  def updateAcrossAllRackStatus( rackId: String,
                               inputs: Iterator[RackInfo],
                               oldState: GroupState[RackState]) : RackState = {
    // 初始化rackState,如果之前有状态存在的话,使用它;否则创建一个新的状态
    var rackState = if (oldState.exists) oldState.get else RackState(rackId, 3, "", null)

    // 按时间戳对输入进行升序排序
    inputs.toList.sortBy(_.ts.getTime).foreach( input => {
      rackState = updateRackState(rackState, input)
      // 在状态持有类GroupState中更新该rackState非常重要
      oldState.update(rackState)
    })

    rackState
  }

设置步骤现在已经完成。

5)现在在结构化流应用程序中将回调函数连接到mapGroupsWithState。实现代码如下所示:

  // 主程序
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .master("local")
      .appName("cep demo")
      .getOrCreate()

    // 用于IoT数据的schema
    val iotDataSchema = new StructType()
      .add("rack", StringType, nullable = false)
      .add("temperature", DoubleType, nullable = false)
      .add("ts", TimestampType, nullable = false)

    // 读取流数据,使用文件数据源
    val dataPath = "src/data/iot3"
    val iotSSDF = spark.readStream
      .option("maxFilesPerTrigger", 1)
      .schema(iotDataSchema)
      .json(dataPath)

    iotSSDF.printSchema()

    import spark.implicits._
    
    val iotPatternDF = iotSSDF.as[RackInfo]
      .groupByKey(_.rack)           // 按机架ID分组
      // 将给定函数应用于每组数据,同时维护用户定义的每组数据状态。
      // 对于一个流Dataset, 该函数将在每个触发器中为每个group重复调用,
      // 每个组状态的更新将在调用之间保存。
      .mapGroupsWithState[RackState,RackState](GroupStateTimeout.NoTimeout)(updateAcrossAllRackStatus)

    // 设置输出并启动流查询
    iotPatternDF.writeStream
      .format("console")
      .outputMode("update")
      .start()
      .awaitTermination()
  }

6)执行以上程序。当读取到第一个file1.json数据文件时,输出结果如下:

+------+-------------+------+-------------------+
|rackId|highTempCount|status|             lastTS|
+------+-------------+------+-------------------+
| rack1|            1|Normal|2017-06-02 08:02:44|
+------+-------------+------+-------------------+

当读取到第二个file2.json数据文件时,输出结果如下:

+------+-------------+-------+-------------------+
|rackId|highTempCount| status|             lastTS|
+------+-------------+-------+-------------------+
| rack1|            0| Normal|2017-06-02 08:02:59|
| rack2|            3|Warning|2017-06-02 08:04:49|
+------+-------------+-------+-------------------+

当读取到第三个file3.json数据文件时,输出结果如下:

+------+-------------+------+-------------------+
|rackId|highTempCount|status|             lastTS|
+------+-------------+------+-------------------+
| rack3|            1|Normal|2017-06-02 08:08:53|
| rack2|            0|Normal|2017-06-02 08:06:40|
+------+-------------+------+-------------------+

从以上的输出结果中,分以看到,rack1有一些温度读数超过100度;然而,它们不是连续的,因此输出状态处于正常水平。在文件file2.json中rack2有三个连续的温度读数超过100度,而每一个和前一个之间的时间间隔小于60秒,所以rack2的状态处于warning(警告)级别。rack3有三个连续的温度度数超过100度;然而,每一个和前一个之间的时间间隔超过了60秒。因此,它的地位处于正常水平。


《PySpark原理深入与编程实战》