Published: 2021-11-24
Real-Time Detection of Speeding Vehicles for a Transportation Company
In this section we will build a Flink real-time streaming application that monitors whether transport vehicles are speeding and emits an alert event when they are.
The main topics covered are:
- Requirements
- Custom data source implementation (Scala, Java)
- Stream processing pipeline implementation (Scala, Java)
- Execution steps and results
Requirements
Imagine a fleet-management solution for a logistics company whose vehicles are equipped with wireless connectivity. Each vehicle periodically reports its geographic position and a number of operational parameters such as fuel level, speed, acceleration, bearing, and engine temperature. The company wants to use this stream of telemetry data to power a range of applications that improve the efficiency of its business operations and financial management.
Suppose the company now asks us to implement the real-time speeding-detection part of this solution, which checks whether transport vehicles exceed the speed limit. To do so, we will create a simple Flink streaming application that computes each vehicle's average speed over a window of a few seconds.
In this example Kafka serves as both the stream source and the data sink: the application reads vehicle events from the Kafka topic "cars", and the detected speeding events are written back to the Kafka topic "fastcars".
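For illustration (the concrete values below are made up), a record on the "cars" topic is a plain CSV string of the form carId,speed,acceleration,timestamp, matching what the producer below writes, while each detected speeding event written to "fastcars" is a small JSON document:
car3,135,42.7,1629999999999
{"carId":"car3","avgSpeed":135.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}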
Custom data source
To simulate vehicles sending us sensor data, we will create a Kafka producer that writes id, speed, acceleration, and timestamp values to the Kafka topic "cars".
Scala code:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.annotation.tailrec
import scala.util.{Random => r}
object RandomCarsKafkaProducer {
def main(args: Array[String]): Unit = {
val props = new Properties()
props.put("bootstrap.servers", "localhost:9092")
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String, String](props)
val interval = 1000
val topic = "cars"
val numRecsToProduce: Option[Int] = None // None = infinite
// val numRecsToProduce: Option[Int] = Some(1000) // produce 1000 records, then stop
@tailrec
def produceRecord(numRecToProduce: Option[Int]): Unit = {
// The method below builds one vehicle record destined for the Kafka "cars" topic
def generateCarRecord(topic: String): ProducerRecord[String, String] = {
val carName = s"car${r.nextInt(10)}"
val speed = r.nextInt(150)
val acc = r.nextFloat * 100
val value = s"$carName,$speed,$acc,${System.currentTimeMillis()}"
print(s"Writing $value\n")
val d = r.nextFloat() * 100
if (d < 2) {
// Simulate an occasional random delay
println("Sorry! Some network latency!")
Thread.sleep((d*100).toLong)
}
new ProducerRecord[String, String](topic,"key", value)
}
numRecToProduce match {
case Some(x) if x > 0 ⇒
// Generate one record and send it
producer.send(generateCarRecord(topic))
Thread.sleep(interval)
produceRecord(Some(x - 1))
case None ⇒
producer.send(generateCarRecord(topic))
Thread.sleep(interval)
produceRecord(None)
case _ ⇒
}
}
produceRecord(numRecsToProduce)
}
}
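Note that every record above is sent with the fixed key "key", so all messages land in the same partition of the "cars" topic. If you would rather have Kafka spread the records across partitions by vehicle, a small variation (not part of the original program) is to use the car id as the message key:
// Hypothetical variation: partition records by vehicle by keying on the car id
new ProducerRecord[String, String](topic, carName, value)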
Java code:
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
import java.util.Random;
/**
*
* Simulated vehicle sensor data source - writes to Kafka
*/
public class RandomCarsKafkaProducer {
private static final String TOPIC_CARS = "cars"; // topic
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// int numRecsToProduce = -1; // -1 = infinite
int numRecsToProduce = 1000; // produce 1000 records, then stop
produceRecord(producer,numRecsToProduce);
}
// Produce records: recordNum > 0 produces that many records, recordNum < 0 produces indefinitely.
// A loop is used instead of the recursive form, because Java does not optimize tail calls and an
// unbounded recursion would eventually overflow the stack.
private static void produceRecord(KafkaProducer<String, String> producer, int recordNum){
int interval = 1000;
int remaining = recordNum;
while (remaining != 0) {
// Generate one record and send it
producer.send(generateCarRecord(TOPIC_CARS));
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
// Finite mode counts down to 0; a negative value loops forever
if (remaining > 0) {
remaining--;
}
}
}
// Build one vehicle telemetry record
private static ProducerRecord<String, String> generateCarRecord(String topic){
Random r = new Random();
String carName = "car" +r.nextInt(10);
int speed = r.nextInt(150);
float acc = r.nextFloat() * 100;
long ts = System.currentTimeMillis();
String value = carName + "," + speed + "," + acc + "," + ts;
System.out.println("==Writing==:" + value);
float d = r.nextFloat() * 100;
if (d < 2) {
// Simulate an occasional random delay
System.out.println("Sorry! Some network latency!");
try {
Thread.sleep(Float.valueOf(d*100).longValue());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
return new ProducerRecord<>(topic,"key", value);
}
}
Note that the timestamp here is the time at which the event (message) was generated at the source; the Flink pipeline below uses it as the event time.
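Before wiring the producer into the Flink job, it is worth checking that events are actually arriving in Kafka. Assuming the same broker address used throughout this section, a console consumer on the "cars" topic should show one CSV line per second:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic cars --from-beginning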
Stream processing pipeline implementation
The stream processing pipeline is implemented below. It reads the raw CSV events from the "cars" topic, assigns event-time timestamps and watermarks, keys the stream by carId, computes each vehicle's average speed over a sliding event-time window of 5 seconds with a 2-second slide, keeps only averages above 120, and writes the resulting speeding events to the "fastcars" topic as JSON.
Scala code:
import java.sql.Timestamp
import java.time.Duration
import java.util.Properties
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.ProducerRecord
/**
* Speeding-vehicle detection program for a logistics company
*/
object FastCarsDetect {
// The stream's data type: a case class plus a companion object whose apply method parses the raw CSV string
case class CarEvent(carId: String, speed: Int, acceleration: Double, timestamp: Long)
object CarEvent {
def apply(rawStr: String): CarEvent = {
val parts = rawStr.split(",")
CarEvent(parts(0), parts(1).toInt, parts(2).toDouble, parts(3).toLong)
}
}
// Data type for the computed average-speed events (the explicit getters let Jackson's default ObjectMapper serialize it as JSON)
case class CarAvgEvent(carId: String, avgSpeed: Double, start: String, end: String){
def getCarId:String = carId
def getAvgSpeed:Double = avgSpeed
def getStart:String = start
def getEnd:String = end
}
def main(args: Array[String]) {
// Set up the stream execution environment
val env = StreamExecutionEnvironment.getExecutionEnvironment
// Enable checkpointing (the EXACTLY_ONCE Kafka sink commits its transactions on checkpoints)
env.enableCheckpointing(50000)
// Kafka Source
val source = KafkaSource.builder[String]
.setBootstrapServers("localhost:9092")
.setTopics("cars")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema)
.build
// Kafka Sink
val properties = new Properties()
properties.setProperty("bootstrap.servers", "localhost:9092")
// Kafka brokers cap transactions at transaction.max.timeout.ms, which defaults to 15 minutes
// With Semantic.EXACTLY_ONCE, this producer property (default 1 hour) must not exceed that limit
properties.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000))
val myProducer = new FlinkKafkaProducer[CarAvgEvent](
"fastcars", // 目标topic
new ObjSerializationSchema("fastcars"), // 序列化schema
properties, // producer配置
FlinkKafkaProducer.Semantic.EXACTLY_ONCE // 容错性
)
myProducer.setWriteTimestampToKafka(true)
// Watermark strategy: allow events up to 1 second out of order
val watermarkStrategy = WatermarkStrategy
.forBoundedOutOfOrderness[String](Duration.ofSeconds(1))
.withTimestampAssigner(new SerializableTimestampAssigner[String]() {
override def extractTimestamp(s: String, l: Long): Long = s.split(",")(3).toLong
})
env
// Kafka source
.fromSource(source, watermarkStrategy, "from cars topic")
.map(r => CarEvent(r.toString))
.keyBy(carEvent => carEvent.carId)
// Sliding window of size 5s with a 2s slide
.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(2)))
.aggregate(new AvgSpeedAggFun(), new AvgSpeedProcessFun())
.filter(carAvgEvent => carAvgEvent.avgSpeed > 120.0)
// Write speeding events to the sink
.addSink(myProducer)
// Trigger program execution
env.execute("Fast Cars Detect")
}
// Incremental aggregation function
class AvgSpeedAggFun extends AggregateFunction[CarEvent, (Double, Long), Double] {
// Create the initial accumulator: (sum of speeds, count)
override def createAccumulator() = (0.0, 0L)
// Accumulate the events of each vehicle (each key)
override def add(carEvent: CarEvent, acc: (Double, Long)) =
(carEvent.speed + acc._1, acc._2 + 1L)
// Merge partial accumulators
override def merge(acc1: (Double, Long), acc2: (Double, Long)) =
(acc1._1 + acc2._1, acc1._2 + acc2._2)
// Return each vehicle's average speed
override def getResult(acc: (Double, Long)): Double = acc._1 / acc._2
}
// Window process function (be careful to import the Scala ProcessWindowFunction, not the Java one)
// ProcessWindowFunction[IN, OUT, KEY, W]
class AvgSpeedProcessFun extends ProcessWindowFunction[Double, CarAvgEvent, String, TimeWindow] {
override def process(key: String,
context: Context,
elements: Iterable[Double],
out: Collector[CarAvgEvent]): Unit = {
// Round the pre-aggregated average speed to two decimal places
val average = Math.round(elements.iterator.next * 100) / 100.0
// Emit a CarAvgEvent(carId, avgSpeed, windowStart, windowEnd) downstream
out.collect(CarAvgEvent(key, average,
new Timestamp(context.window.getStart).toString,
new Timestamp(context.window.getEnd).toString))
}
}
// Custom serialization schema
// Writes CarAvgEvent to Kafka as JSON
class ObjSerializationSchema(topic: String) extends KafkaSerializationSchema[CarAvgEvent] {
private var mapper = new ObjectMapper()
override def serialize(obj: CarAvgEvent,
timestamp: java.lang.Long
): ProducerRecord[Array[Byte], Array[Byte]] = {
var b:Array[Byte] = null
try{
b = mapper.writeValueAsBytes(obj)
}catch {
case e: JsonProcessingException => // TODO
}
new ProducerRecord[Array[Byte], Array[Byte]](topic, b)
}
}
}
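While developing locally, it can be handy to see the detected events without going through Kafka at all. A minimal sketch (assuming the rest of the pipeline above stays unchanged) simply swaps the Kafka sink for Flink's built-in print sink:
// Debug variant: print speeding events to stdout instead of writing them to the "fastcars" topic
env
  .fromSource(source, watermarkStrategy, "from cars topic")
  .map(r => CarEvent(r.toString))
  .keyBy(carEvent => carEvent.carId)
  .window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
  .aggregate(new AvgSpeedAggFun(), new AvgSpeedProcessFun())
  .filter(carAvgEvent => carAvgEvent.avgSpeed > 120.0)
  .print()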
Java code:
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.Properties;
/**
* Speeding-vehicle detection program for a logistics company
*/
public class FastCarsDetect {
// POJO for the raw vehicle speed events
public static class CarEvent {
public String carId; // vehicle id
public int speed; // speed
public double acceleration; // acceleration
public long timestamp; // timestamp
public CarEvent() { }
public CarEvent(String carId, int speed, double acceleration, long timestamp) {
this.carId = carId;
this.speed = speed;
this.acceleration = acceleration;
this.timestamp = timestamp;
}
@Override
public String toString() {
return "CarEvent{" +
"carId='" + carId + '\'' +
", speed=" + speed +
", acceleration=" + acceleration +
", timestamp=" + timestamp +
'}';
}
}
// POJO for the computed average-speed events
public static class CarAvgEvent {
public String carId; // vehicle id
public double avgSpeed; // average speed
public String start; // start of the window over which the average was computed
public String end; // end of that window
public CarAvgEvent() { }
public CarAvgEvent(String carId, double avgSpeed, String start, String end) {
this.carId = carId;
this.avgSpeed = avgSpeed;
this.start = start;
this.end = end;
}
@Override
public String toString() {
return "CarEvent{" +
"carId='" + carId + '\'' +
", avgSpeed=" + avgSpeed +
", start='" + start + '\'' +
", end='" + end + '\'' +
'}';
}
}
public static void main(String[] args) throws Exception {
// Set up the stream execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Enable checkpointing, as in the Scala version (the EXACTLY_ONCE Kafka sink commits on checkpoints)
env.enableCheckpointing(50000);
// Kafka Source
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("localhost:9092")
.setTopics("cars")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
// Kafka Sink
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");
// Kafka brokers cap transactions at transaction.max.timeout.ms, which defaults to 15 minutes
// With Semantic.EXACTLY_ONCE, this producer property (default 1 hour) must not exceed that limit
properties.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));
FlinkKafkaProducer<CarAvgEvent> myProducer = new FlinkKafkaProducer<>(
"fastcars", // target topic
new ObjSerializationSchema("fastcars"), // serialization schema
properties, // producer config
FlinkKafkaProducer.Semantic.EXACTLY_ONCE // fault-tolerance guarantee
);
myProducer.setWriteTimestampToKafka(true);
// Watermark strategy: allow events up to 2 seconds out of order
WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
@Override
public long extractTimestamp(String s, long l) {
return Long.parseLong(s.split(",")[3]);
}
});
env
// Kafka source
.fromSource(source, watermarkStrategy, "from cars topic")
// Convert to DataStream<CarEvent>
.map(new MapFunction<String, CarEvent>() {
@Override
public CarEvent map(String s) throws Exception {
String[] fields = s.split(",");
return new CarEvent(
fields[0],
Integer.parseInt(fields[1]),
Double.parseDouble(fields[2]),
Long.parseLong(fields[3])
);
}
})
// Key the stream by carId
.keyBy(new KeySelector<CarEvent, String>() {
@Override
public String getKey(CarEvent carEvent) throws Exception {
return carEvent.carId;
}
})
// Sliding window of size 5s with a 2s slide
.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(2)))
// Incremental aggregation
.aggregate(new AvgSpeedAggFun(),new AvgSpeedProcessFun())
.filter(new FilterFunction<CarAvgEvent>() {
@Override
public boolean filter(CarAvgEvent carAvgEvent) throws Exception {
return carAvgEvent.avgSpeed > 120.0;
}
})
.addSink(myProducer);
// Trigger program execution
env.execute("Fast Cars Detect");
}
// Incremental aggregation function
public static class AvgSpeedAggFun implements AggregateFunction<
CarEvent, // input
Tuple2<Double, Long>, // accumulator: (sum of speeds, count)
Double> { // output: average speed
// Create the initial accumulator
@Override
public Tuple2<Double,Long> createAccumulator() {
return new Tuple2<>(0.0,0L);
}
// Accumulate the events of each vehicle (each key)
@Override
public Tuple2<Double,Long> add(CarEvent carEvent, Tuple2<Double,Long> acc) {
return new Tuple2<>(carEvent.speed + acc.f0, acc.f1 + 1);
}
// Merge partial accumulators
@Override
public Tuple2<Double,Long> merge(
Tuple2<Double,Long> acc1,
Tuple2<Double,Long> acc2) {
return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
}
// Return each vehicle's average speed
@Override
public Double getResult(Tuple2<Double,Long> t2) {
return t2.f0/t2.f1;
}
}
// Window process function
public static class AvgSpeedProcessFun extends ProcessWindowFunction<
Double, // input type
CarAvgEvent, // output type
String, // key type
TimeWindow> { // window type
@Override
public void process(
String id, // key
Context context,
Iterable<Double> events,
Collector<CarAvgEvent> out) {
double average = Math.round(events.iterator().next()*100) / 100.0;
out.collect(new CarAvgEvent(
id,
average,
new Timestamp(context.window().getStart()).toString(),
new Timestamp(context.window().getEnd()).toString())
);
}
}
// Custom serialization schema: writes CarAvgEvent to Kafka as JSON
public static class ObjSerializationSchema implements KafkaSerializationSchema<CarAvgEvent> {
private String topic;
private ObjectMapper mapper;
public ObjSerializationSchema(String topic) {
super();
this.topic = topic;
}
@Override
public ProducerRecord<byte[], byte[]> serialize(CarAvgEvent obj, Long timestamp) {
byte[] b = null;
if (mapper == null) {
mapper = new ObjectMapper();
}
try {
b = mapper.writeValueAsBytes(obj);
} catch (JsonProcessingException e) {
// TODO
}
return new ProducerRecord<>(topic, b);
}
}
}
Execution steps and results
Follow the steps below.
1. Start ZooKeeper
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
2. Start Kafka
$ ./bin/kafka-server-start.sh config/server.properties
3. Create the two topics (the first command simply lists the topics that already exist):
$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic cars
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic fastcars
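If you are running Kafka 3.0 or later, kafka-topics.sh no longer accepts the --zookeeper flag; use --bootstrap-server localhost:9092 instead, for example:
$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --create --topic cars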
4. In a new terminal window, start a console consumer to pull data from the fastcars topic:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fastcars
5. Run the stream processing program (FastCarsDetect)
6. Run the data source program (the Kafka producer)
$ cd cars
$ java -jar fastcars.jar
7. Go back to the console consumer window from step 4 and watch for speeding events. If everything works, you should see speeding records arriving on the "fastcars" topic like the following (excerpt):
......
{"carId":"car2","avgSpeed":144.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
{"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
{"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:50.0","end":"2021-08-27 12:20:55.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:56.0","end":"2021-08-27 12:21:01.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:58.0","end":"2021-08-27 12:21:03.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
{"carId":"car1","avgSpeed":126.5,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
{"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:02.0","end":"2021-08-27 12:21:07.0"}
{"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:04.0","end":"2021-08-27 12:21:09.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:12.0","end":"2021-08-27 12:21:17.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:14.0","end":"2021-08-27 12:21:19.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
{"carId":"car6","avgSpeed":125.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
{"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:20.0","end":"2021-08-27 12:21:25.0"}
{"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:22.0","end":"2021-08-27 12:21:27.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:40.0","end":"2021-08-27 12:21:45.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:42.0","end":"2021-08-27 12:21:47.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:44.0","end":"2021-08-27 12:21:49.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:46.0","end":"2021-08-27 12:21:51.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:48.0","end":"2021-08-27 12:21:53.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:50.0","end":"2021-08-27 12:21:55.0"}
......