Published: 2021-11-24

Real-Time Overspeed Monitoring for a Transport Company's Fleet

In this section we will build a Flink real-time streaming application that monitors whether transport vehicles are speeding and raises an alert when they are.

It covers:

  • Requirements
  • Custom data source implementation (Scala, Java)
  • Stream processing pipeline implementation (Scala, Java)
  • Execution steps and results

Requirements

Imagine a fleet-management solution for a logistics company whose vehicles are equipped with wireless connectivity. Each vehicle periodically reports its geographic position and a number of operational parameters such as fuel level, speed, acceleration, bearing, and engine temperature. The logistics company wants to use this stream of telemetry data to build a range of applications that improve the efficiency of its business operations and financial management.

Suppose the company now asks us to implement the real-time overspeed monitoring feature, which checks whether its transport vehicles are speeding. To do this we will create a simple Flink streaming application that computes each vehicle's average speed over a window of a few seconds.
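
Concretely, the pipeline below uses a sliding event-time window that is 5 seconds wide and slides every 2 seconds, so each event contributes to two or three overlapping windows. For example, an event with timestamp t = 6 s falls into the windows [2 s, 7 s), [4 s, 9 s), and [6 s, 11 s). This overlap is also why the same average speed shows up in several consecutive windows in the sample output at the end of this section.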

In this example we use Kafka both as the streaming data source, reading these events from the Kafka topic "cars", and as the data sink: detected overspeed events are written to the Kafka topic "fastcars".

Custom Data Source

To simulate vehicles sending us sensor data, we will create a Kafka producer that writes an id, speed, acceleration, and timestamp to the Kafka "cars" topic.

Scala code:

import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.annotation.tailrec
import scala.util.{Random => r}

object RandomCarsKafkaProducer {

  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    val interval = 1000
    val topic = "cars"
    val numRecsToProduce: Option[Int] = None		// None = produce indefinitely
    // val numRecsToProduce: Option[Int] = Some(1000)	// produce 1000 records, then stop

    @tailrec
    def produceRecord(numRecToProduce: Option[Int]): Unit = {
      // Each call to the method below builds one vehicle telemetry record for the Kafka "cars" topic
      def generateCarRecord(topic: String): ProducerRecord[String, String] = {
        val carName = s"car${r.nextInt(10)}"
        val speed = r.nextInt(150)
        val acc = r.nextFloat * 100

        val value = s"$carName,$speed,$acc,${System.currentTimeMillis()}"
        print(s"Writing $value\n")
        val d = r.nextFloat() * 100
        if (d < 2) {
          // simulate a random delay
          println("Sorry! Some network latency!")
          Thread.sleep((d*100).toLong)
        }
        new ProducerRecord[String, String](topic,"key", value)
      }

      numRecToProduce match {
        case Some(x) if x > 0 =>
          // generate and send one record
          producer.send(generateCarRecord(topic))
          Thread.sleep(interval)
          produceRecord(Some(x - 1))

        case None =>
          producer.send(generateCarRecord(topic))
          Thread.sleep(interval)
          produceRecord(None)

        case _ =>
      }
    }

    produceRecord(numRecsToProduce)
  }

}
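
Note the @tailrec annotation on produceRecord: it asks the compiler to verify that the recursive calls are in tail position, so the infinite (None) case runs in constant stack space instead of eventually overflowing the stack.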

Java code:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.Random;

/**
 * Simulated vehicle sensor data source that writes to Kafka.
 */
public class RandomCarsKafkaProducer {

    private static final String TOPIC_CARS = "cars";      // topic

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // int numRecsToProduce = -1;		// -1 = produce indefinitely
        int numRecsToProduce = 1000;		// produce 1000 records, then stop

        produceRecord(producer,numRecsToProduce);
    }

    private static void produceRecord(KafkaProducer<String, String> producer, int recordNum){
        int interval = 1000;

        // Produce a finite number of records (recordNum > 0) or run forever
        // (recordNum < 0). Unlike the Scala version, a loop is used here
        // because Java does not eliminate tail calls, so the recursive
        // variant would eventually overflow the stack in the infinite case.
        while (recordNum != 0) {
            // generate and send one record
            producer.send(generateCarRecord(TOPIC_CARS));
            try {
                Thread.sleep(interval);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            if (recordNum > 0) {
                recordNum--;
            }
        }
    }

    // builds one vehicle telemetry record
    private static ProducerRecord<String, String> generateCarRecord(String topic){
        Random r = new Random();
        String carName = "car" +r.nextInt(10);
        int speed = r.nextInt(150);
        float acc = r.nextFloat() * 100;
        long ts = System.currentTimeMillis();

        String value = carName + "," + speed + "," + acc + "," + ts;
        System.out.println("==Writing==:" + value);
        float d = r.nextFloat() * 100;
        if (d < 2) {
            // simulate a random delay
            System.out.println("Sorry! Some network latency!");
            try {
                Thread.sleep((long) (d * 100));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        return new ProducerRecord<>(topic,"key", value);
    }
}

Note that the timestamp here is the time at which the event (message) was generated at the source.
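
Each message value is a plain comma-separated string of the form carId,speed,acceleration,timestamp. A record might look like this (values are hypothetical):

car7,133,56.21,1637745600123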

Stream Processing Pipeline

Below is the implementation of the stream processing pipeline.
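
At a high level, the pipeline reads raw strings from the "cars" topic, parses each one into a CarEvent, keys the stream by carId, incrementally computes each vehicle's average speed over a 5-second sliding event-time window with a 2-second slide, filters out averages of 120 or below, and writes the remaining CarAvgEvent records to the "fastcars" topic as JSON.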

Scala code:

import java.sql.Timestamp
import java.time.Duration
import java.util.Properties

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.functions.AggregateFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaProducer, KafkaSerializationSchema}
import org.apache.flink.util.Collector
import org.apache.kafka.clients.producer.ProducerRecord

/**
 * Overspeed detection program for a logistics company's fleet.
 */
object FastCarsDetect {

  // The stream's event type: a case class with a companion object that parses the raw CSV string
  case class CarEvent(carId: String, speed: Int, acceleration: Double, timestamp: Long)
  object CarEvent {
    def apply(rawStr: String): CarEvent = {
      val parts = rawStr.split(",")
      CarEvent(parts(0), parts(1).toInt, parts(2).toDouble, parts(3).toLong)
    }
  }

  // Average-speed event type (the explicit getters below let Jackson's ObjectMapper serialize it as JSON)
  case class CarAvgEvent(carId: String, avgSpeed: Double, start: String, end: String){

    def getCarId:String = carId
    def getAvgSpeed:Double = avgSpeed
    def getStart:String = start
    def getEnd:String = end
  }

  def main(args: Array[String]): Unit = {
    // Set up the stream execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Enable checkpointing (the EXACTLY_ONCE Kafka sink below commits its transactions on checkpoints)
    env.enableCheckpointing(50000)

    // Kafka Source
    val source = KafkaSource.builder[String]
      .setBootstrapServers("localhost:9092")
      .setTopics("cars")
      .setGroupId("group-test")
      .setStartingOffsets(OffsetsInitializer.earliest)
      .setValueOnlyDeserializer(new SimpleStringSchema)
      .build

    // Kafka Sink
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // Kafka brokers' default maximum transaction timeout (transaction.max.timeout.ms) is 15 minutes.
    // With Semantic.EXACTLY_ONCE, the property below must not exceed that limit (Flink's default is 1 hour).
    properties.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000))
    val myProducer = new FlinkKafkaProducer[CarAvgEvent](
      "fastcars",                                       // 目标topic
      new ObjSerializationSchema("fastcars"),           // 序列化schema
      properties,                                       // producer配置
      FlinkKafkaProducer.Semantic.EXACTLY_ONCE          // 容错性
    )

    myProducer.setWriteTimestampToKafka(true)

    // Watermark strategy
    val watermarkStrategy = WatermarkStrategy
      .forBoundedOutOfOrderness[String](Duration.ofSeconds(1))
      .withTimestampAssigner(new SerializableTimestampAssigner[String]() {
        override def extractTimestamp(s: String, l: Long): Long = s.split(",")(3).toLong
      })

    env
      // Kafka data source
      .fromSource(source, watermarkStrategy, "from cars topic")
      .map(r => CarEvent(r))
      .keyBy(carEvent => carEvent.carId)
      // sliding window: size 5 s, slide 2 s
      .window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(2)))
      .aggregate(new AvgSpeedAggFun(), new AvgSpeedProcessFun())
      .filter(carAvgEvent => carAvgEvent.avgSpeed > 120.0)
      // output
      .addSink(myProducer)

    // Trigger execution of the streaming program
    env.execute("Fast Cars Detect")
  }

  // Incremental aggregation function
  class AvgSpeedAggFun extends AggregateFunction[CarEvent, (Double, Long), Double] {
    // create the initial accumulator: (speed sum, count)
    override def createAccumulator() = (0.0, 0L)

    // add one event to the accumulator (per keyed partition)
    override def add(carEvent: CarEvent, acc: (Double, Long)) =
      (carEvent.speed + acc._1, acc._2 + 1L)

    // merge partial accumulators
    override def merge(acc1: (Double, Long), acc2: (Double, Long)) =
      (acc1._1 + acc2._1, acc1._2 + acc2._2)

    // return the average speed per vehicle
    override def getResult(acc: (Double, Long)): Double = acc._1 / acc._2
  }

  // Window process function (be careful to import the Scala ProcessWindowFunction, not the Java one)
  // ProcessWindowFunction[IN, OUT, KEY, W]
  class AvgSpeedProcessFun extends ProcessWindowFunction[Double, CarAvgEvent, String, TimeWindow] {
    override def process(key: String,
                         context: Context,
                         elements: Iterable[Double],
                         out: Collector[CarAvgEvent]): Unit = {
      // round the pre-aggregated average speed to two decimals
      val average = Math.round(elements.iterator.next * 100) / 100.0
      // emit a CarAvgEvent to downstream operators
      out.collect(CarAvgEvent(key, average,
        new Timestamp(context.window.getStart).toString,
        new Timestamp(context.window.getEnd).toString))
    }
  }

  // Custom serialization schema:
  // writes each CarAvgEvent to Kafka as JSON
  class ObjSerializationSchema(topic: String) extends KafkaSerializationSchema[CarAvgEvent] {
    private val mapper = new ObjectMapper()

    override def serialize(obj: CarAvgEvent,
                            timestamp: java.lang.Long
                          ): ProducerRecord[Array[Byte], Array[Byte]] = {
      var b:Array[Byte] = null
      try{
        b = mapper.writeValueAsBytes(obj)
      }catch {
        case e: JsonProcessingException => // TODO
      }
      new ProducerRecord[Array[Byte], Array[Byte]](topic, b)
    }
  }
}
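
One detail worth noting: because the AggregateFunction and the ProcessWindowFunction are combined in a single aggregate() call, Flink hands the window function an Iterable containing exactly one element, the pre-aggregated average speed. That is why elements.iterator.next is safe here. The Java version below follows the same structure.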

Java code:

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.sql.Timestamp;
import java.time.Duration;
import java.util.Properties;

/**
 * Overspeed detection program for a logistics company's fleet.
 */
public class FastCarsDetect {

	// POJO: the vehicle event type
	public static class CarEvent {

		public String carId;           	// vehicle id
		public int speed;      		// speed
		public double acceleration;  	// acceleration
		public long timestamp;		// timestamp

		public CarEvent() { }

		public CarEvent(String carId, int speed, double acceleration, long timestamp) {
			this.carId = carId;
			this.speed = speed;
			this.acceleration = acceleration;
			this.timestamp = timestamp;
		}

		@Override
		public String toString() {
			return "CarEvent{" +
					"carId='" + carId + '\'' +
					", speed=" + speed +
					", acceleration=" + acceleration +
					", timestamp=" + timestamp +
					'}';
		}
	}

	// POJO: the average-speed event type
	public static class CarAvgEvent {

		public String carId;           		// vehicle id
		public double avgSpeed;      		// average speed
		public String start;  			// start of the averaging window
		public String end;			// end of the averaging window

		public CarAvgEvent() { }

		public CarAvgEvent(String carId, double avgSpeed, String start, String end) {
			this.carId = carId;
			this.avgSpeed = avgSpeed;
			this.start = start;
			this.end = end;
		}

		@Override
		public String toString() {
			return "CarEvent{" +
					"carId='" + carId + '\'' +
					", avgSpeed=" + avgSpeed +
					", start='" + start + '\'' +
					", end='" + end + '\'' +
					'}';
		}
	}

	public static void main(String[] args) throws Exception {
		// Set up the stream execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Enable checkpointing, as in the Scala version (required for the EXACTLY_ONCE Kafka sink)
		env.enableCheckpointing(50000);

		// Kafka Source
		KafkaSource<String> source = KafkaSource.<String>builder()
				.setBootstrapServers("localhost:9092")
				.setTopics("cars")
				.setGroupId("group-test")
				.setStartingOffsets(OffsetsInitializer.earliest())
				.setValueOnlyDeserializer(new SimpleStringSchema())
				.build();

		// Kafka Sink
		Properties properties = new Properties();
		properties.setProperty("bootstrap.servers", "localhost:9092");
		// Kafka brokers' default maximum transaction timeout (transaction.max.timeout.ms) is 15 minutes.
		// With Semantic.EXACTLY_ONCE, the property below must not exceed that limit (Flink's default is 1 hour).
		properties.setProperty("transaction.timeout.ms", String.valueOf(5 * 60 * 1000));
		FlinkKafkaProducer<CarAvgEvent> myProducer = new FlinkKafkaProducer<>(
				"fastcars",                                       // target topic
				new ObjSerializationSchema("fastcars"),           // serialization schema
				properties,                                       // producer config
				FlinkKafkaProducer.Semantic.EXACTLY_ONCE          // fault tolerance
		);

		myProducer.setWriteTimestampToKafka(true);

		// Watermark strategy
		WatermarkStrategy<String> watermarkStrategy = WatermarkStrategy
				.<String>forBoundedOutOfOrderness(Duration.ofSeconds(2))
				.withTimestampAssigner(new SerializableTimestampAssigner<String>() {
					@Override
					public long extractTimestamp(String s, long l) {
						return Long.parseLong(s.split(",")[3]);
					}
				});

		env
				// Kafka data source
				.fromSource(source, watermarkStrategy, "from cars topic")
				// parse each line into a CarEvent
				.map(new MapFunction<String, CarEvent>() {
					@Override
					public CarEvent map(String s) throws Exception {
						String[] fields = s.split(",");
						return new CarEvent(
								fields[0],
								Integer.parseInt(fields[1]),
								Double.parseDouble(fields[2]),
								Long.parseLong(fields[3])
						);
					}
				})
				// key the stream by car id
				.keyBy(new KeySelector<CarEvent, String>() {
					@Override
					public String getKey(CarEvent carEvent) throws Exception {
						return carEvent.carId;
					}
				})
				// sliding window: size 5 s, slide 2 s
				.window(SlidingEventTimeWindows.of(Time.seconds(5),Time.seconds(2)))
				// perform incremental aggregation
				.aggregate(new AvgSpeedAggFun(),new AvgSpeedProcessFun())
				.filter(new FilterFunction<CarAvgEvent>() {
					@Override
					public boolean filter(CarAvgEvent carAvgEvent) throws Exception {
						return carAvgEvent.avgSpeed > 120.0;
					}
				})
				.addSink(myProducer);

		// Trigger execution of the streaming program
		env.execute("Fast Cars Detect");
	}

	// Incremental aggregation function
	public static class AvgSpeedAggFun implements AggregateFunction<
			CarEvent,               // input
			Tuple2<Double, Long>,   // accumulator: (speed sum, count)
			Double> {               // output: the average speed

		// create the initial accumulator
		@Override
		public Tuple2<Double,Long> createAccumulator() {
			return new Tuple2<>(0.0,0L);
		}

		// add one event to the accumulator (per keyed partition)
		@Override
		public Tuple2<Double,Long> add(CarEvent carEvent, Tuple2<Double,Long> acc) {
			return new Tuple2<>(carEvent.speed + acc.f0, acc.f1 + 1);
		}

		// merge partial accumulators
		@Override
		public Tuple2<Double,Long> merge(
				Tuple2<Double,Long> acc1,
				Tuple2<Double,Long> acc2) {
			return new Tuple2<>(acc1.f0 + acc2.f0, acc1.f1 + acc2.f1);
		}

		// return the average speed per vehicle
		@Override
		public Double getResult(Tuple2<Double,Long> t2) {
			return t2.f0/t2.f1;
		}
	}

	// Window process function
	public static class AvgSpeedProcessFun extends ProcessWindowFunction<
			Double,        // input type
			CarAvgEvent,   // output type
			String,        // key type
			TimeWindow> {  // window type

		@Override
		public void process(
				String id,							// key
				Context context,
				Iterable<Double> events,
				Collector<CarAvgEvent> out) {
			double average = Math.round(events.iterator().next()*100) / 100.0;
			out.collect(new CarAvgEvent(
					id,
					average,
					new Timestamp(context.window().getStart()).toString(),
					new Timestamp(context.window().getEnd()).toString())
			);
		}
	}

	// Custom serialization schema: writes each CarAvgEvent to Kafka as JSON
	public static class ObjSerializationSchema implements KafkaSerializationSchema<CarAvgEvent> {

		private String topic;
		private ObjectMapper mapper;

		public ObjSerializationSchema(String topic) {
			super();
			this.topic = topic;
		}

		@Override
		public ProducerRecord<byte[], byte[]> serialize(CarAvgEvent obj, Long timestamp) {
			byte[] b = null;
			if (mapper == null) {
				mapper = new ObjectMapper();
			}
			try {
				b = mapper.writeValueAsBytes(obj);
			} catch (JsonProcessingException e) {
				// TODO
			}
			return new ProducerRecord<>(topic, b);
		}
	}
}
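
To see the incremental aggregation in isolation, here is a minimal sanity check of AvgSpeedAggFun that can be run outside Flink (a sketch, using the classes exactly as defined above):

AvgSpeedAggFun agg = new AvgSpeedAggFun();
Tuple2<Double, Long> acc = agg.createAccumulator();
acc = agg.add(new CarEvent("car1", 100, 0.0, 0L), acc);   // speed 100
acc = agg.add(new CarEvent("car1", 140, 0.0, 0L), acc);   // speed 140
System.out.println(agg.getResult(acc));                   // prints 120.0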

Execution Steps and Results

Follow the steps below.

1. Start ZooKeeper:

$ ./bin/zookeeper-server-start.sh config/zookeeper.properties

2. Start Kafka:

$ ./bin/kafka-server-start.sh config/server.properties

3. Create the two topics (the first command just lists existing topics as a check; see the note after the commands for newer Kafka versions):

$ ./bin/kafka-topics.sh --list --zookeeper localhost:2181
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic cars
$ ./bin/kafka-topics.sh --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --create --topic fastcars
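
Note: newer Kafka releases (3.x) removed the --zookeeper option from kafka-topics.sh; there, use --bootstrap-server instead, for example:

$ ./bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic cars --partitions 1 --replication-factor 1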

4. In a new terminal window, run the console consumer to pull data from the fastcars topic:

$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic fastcars

5. Run the stream processing program.
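
How you launch it depends on your setup: you can run the FastCarsDetect main class directly from your IDE, or package it as a jar and submit it to a running Flink cluster with something like ./bin/flink run -c FastCarsDetect fastcars-detect.jar (the jar name here is hypothetical).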

6. Run the data source program:

$ cd cars
$ java -jar fastcars.jar

7. Go back to the consumer window from step 4 to watch for overspeed data. If everything works, you should see speeding records arriving on the "fastcars" topic like the following (excerpt):

......
{"carId":"car2","avgSpeed":144.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
{"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:48.0","end":"2021-08-27 12:20:53.0"}
{"carId":"car7","avgSpeed":130.0,"start":"2021-08-27 12:20:50.0","end":"2021-08-27 12:20:55.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:56.0","end":"2021-08-27 12:21:01.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:20:58.0","end":"2021-08-27 12:21:03.0"}
{"carId":"car4","avgSpeed":148.0,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
{"carId":"car1","avgSpeed":126.5,"start":"2021-08-27 12:21:00.0","end":"2021-08-27 12:21:05.0"}
{"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:02.0","end":"2021-08-27 12:21:07.0"}
{"carId":"car1","avgSpeed":134.0,"start":"2021-08-27 12:21:04.0","end":"2021-08-27 12:21:09.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:12.0","end":"2021-08-27 12:21:17.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:14.0","end":"2021-08-27 12:21:19.0"}
{"carId":"car7","avgSpeed":149.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
{"carId":"car6","avgSpeed":125.0,"start":"2021-08-27 12:21:16.0","end":"2021-08-27 12:21:21.0"}
{"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:20.0","end":"2021-08-27 12:21:25.0"}
{"carId":"car1","avgSpeed":139.0,"start":"2021-08-27 12:21:22.0","end":"2021-08-27 12:21:27.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:40.0","end":"2021-08-27 12:21:45.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:42.0","end":"2021-08-27 12:21:47.0"}
{"carId":"car0","avgSpeed":144.0,"start":"2021-08-27 12:21:44.0","end":"2021-08-27 12:21:49.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:46.0","end":"2021-08-27 12:21:51.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:48.0","end":"2021-08-27 12:21:53.0"}
{"carId":"car4","avgSpeed":130.0,"start":"2021-08-27 12:21:50.0","end":"2021-08-27 12:21:55.0"}
......