发布日期:2021-11-24
VIP内容
订单延迟支付告警程序
在本案例中,我们将使用Flink process function原生的状态编程,实现一个电商订单延迟支付告警程序。
主要内容包括:
- 需求描述与分析
- 数据格式描述
- Scala代码参考实现
- Java代码参考实现
- 程序执行结果
需求描述与分析
在电商平台中,有效的订单是用户真正完成支付动作的订单。用户下单的行为可以表明用户对商品的需求,但在现实中,并不是每次下单都会被用户立刻支付。当拖延一段时间后,用户支付的意愿会降低。
所以为了让用户更有紧迫感从而提高支付转化率,同时也为了防范订单支付环节的安全风险,电商网站往往会对订单状态进行监控,设置一个失效时间(比如 15 分钟),如果下单后一段时间仍未支付,订单就会被取消。
此时需要给用户发送一个信息提醒用户,提高支付转换率!
上述需求问题可以简化成:在pay事件超时未发生的情况下,输出超时报警信息。
思路如下:
- 在订单的create事件到来后注册定时器,15分钟后触发;
- 用一个布尔类型的 ValueState来作为标识位,表明pay事件是否发生过;
- 如果pay事件已经发生,状态被置为true,那么就不再需要做什么操作;
- 而如果pay事件一直没来,状态一直为false,到定时器触发时,就应该输出超时报警信息。
示例数据
我们提供的样例数据,位于PBLP平台的~/data/flink/目录下。
以下为其中部分订单事件数据格式,各字段含义分别为:用户ID,订单状态,订单ID,时间戳。
34729,create,,1558430842 ...... 34732,pay,,1558449999
Scala代码实现参考
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
object OrderTimeoutDemo {
// 定义输入订单事件的样例类
case class OrderEvent(orderId: Long, eventType: String, txId: String, eventTime: Long)
// 定义输出结果样例类
case class OrderResult(orderId: Long, resultMsg: String)
// 超时标签
val orderTimeoutOutputTag = new OutputTag[OrderResult]("orderTimeout")
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(1)
val orderEventStream = env.socketTextStream("localhost", 9999)
.map(data => {
val dataArray = data.split(",")
// 构造为OrderEvent对象
OrderEvent(
dataArray(0).trim.toLong,
dataArray(1).trim, dataArray(2).trim,
dataArray(3).trim.toLong
)
})
.assignAscendingTimestamps(_.eventTime * 1000L)
.keyBy(_.orderId)
val orderResultStream = orderEventStream.process(new OrderPayMatch())
// 主数据流:支付的数据
orderResultStream.print("payed")
// 侧输出流:支付超时的数据
orderResultStream.getSideOutput(orderTimeoutOutputTag).print("time out order")
// 触发流程序执行
env.execute("order timeout ob")
}
// 自定义的KeyedProcessFunction
class OrderPayMatch() extends KeyedProcessFunction[Long, OrderEvent, OrderResult]() {
// 支付状态
lazy val isPayedState: ValueState[Boolean] = getRuntimeContext.getState(
new ValueStateDescriptor[Boolean]("is-payed-state", classOf[Boolean]))
// 定时器时间
lazy val timerState: ValueState[Long] = getRuntimeContext.getState(
new ValueStateDescriptor[Long]("timer-state", classOf[Long]))
// 对于流中每个元素,调用以下方法
override def processElement(value: OrderEvent,
ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#Context,
out: Collector[OrderResult]): Unit = {
val isPayed = isPayedState.value() // 获取支付状态
val timerTs = timerState.value() // 获取超时定时器时间
// 1) 如果是订单创建行为
if (value.eventType == "create") {
// 1.1) 如果是乱序,pay信息先到,create信息后到
if (isPayed) {
// 发送支付成功信息
out.collect(OrderResult(value.orderId, "payed successfully"))
// 撤销超时告警定时器
ctx.timerService().deleteEventTimeTimer(timerTs)
// 清除状态(下单、支付已结束)
isPayedState.clear()
timerState.clear()
}
// 1.2) 如果是已经创建订单,但尚未支付
else {
// 注册支付超时告警定时器(15分钟)
val ts = value.eventTime * 1000L + 15 * 60 * 1000L
// 注册定时器
ctx.timerService().registerEventTimeTimer(ts)
// 将定时器时间保存到状态后端
timerState.update(ts)
}
}
// 2)如果是支付行为
else if (value.eventType == "pay") {
// 2.1) 如果存在有定时器时间状态,说明有订单已创建但尚未支付
if (timerTs > 0) {
// 支付时间在设置的超时范围内(不超过15分钟)
if (timerTs > value.eventTime * 1000L) {
// 发送支付成功信息
out.collect(OrderResult(value.orderId, "payed successfully"))
}
// 支付时间超时了,无效
else {
// 发送到侧输出流
ctx.output(orderTimeoutOutputTag, OrderResult(value.orderId, "this order is timeout"))
}
// 删除定时器
ctx.timerService().deleteEventTimeTimer(timerTs)
// 清除状态
isPayedState.clear()
timerState.clear()
}
// 2.2) 如果没有定时器状态存在,说明支付数据先到,订单创建信息还没到(发生了乱序)
else {
// 先把支付信息存储到状态中
isPayedState.update(true)
// 注册定时器,并等待watermark时间
ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L)
// 将定时器时间保存到状态
timerState.update(value.eventTime * 1000L)
}
}
} // end processElement
// 定时器
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[Long, OrderEvent, OrderResult]#OnTimerContext,
out: Collector[OrderResult]): Unit = {
// 检索支付状态
val isPayed = isPayedState.value()
// 如果已支付,说明订单创建信息丢失
if (isPayed) {
ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "payed but no create"))
}
// 如果未支付,说明用户虽然创建了订单,但是放弃了支付
else {
ctx.output(orderTimeoutOutputTag, OrderResult(ctx.getCurrentKey, "order timeout"))
}
// 清除状态
isPayedState.clear()
timerState.clear()
}
} // end ProcessFunction
}
Java代码参考实现
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
public class OrderTimeoutDemo {
// 定义输入订单事件的样例类
public static class OrderEvent{
public long orderId;
public String eventType;
public String txId;
public long eventTime;
public OrderEvent(){}
public OrderEvent(long orderId, String eventType, String txId, long eventTime) {
this.orderId = orderId;
this.eventType = eventType;
this.txId = txId;
this.eventTime = eventTime;
}
}
// 定义输出结果样例类
public static class OrderResult{
public long orderId;
public String resultMsg;
public OrderResult() {
}
public OrderResult(long orderId, String resultMsg) {
this.orderId = orderId;
this.resultMsg = resultMsg;
}
@Override
public String toString() {
return "OrderResult{" +orderId + ", " + resultMsg + "}";
}
}
// 超时标签(注意这里:因为泛型类型擦除的原因,因此需要通过{}重写这个类)
final static OutputTag<OrderResult> orderTimeoutOutputTag = new OutputTag<OrderResult>("orderTimeout"){};
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 设置并行度为1
env.setParallelism(1);
KeyedStream<OrderEvent,Long> orderEventStream = env
.socketTextStream("localhost", 9999)
.map(new MapFunction<String, OrderEvent>() {
@Override
public OrderEvent map(String s) throws Exception {
String[] dataArray = s.split(",");
// 构造为OrderEvent对象
OrderEvent orderEvent = new OrderEvent(
Long.parseLong(dataArray[0].trim()),
dataArray[1].trim(),
dataArray[2].trim(),
Long.parseLong(dataArray[3].trim())
);
return orderEvent;
}
})
.assignTimestampsAndWatermarks(WatermarkStrategy
.<OrderEvent>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<OrderEvent>() {
@Override
public long extractTimestamp(OrderEvent orderEvent, long l) {
return orderEvent.eventTime;
}
})
)
.keyBy(new KeySelector<OrderEvent, Long>() {
@Override
public Long getKey(OrderEvent orderEvent) throws Exception {
return orderEvent.orderId;
}
});
// 执行低层的KeyedProcessFunction处理
SingleOutputStreamOperator orderResultStream = orderEventStream.process(new OrderPayProcessFunction());
// 主数据流:支付的数据
orderResultStream.print("payed");
// 侧输出流:支付超时的数据
orderResultStream.getSideOutput(orderTimeoutOutputTag).print("time out order");
// 触发流程序执行
env.execute("Flink Streaming Java API Skeleton");
}
// 自定义ProcessFunction
public static class OrderPayProcessFunction
extends KeyedProcessFunction<Long, OrderEvent, OrderResult>{
// 声明两个状态存储
private ValueState<Boolean> isPayedState = null;
private ValueState<Long> timerState = null;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
isPayedState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("is-payed-state", Types.BOOLEAN));
timerState = getRuntimeContext()
.getState(new ValueStateDescriptor<>("timer-state", Types.LONG));
}
@Override
public void processElement(OrderEvent value,
Context ctx,
Collector<OrderResult> out) throws Exception {
Boolean isPayed = isPayedState.value(); // 获取支付状态
Long timerTs = timerState.value(); // 获取超时定时器时间
// 1) 如果是订单创建行为
if ("create".equals(value.eventType)) {
// 1.1) 如果是乱序,pay信息先到,create信息后到
if (isPayed != null) {
// 发送支付成功信息
out.collect(new OrderResult(value.orderId, "payed successfully"));
// 撤销超时告警定时器
ctx.timerService().deleteEventTimeTimer(timerTs);
// 清除状态(下单、支付已结束)
isPayedState.clear();
timerState.clear();
}
// 1.2) 如果是已经创建订单,但尚未支付
else {
// 注册支付超时告警定时器(15分钟)
long ts = value.eventTime * 1000L + 15 * 60 * 1000L;
// 注册定时器
ctx.timerService().registerEventTimeTimer(ts);
// 将定时器时间保存到状态后端
timerState.update(ts);
}
}
// 2)如果是支付行为
else if ("pay".equals(value.eventType)) {
// 2.1) 如果存在有定时器时间状态,说明有订单已创建但尚未支付
if (timerTs > 0) {
// 支付时间在设置的超时范围内(不超过15分钟)
if (timerTs > value.eventTime * 1000L) {
// 发送支付成功信息
out.collect(new OrderResult(value.orderId, "payed successfully"));
}
// 支付时间超时了,无效
else {
// 发送到侧输出流
ctx.output(orderTimeoutOutputTag, new OrderResult(value.orderId, "this order is timeout"));
}
// 删除定时器
ctx.timerService().deleteEventTimeTimer(timerTs);
// 清除状态
isPayedState.clear();
timerState.clear();
}
// 2.2) 如果没有定时器状态存在,说明支付数据先到,订单创建信息还没到(发生了乱序)
else {
// 先把支付信息存储到状态中
isPayedState.update(true);
// 注册定时器,并等待watermark时间
ctx.timerService().registerEventTimeTimer(value.eventTime * 1000L);
// 将定时器时间保存到状态
timerState.update(value.eventTime * 1000L);
}
}
}
@Override
public void onTimer(long timestamp,
OnTimerContext ctx,
Collector<OrderResult> out) throws Exception {
// 检索支付状态
Boolean isPayed = isPayedState.value();
// 如果已支付,说明订单创建信息丢失
if (isPayed != null) {
ctx.output(orderTimeoutOutputTag, new OrderResult(ctx.getCurrentKey(), "payed but no create"));
}
// 如果未支付,说明用户虽然创建了订单,但是放弃了支付
else {
ctx.output(orderTimeoutOutputTag, new OrderResult(ctx.getCurrentKey(), "order timeout"));
}
// 清除状态
isPayedState.clear();
timerState.clear();
}
}
}
执行程序查看结果
首先运行Flink流处理程序。
然后在终端执行以下命令,发送订单数据:
$ cat order.txt | nc -lk 9999查看Flink程序处理结果,输出内容如下所示:
payed> OrderResult(34729,payed successfully) payed> OrderResult(34731,payed successfully) payed> OrderResult(34733,payed successfully) time out order> OrderResult(34732,this order is timeout) time out order> OrderResult(34730,order timeout) time out order> OrderResult(34734,order timeout) time out order> OrderResult(34734,order timeout)