案例:运输公司车辆超速实时监测
假设有一个物流公司车队管理解决方案,其中车队中的车辆启用了无线网络功能。每辆运输车需要定期报告其地理位置和多个操作参数,如燃油水平、速度、加速度、轴承、发动机温度等。物流公司希望利用这一远程实时监测数据流来实现一系列应用程序,以帮助他们管理业务的运营和财务方面。
现在我们承担了其中一个任务,需要开发一个流应用程序,实时统计运输车辆每几秒钟的平均速度,用来检查车辆是否超速。程序整体架构如下图所示。
在本案例中,使用Kafka作为流数据源,将从Kafka的cars主题来读取这些事件。同时也将Kafka作为流的Data Sink,检测出的超速事件将写入到Kafka的fastcars主题。
1. 实现技术剖析
PySpark在处理实时数据流时,它会等待一个非常小的间隔,比如1秒(或者甚至0秒—即尽可能地快)。将在此间隔期间收到的所有事件合并到一个微批处理中。在结构化流中,微批数据被构造为一个流式的DataFrame(即无界表),如下图所示。
这个微批处理然后由驱动程序(Driver)调度,作为任务(Task)在执行器(Executor)中执行。完成微批处理执行后,将再次收集并调度下一批。这种调度是经常进行的,以给人以流执行的印象,如下图所示。
在本案例中使用Kafka作为流数据源,流处理程序将从Kafka的cars主题来读取这些事件,代码如下:
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "cars") \
.option("startingOffsets", "earliest") \
.load()
为了模拟车辆发送传感器数据,本案例配套源码中提供一个已经创建好的Kafka生产者程序fastcars_fat.jar(这是一个Java程序执行jar包),它将id、speed(速度)、acceleration(加速度) 和timestamp(时间戳)写入Kafka的cars主题。注意,这里的timestamp时间戳是在事件源处生成事件(消息)的时间,即代表事件时间。这个数据源产生过程如图7-13所示。
接下来,在流处理程序中对从Kafka cars主题拉取的原始数据解析解析,这样就有了一个可以使用的结构,代码如下:
# 解析从Kafka拉取的值value
import pyspark.sql.functions as F
convertedDF = df \
.selectExpr("CAST(value as string)") \
.select(F.split("value",",").alias("data")) \
.select(
F.col("data")[0].alias("car_id"),
F.col("data")[1].alias("speed"),
F.col("data")[2].alias("acceleration"),
F.substring(F.col("data")[3],0,10).alias("ts")
) \
.selectExpr(
"car_id",
"CAST(speed as int)",
"CAST(acceleration as double)",
"CAST(ts as bigint)"
) \
.withColumn("ts",F.to_timestamp(F.from_unixtime("ts")))
convertedDF.printSchema()
这时产生的流DataFrame的结构如下图所示。
注意,在上面的代码中,因为从Kafka中拉取的数据value值为二进制,所以需要先把它转换为字符串,然后调用split方法将其分割为各个字段。其中模拟数据源生成的日期是13位数字的毫秒值,所以在转换为时间戳之前,需要先调用substring方法截掉最后三位数字(变成10位数字的秒值)。接下来,将各个字段由字符串类型转换为相应的数据类型。
接下来执行聚合操作,这从求每辆车的平均速度开始。这可以通过对car_id执行groupBy()并应用avg()聚合函数来实现,代码如下:
aggregates = convertedDF \
.groupBy("car_id") \
.agg(F.avg("speed"))
在结构化流程序中,可以使用触发器控制微批处理的时间间隔。在PySpark中,触发器被设置为指定在检查新数据是否可用之前等待多长时间。如果没有设置触发器,一旦完成前一个微批处理执行,PySpark将立即检查新数据的可用性。
这里需要计算车辆在过去5秒内的平均速度。为此,需要根据事件时间将事件分组为5秒间隔时间组。这种分组称为窗口。
在PySpark中,窗口是通过在groupBy()子句中添加额外的key来实现的。对于每个消息,它的事件时间(传感器生成的时间戳)用于标识消息属于哪个窗口。基于窗口的类型(滚动/滑动),一个事件可能属于一个或多个窗口。
为了实现窗口化,PySpark添加了一个名为window的新列,并将提供的ts列分解为一个或多个行(基于它的值、窗口的大小和滑动),并在该列上执行groupBy()操作。这将隐式地将属于一个时间间隔的所有事件拉到同一个“窗口”中。
这里根据window和car_id对convertedDF 数据进行分组。注意,window()是PySpark中的一个函数,它返回一个列,代码如下:
# 4秒大小的滚动窗口
aggregates = convertedDF
..groupBy(F.window("ts","4 seconds"), "car_id")
.agg(F.avg("speed").alias("speed"))
.where("speed > 100")
定义窗口大小为4秒、滑动为2秒的滑动窗口,代码如下:
convertedDF.groupBy(F.window("ts", "4 seconds", "2 seconds"), "car_id")
这将产生一个car_id、平均速度和相应时间窗口的DataFrame。输出如下图所示。
PySpark提供了三种输出模式:complete、update和append。在处理微批处理后,PySpark更新状态和输出结果的方式各不相同,如下图所示。
在每个微批处理期间,PySpark更新前批处理中的一些key的值,有些是新的,有些保持不变。在complete模式下,输出所有的行,而在update模式下,只输出新的和更新的行。append模式略有不同,在append模式中,不会有任何更新的行,它只输出新增的行。在流程序中指定输出模式,代码如下:
writeToKafka = aggregates \
.selectExpr("CAST(car_id as string) as key", "warn as value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers","xueai8:9092") \
.option("topic", "fastcars") \
.option("startingOffsets", "earliest") \
.option("checkpointLocation", "/tmp/carsck/") \
.outputMode("update") \
.start()
在使用Kafka接收器时,检查点位置是必须的,它支持故障恢复和精确地一次处理。
运行应用程序的输出如下图所示。
请注意,结构化流API隐式地跨批维护聚合函数的状态。例如,在上面的例子中,第二个微批计算的平均速度将是第1和第2批接收到的事件的平均速度。作为用户,我们不需要自定义状态管理。但随着时间的推移,维护一个庞大的状态也会带来成本。这可以通过使用水印来实现控制。
在PySpark中,水印用于根据当前最大事件时间决定何时清除状态。基于用户所指定的延迟,水印滞后于目前所看到的最大事件时间。例如,如果dealy是3秒,当前最大事件时间是10:00:45,那么水印是在10:00:42。这意味着Spark将保持结束时间小于10:00:42的窗口的状态。在程序中指定水印,代码如下:
# 使用ts字段设置水印,最大延迟为3s
aggregates = convertedDF \
.withWatermark("ts", "3 seconds") \
.groupBy(F.window("ts","4 seconds"), "car_id") \
.agg(F.avg("speed").alias("speed")) \
.where("speed > 100") \
.withColumn("warn",F.concat_ws(", ","car_id","speed"))
需要理解的一个细微但重要的细节是,当使用基于EventTime的处理时,只有当接收到具有更高时间戳值的消息/事件时,时间才会前进。可以将它看作PySpark内部的时钟,但与每秒钟滴滴计时(基于处理时间)的普通时钟不同,该时钟只在接收到具有更高时间戳的事件时移动。
请看下面这个示例,以深入理解在消息到达较迟时PySpark引擎会如何去处理。这里将集中于[10:00到10:10]之间的单个窗口和5秒的最大延迟,代码如下:
.withWatermark("ts", "5 seconds")
引擎跟踪的最大事件时间,在每个触发器开始时将水印设置为(最大事件时间-5秒钟)。如下图所示。
对于上图,说明如下:
- (1) 一个时间戳为10:00的事件到达并落在窗口[10:00,10:10),水印被更新为timestamp - 5。
- (2) 在数据源处生成时间戳为10:02的事件,但该事件会有很大的延迟。这个事件应该落在窗口[10:00,10:10)中。
- (3) 时间戳为10:04的事件在10:05时到达,稍微有些延迟,但它仍然落在窗口[10:00,10:10)中,因为当前水印为09:55,即小于窗口结束时间。水印更新到10:04 - 00:05 = 09:59。
- (4) 一个时间戳为10:16的事件到达,它将水印更新为10:11(此事件将落在窗口[10:10,10:20),但与这里讨论的内容不相关)。
- (5) 带有时间戳10:02的延迟事件姗姗来迟,但是此时窗口[10:00,10:10)已经被清除,因此该事件将被删除。
设置水印将确保状态不会永远增长。另外,请注意本示例中一个延迟事件是如何处理的,而另一个是如何被忽略了(因为已经太迟了)。
2. 完整实现代码
下面给出本案例的完整代码。
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
# 创建SparkSession实例
spark = SparkSession.builder.appName("Cars Demo").getOrCreate()
# 设置shuffle后的分区数为10(测试环境下)
spark.conf.set("spark.sql.shuffle.partitions",10)
# 设置日志级别
# spark.sparkContext.setLogLevel("WARN")
# 创建一个流来监听cars topic的消息
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xueai8:9092") \
.option("subscribe", "cars") \
.option("startingOffsets", "earliest") \
.load()
# 查看schema
df.printSchema()
# 解析从Kafka拉取的值value
convertedDF = df \
.selectExpr("CAST(value as string)") \
.select(F.split("value",",").alias("data")) \
.select(
F.col("data")[0].alias("car_id"),
F.col("data")[1].alias("speed"),
F.col("data")[2].alias("acceleration"),
F.substring(F.col("data")[3],0,10).alias("ts")
) \
.selectExpr(
"car_id",
"CAST(speed as int)",
"CAST(acceleration as double)",
"CAST(ts as bigint)"
) \
.withColumn("ts",F.to_timestamp(F.from_unixtime("ts")))
convertedDF.printSchema()
# 执行不带窗口的聚合
# aggregates = convertedDF.groupBy("car_id").agg(F.avg("speed"))
# 执行滑动窗口聚合,窗口定义:大小为4秒、滑动为1秒
# convertedDF.groupBy(F.window("ts","4 seconds","1 seconds"), "car_id")
# 如果使用处理时间
# convertedDF.groupBy(window(current_timestamp(),"4 seconds"), "car_id")
# 执行带有水印的窗口聚合
aggregates = convertedDF \
.withWatermark("ts", "3 seconds") \
.groupBy(F.window("ts","4 seconds"), "car_id") \
.agg(F.avg("speed").alias("speed")) \
.where("speed > 100") \
.withColumn("warn",F.concat_ws(", ","car_id","speed"))
aggregates.printSchema()
# 将结果写出到控制台
writeToConsole = aggregates \
.writeStream \
.format("console") \
.option("truncate", "false") \
.outputMode("append") \
.start()
# 将结果写出到Kafka
writeToKafka = aggregates \
.selectExpr("CAST(car_id as string) as key", "warn as value") \
.writeStream \
.format("kafka") \
.option("kafka.bootstrap.servers","xueai8:9092") \
.option("topic", "fastcars") \
.option("startingOffsets", "earliest") \
.option("checkpointLocation", "/tmp/carsck/") \
.outputMode("update") \
.start()
# 等待多个流结束(作为作业文件提交集群上运行时启用)
# 在pyspark shell命令行,在查询对象上调用.stop()方法停止查询
# spark.streams.awaitAnyTermination()
3. 执行步骤演示
本案例涉及到与Kafka的集成,以及使用自定义的数据源程序,所以执行步骤稍稍有点复杂,请按以下说明操作。
1) 测试数据源
本案例使用到一个Kafka生产者程序。该程序是以Java开发、编译和打包的执行jar包,它位于配套的源码包中,名为fastcars_fat.jar,可使用jar命令来运行它。请按以下步骤测试和掌握该数据源程序的使用。
(1) 启动zookeeper。在第一个终端窗口,执行以下命令:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
这将在2181端口启动ZooKeeper进程,并让ZooKeeper在后台工作。
(2) 接下来,启动Kafka服务器。在第二个终端窗口,执行以下命令:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/kafka-server-start.sh config/server.properties
(3) 创建主题cars。在第三个终端窗口,执行以下命令:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic cars # 查看已有的主题: $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
(4) 运行kafka消费者脚本,指定要连接的Kafka服务器名称和端口号,以及要连接的主题。继续在第三个终端窗口,执行以下命令:
$ ./bin/kafka-console-consumer.sh --topic cars --bootstrap-server localhost:9092
(5) 运行模拟数据源程序jar包。打开第四个终端窗口,执行以下命令:
$ java -jar ~/jars/fastcars_fat.jar localhost:9092 cars
其中参数localhost:9092是要连接的Kafka服务器名称和端口号,参数cars是连接的主题。这两个参数可以根据自己的需要修改,但是要和消费者监听的服务器和主题一致。
执行过程如下图所示。
(6) 回到第三个终端窗口(运行消费者脚本的窗口),可以看到输出了接收到的数据信息,如下图所示。
(7) 如果要终止数据源程序或消费者脚本的运行,同时按下Ctrl + C键即可。
2) 执行流程序
接下来,就可以运行上节开发的PySpark结构化流程序,来实时监测运输车辆的车速,并及时将监测到的超速车辆信息发送到Kafka的fastcars主题。请按以下步骤操作。
(1) 启动zookeeper(如果已经启动,略过此步骤)。在第一个终端窗口,执行以下命令:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/zookeeper-server-start.sh config/zookeeper.properties
(2) 接下来,启动Kafka服务器(如果已经启动,略过此步骤)。在第二个终端窗口,执行以下命令:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/kafka-server-start.sh config/server.properties
(3) 创建主题fastcars(如果已经创建,略过此步骤)。在第三个终端窗口,执行以下命令:
$ cd ~/bigdata/kafka_2.12-2.4.1 $ ./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fastcars # 查看已有的主题: $ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
(4) 运行kafka消费者脚本,监听超速车辆信息。继续在第三个终端窗口,执行以下命令:
$ ./bin/kafka-console-consumer.sh --topic fastcars --bootstrap-server xueai8:9092
(5) 运行流程序。打开第四个终端窗口,启动pyspark shell,运行上节开发的流程序代码。(也可以保存到.ipynb文件中,使用spark-submit命令提交执行)
(6) 运行模拟数据源程序jar包。打开第五个终端窗口,执行以下命令:
$ java -jar ~/jars/fastcars_fat.jar localhost:9092 cars
(7) 回到第三个终端窗口,如果一节正常,那么可以看到接收到的超速信息,如下图所示。
(8) 如果要结束程序的运行,按以下顺序关闭:
- 第一步,先关闭数据源程序,同时按下Ctrl + C键。
- 第二步,再关闭流处理程序,键入exit()函数,然后按下Enter键。
- 第三步,然后关闭消费者脚本程序,同时按下Ctrl + C键。
- 第四步,先关闭Kafka集群,同时按下Ctrl + C键。
- 第五步,最后关闭Zookeeper集群,同时按下Ctrl + C键。