发布日期:2021-11-24 VIP内容

订单延迟支付告警程序

在本案例中,我们将使用Flink process function原生的状态编程,实现一个电商订单延迟支付告警程序。

主要内容包括:

  • 需求描述与分析
  • 数据格式描述
  • Scala代码参考实现
  • Java代码参考实现
  • 程序执行结果

需求描述与分析

在电商平台中,有效的订单是用户真正完成支付动作的订单。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。

所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。

此时需要给用户发送一个信息提醒用户,提高支付转换率!

上述需求问题可以简化成:在pay事件超时未发生的情况下,输出超时报警信息。

思路如下:

  • 在订单的create事件到来后注册定时器,15分钟后触发;
  • 用一个布尔类型的 ValueState来作为标识位,表明pay事件是否发生过;
  • 如果pay事件已经发生,状态被置为true,那么就不再需要做什么操作;
  • 而如果pay事件一直没来,状态一直为false,到定时器触发时,就应该输出超时报警信息。

示例数据

我们提供的样例数据,位于PBLP平台的~/data/flink/目录下。

以下为其中部分订单事件数据格式,各字段含义分别为:用户ID,订单状态,订单ID,时间戳。

34729,create,,1558430842
......        
34732,pay,,1558449999 

Scala代码实现参考

import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector

object OrderTimeoutDemo {

  // 定义输入订单事件的样例类
  case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)

  // 定义输出结果样例类
  case class OrderResult(orderId: Long, resultMsg: String)

  // 超时标签
  val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")

  def main(args: Array[String]) {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    env.setParallelism(1)

    val orderEventStream = env.socketTextStream("localhost", 9999)
      .map(data => {
        val dataArray = data.split(",")
        // 构造为OrderEvent对象
        OrderEvent(
          dataArray(0).trim.toLong,
          dataArray(1).trim, dataArray(2).trim,
          dataArray(3).trim.toLong
        )
      })
      .assignAscendingTimestamps(_.eventTime * 1000L)
      .keyBy(_.orderId)

    val orderResultStream = orderEventStream.process(new OrderPayMatch())

    // 主数据流:支付的数据
    orderResultStream.print("payed")

    // 侧输出流:支付超时的数据
    orderResultStream.getSideOutput(orderTimeoutOutputTag).print("time out order")

    // 触发流程序执行
    env.execute("order timeout ob")
  }

  // 自定义的KeyedProcessFunction
  class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]() {
    // 支付状态
    lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(
        new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))

    // 定时器时间
    lazy val timerState: ValueState[Long] = getRuntimeContext.getState(
      new ValueStateDescriptor[Long]("timer-state", classOf[Long]))

    // 对于流中每个元素,调用以下方法
    override def processElement(value: OrderEvent,
                                ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context,
                                out: Collector[OrderResult]): Unit = {
      val isPayed = isPayedState.value()    	// 获取支付状态
      val timerTs = timerState.value()      	// 获取超时定时器时间

      // 1) 如果是订单创建行为
      if (value.eventType == "create") {
        // 1.1) 如果是乱序,pay信息先到,create信息后到
        if (isPayed) {
          // 发送支付成功信息
          out.collect(OrderResult(value.orderId, "payed successfully"))
          // 撤销超时告警定时器
          ctx.timerService().deleteEventTimeTimer(timerTs)
          // 清除状态(下单、支付已结束)
          isPayedState.clear()
          timerState.clear()
        }
        // 1.2) 如果是已经创建订单,但尚未支付
        else {
          // 注册支付超时告警定时器(15分钟)
          val ts = value.eventTime * 1000L + 15 * 60 * 1000L
          // 注册定时器
          ctx.timerService().registerEventTimeTimer(ts)
          // 将定时器时间保存到状态后端
          timerState.update(ts)
        }
      }
      // 2)如果是支付行为
      else if (value.eventType == "pay") {
        // 2.1) 如果存在有定时器时间状态,说明有订单已创建但尚未支付
        if (timerTs > 0) {
          // 支付时间在设置的超时范围内(不超过15分钟)
          if (timerTs > value.eventTime * 1000L) {
            // 发送支付成功信息
            out.collect(OrderResult(value.orderId, "payed successfully"))
          }
          // 支付时间超时了,无效
          else {
            // 发送到侧输出流
            ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "this order is timeout"))
          }
          // 删除定时器
          ctx.timerService().deleteEventTimeTimer(timerTs)
          // 清除状态
          isPayedState.clear()
          timerState.clear()
        }
        // 2.2) 如果没有定时器状态存在,说明支付数据先到,订单创建信息还没到(发生了乱序)
        else {
          // 先把支付信息存储到状态中
          isPayedState.update(true)
          // 注册定时器,并等待watermark时间
          ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L)
          // 将定时器时间保存到状态
          timerState.update(value.eventTime * 1000L)
        }
      }
    } // end processElement

    // 定时器
    override def onTimer(timestamp: Long,
                         ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext,
                         out: Collector[OrderResult]): Unit = {
      // 检索支付状态
      val isPayed = isPayedState.value()
      // 如果已支付,说明订单创建信息丢失
      if (isPayed) {
        ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "payed but no create"))
      }
      // 如果未支付,说明用户虽然创建了订单,但是放弃了支付
      else {
        ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
      }
      // 清除状态
      isPayedState.clear()
      timerState.clear()
    }

  } // end ProcessFunction

}

Java代码参考实现

import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class OrderTimeoutDemo {

	// 定义输入订单事件的样例类
	public static class OrderEvent{
		public long orderId;
		public String eventType;
		public String txId;
		public long eventTime;

		public OrderEvent(){}

		public OrderEvent(long orderId, String eventType, String txId, long eventTime) {
			this.orderId = orderId;
			this.eventType = eventType;
			this.txId = txId;
			this.eventTime = eventTime;
		}
	}

	// 定义输出结果样例类
	public static class OrderResult{
		public long orderId;
		public String resultMsg;

		public OrderResult() {
		}

		public OrderResult(long orderId, String resultMsg) {
			this.orderId = orderId;
			this.resultMsg = resultMsg;
		}

		@Override
		public String toString() {
			return "OrderResult{" +orderId + ", " + resultMsg + "}";
		}
	}

	// 超时标签(注意这里:因为泛型类型擦除的原因,因此需要通过{}重写这个类)
	final static OutputTag<OrderResult> orderTimeoutOutputTag = new OutputTag<OrderResult>("orderTimeout"){};

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 设置并行度为1
		env.setParallelism(1);

		KeyedStream<OrderEvent,Long> orderEventStream = env
				.socketTextStream("localhost", 9999)
				.map(new MapFunction<String, OrderEvent>() {
					@Override
					public OrderEvent map(String s) throws Exception {
						String[] dataArray = s.split(",");
						// 构造为OrderEvent对象
						OrderEvent orderEvent = new OrderEvent(
								Long.parseLong(dataArray[0].trim()),
								dataArray[1].trim(),
								dataArray[2].trim(),
								Long.parseLong(dataArray[3].trim())
							);
						return orderEvent;
					}
				})
				.assignTimestampsAndWatermarks(WatermarkStrategy
						.<OrderEvent>forMonotonousTimestamps()
						.withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
							@Override
							public long extractTimestamp(OrderEvent orderEvent, long l) {
								return orderEvent.eventTime;
							}
						})
				)
				.keyBy(new KeySelector<OrderEvent, Long>() {
					@Override
					public Long getKey(OrderEvent orderEvent) throws Exception {
						return orderEvent.orderId;
					}
				});

		// 执行低层的KeyedProcessFunction处理
		SingleOutputStreamOperator orderResultStream = orderEventStream.process(new OrderPayProcessFunction());

		// 主数据流:支付的数据
		orderResultStream.print("payed");

		// 侧输出流:支付超时的数据
		orderResultStream.getSideOutput(orderTimeoutOutputTag).print("time out order");

		// 触发流程序执行
		env.execute("Flink Streaming Java API Skeleton");
	}

	// 自定义ProcessFunction
	public static class OrderPayProcessFunction
			extends KeyedProcessFunction<Long, OrderEvent, OrderResult>{

		// 声明两个状态存储
		private ValueState<Boolean> isPayedState = null;
		private ValueState<Long> timerState = null;

		@Override
		public void open(Configuration parameters) throws Exception {
			super.open(parameters);
			isPayedState = getRuntimeContext()
					.getState(new ValueStateDescriptor<>("is-payed-state", Types.BOOLEAN));
			timerState = getRuntimeContext()
					.getState(new ValueStateDescriptor<>("timer-state", Types.LONG));
		}

		@Override
		public void processElement(OrderEvent value,
					   Context ctx,
					   Collector<OrderResult> out) throws Exception {
			Boolean isPayed = isPayedState.value();    	// 获取支付状态
			Long timerTs = timerState.value();      	// 获取超时定时器时间
			// 1) 如果是订单创建行为
			if ("create".equals(value.eventType)) {
				// 1.1) 如果是乱序,pay信息先到,create信息后到
				if (isPayed != null) {
					// 发送支付成功信息
					out.collect(new OrderResult(value.orderId, "payed successfully"));
					// 撤销超时告警定时器
					ctx.timerService().deleteEventTimeTimer(timerTs);
					// 清除状态(下单、支付已结束)
					isPayedState.clear();
					timerState.clear();
				}
				// 1.2) 如果是已经创建订单,但尚未支付
				else {
					// 注册支付超时告警定时器(15分钟)
					long ts = value.eventTime * 1000L + 15 * 60 * 1000L;
					// 注册定时器
					ctx.timerService().registerEventTimeTimer(ts);
					// 将定时器时间保存到状态后端
					timerState.update(ts);
				}
			}
			// 2)如果是支付行为
			else if ("pay".equals(value.eventType)) {
				// 2.1) 如果存在有定时器时间状态,说明有订单已创建但尚未支付
				if (timerTs > 0) {
					// 支付时间在设置的超时范围内(不超过15分钟)
					if (timerTs > value.eventTime * 1000L) {
						// 发送支付成功信息
						out.collect(new OrderResult(value.orderId, "payed successfully"));
					}
					// 支付时间超时了,无效
					else {
						// 发送到侧输出流
						ctx.output(orderTimeoutOutputTag, new OrderResult(value.orderId, "this order is timeout"));
					}
					// 删除定时器
					ctx.timerService().deleteEventTimeTimer(timerTs);
					// 清除状态
					isPayedState.clear();
					timerState.clear();
				}
				// 2.2) 如果没有定时器状态存在,说明支付数据先到,订单创建信息还没到(发生了乱序)
				else {
					// 先把支付信息存储到状态中
					isPayedState.update(true);
					// 注册定时器,并等待watermark时间
					ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L);
					// 将定时器时间保存到状态
					timerState.update(value.eventTime * 1000L);
				}
			}
		}

		@Override
		public void onTimer(long timestamp,
				    OnTimerContext ctx,
				    Collector<OrderResult> out) throws Exception {
			// 检索支付状态
			Boolean isPayed = isPayedState.value();
			// 如果已支付,说明订单创建信息丢失
			if (isPayed != null) {
				ctx.output(orderTimeoutOutputTag, new OrderResult(ctx.getCurrentKey(), "payed but no create"));
			}
			// 如果未支付,说明用户虽然创建了订单,但是放弃了支付
			else {
				ctx.output(orderTimeoutOutputTag, new OrderResult(ctx.getCurrentKey(), "order timeout"));
			}

			// 清除状态
			isPayedState.clear();
			timerState.clear();
		}
	}
}

执行程序查看结果

首先运行Flink流处理程序。

然后在终端执行以下命令,发送订单数据:

$ cat order.txt | nc -lk 9999

查看Flink程序处理结果,输出内容如下所示:

payed> OrderResult(34729,payed successfully)
payed> OrderResult(34731,payed successfully)
payed> OrderResult(34733,payed successfully)
time out order> OrderResult(34732,this order is timeout)
time out order> OrderResult(34730,order timeout)
time out order> OrderResult(34734,order timeout)
time out order> OrderResult(34734,order timeout)