Flink低级操作API_ProcessFunction
从之前的内容我们知道,Flink的转换操作是无法访问事件的时间戳信息和水印信息的。例如我们常用的MapFunction转换操作就无法访问时间戳或者当前事件的事件时间。而在一些应用场景下,访问事件的时间戳信息和水印信息极为重要。因此,Flink DataStream API提供了一系列的低级(Low-Level)转换操作,可以访问时间戳、水印以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。这一类的低级API,被称为ProcessFunction。
ProcessFunction将事件处理与计时器和状态相结合,使其成为流处理应用程序的强大组件。这是使用Flink创建事件驱动应用程序的基础。
ProcessFunction
ProcessFunction是一个低级的流处理操作,允许访问所有(非循环)流应用程序的基本构件:
- events:数据流中的元素。
- state:状态,用于容错和一致性,仅用于keyed stream。
- timer:定时器,支持事件时间和处理时间,仅用于keyed stream。
ProcessFunction可以被认为是一个访问keyed state和定时器的FlatMapFunction。通过为输入流中接收的每个事件调用它来处理事件。
ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的窗口函数和转换算子无法实现)。例如,Flink SQL就是使用ProcessFunction实现的。
Flink提供了8个Process Function:
- ProcessFunction:用于DataStream。
- KeyedProcessFunction:用于KeyedStream,keyBy之后的流处理。
- CoProcessFunction:用于connect连接的流。
- ProcessJoinFunction:用于join流操作。
- BroadcastProcessFunction:用于广播。
- KeyedBroadcastProcessFunction:keyBy之后的广播。
- ProcessWindowFunction:窗口增量聚合。
- ProcessAllWindowFunction:全窗口聚合。
KeyedProcessFunction
KeyedProcessFunction是RichFunction的一种,是最常用的ProcessFunction之一。
作为一个RichFunction,它可以访问使用托管keyed state所需的open和getRuntimeContext方法。对于容错状态,KeyedProcessFunction可以通过 RuntimeContext 访问Flink的keyed state,这与其他有状态函数访问keyed state的方式类似。
作为ProcessFunction的扩展,KeyedProcessFunction在其onTimer(…)方法中通过OnTimerContext提供了对计时器key的访问。
【示例】维护数据流中每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对。
在下面的示例中,KeyedProcessFunction维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对:
- 监听Socket数据源,获取输入字符串。
- 把key(单词)、计数和最后修改时间戳存储在一个ValueState状态中, ValueState的作用域是通过key隐式确定的。
- 对于每个记录,KeyedProcessFunction递增计数器并设置最后修改时间戳。
- 该函数还安排了一个10秒后的回调(以事件时间)。
- 在每次回调时,它根据存储的计数的最后修改时间检查回调的事件时间时间戳,并在它们匹配时发出key/count(即,在10秒钟内这个单词没有再次出现,就把这个单词和它出现的总次数发送到下游算子)。
Scala代码:
import java.text.SimpleDateFormat
import java.time.Duration
import java.util.Date
import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* 维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对:
*/
object KeyedProcessFunDemo {
// 存储在状态中的数据类型
case class CountWithTimestamp(key: String, count: Long, lastModified: Long)
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 并行度1
env.setParallelism(1)
// 源数据流
val stream = env
.socketTextStream("localhost", 9999)
.flatMap(_.split("\\W+"))
.map((_,1))
// 为事件分配时间戳和水印
stream
.assignTimestampsAndWatermarks(WatermarkStrategy
.forMonotonousTimestamps[(String, Int)]()
.withTimestampAssigner(new SerializableTimestampAssigner[(String, Int)] {
override def extractTimestamp(t: (String, Int), ts: Long): Long = System.currentTimeMillis()
})
.withIdleness(Duration.ofSeconds(5))
)
.keyBy(_._1)
.process(new CountWithTimeoutFunction())
.print()
// 执行流程序
env.execute("Process Function")
}
/**
* KeyedProcessFunction的子类,维护计数和超时。
* 它的作用是将每个单词最新出现时间记录到backend,并创建定时器,
* 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子
*/
class CountWithTimeoutFunction extends KeyedProcessFunction[String, (String, Int), (String, Long)] {
/**
* 首先获得由这个处理函数(process function)维护的状态
* 通过 RuntimeContext 访问Flink的keyed state
*/
lazy val state: ValueState[CountWithTimestamp] = getRuntimeContext.getState(
new ValueStateDescriptor[CountWithTimestamp]("myState", classOf[CountWithTimestamp])
)
/**
* 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
* @param value 输入元素
* @param ctx 上下文件环境
* @param out
* @throws Exception
*/
override def processElement(value: (String, Int),
ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#Context,
out: Collector[(String, Long)]): Unit = {
// 初始化或检索/更新状态
val current = state.value match {
case null =>
CountWithTimestamp(value._1, 1, ctx.timestamp) // 如果是第一个事件,设初始计数为1
case CountWithTimestamp(key, count, lastModified) =>
// 先撤销之前的定时器
// ctx.timerService().deleteEventTimeTimer(lastModified + 10000);
ctx.timerService.deleteProcessingTimeTimer(lastModified + 10000)
CountWithTimestamp(key, count + 1, ctx.timestamp) // 如果不是第一个事件,则累加
}
// 将修改过后的状态写回
state.update(current)
// 从当前事件时间开始安排下一个定时器10秒
// 为当前单词创建定时器,十秒后后触发
val timer = current.lastModified + 10000
// ctx.timerService.registerEventTimeTimer(timer)
ctx.timerService.registerProcessingTimeTimer(timer)
// 打印所有信息,用于核对数据正确性
println(s"process, ${current.key}, ${current.count}, " +
s"lastModified : ${current.lastModified}, (${time(current.lastModified)}), " +
s"timer : ${timer} (${time(timer)})\n")
}
/** 定时器触发后执行的方法
* 如果一分钟内没有新来的相同的单词,则发出 key/count对
*
* @param timestamp 这个时间戳代表的是该定时器的触发时间
* @param ctx
* @param out
* @throws Exception
*/
override def onTimer(timestamp: Long,
ctx: KeyedProcessFunction[String, (String, Int), (String, Long)]#OnTimerContext,
out: Collector[(String, Long)]): Unit = {
// 取得当前单词
val currentKey = ctx.getCurrentKey
// 获取调度此计时器的key的状态(即此单词的状态)
val result = state.value
// 当前元素是否已经连续10秒未出现的标志
var isTimeout = false
// timestamp是定时器触发时间
// 如果等于最后一次更新时间+10秒,就表示这10秒内已经收到过该单词了,
// 这种连续10秒没有出现的元素,被发送到下游算子
result match {
case CountWithTimestamp(key, count, lastModified) if timestamp >= lastModified + 10000 =>
out.collect((key, count)) // 发出key/count对
isTimeout = true
case _ =>
}
// 打印数据,用于核对是否符合预期
println(s"ontimer, ${currentKey}, ${result.count}, " +
s"lastModified : ${result.lastModified}, (${time(result.lastModified)}), " +
s"stamp : ${timestamp} (${time(timestamp)}),isTimeout: ${isTimeout}\n")
}
}
private def time(timeStamp: Long) = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp))
}
Java代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Date;
/**
* 维护每个key的计数,并在每过10秒钟(以事件时间)而未更新该key时,发出一个key/count对:
*/
public class KeyedProcessFunDemo {
// 存储在状态中的数据类型,POJO类
public static class CountWithTimestamp {
public String key;
public long count;
public long lastModified;
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 并行度1
env.setParallelism(1);
// 源数据流
DataStream<Tuple2<String, Integer>> stream = env
.socketTextStream("localhost",9999)
.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String, Integer>> collector)
throws Exception {
if(StringUtils.isNullOrWhitespaceOnly(s)) {
System.out.println("invalid line");
return;
}
for(String word : s.split("\\W+")){
collector.collect(new Tuple2<>(word,1));
}
}
});
stream
.assignTimestampsAndWatermarks(WatermarkStrategy
.<Tuple2<String, Integer>>forMonotonousTimestamps()
.withTimestampAssigner((event, ts) -> System.currentTimeMillis())
.withIdleness(Duration.ofSeconds(5))
)
.keyBy(t -> t.f0)
.process(new CountWithTimeoutFunction())
.print();
// 执行
env.execute("flink streaming job");
}
/**
* KeyedProcessFunction的子类,维护计数和超时。
* 它的作用是将每个单词最新出现时间记录到backend,并创建定时器,
* 定时器触发的时候,检查这个单词距离上次出现是否已经达到10秒,如果是,就发射给下游算子
*/
public static class CountWithTimeoutFunction extends KeyedProcessFunction<
String, // key
Tuple2<String, Integer>, // input
Tuple2<String, Long>> { // output
// 由此函数所维护的存储状态
private ValueState<CountWithTimestamp> state;
// 首先获得由这个处理函数维护的状态
// 通过 RuntimeContext 访问Flink的keyed state
@Override
public void open(Configuration parameters) throws Exception {
// 初始化状态,状态名称是myState
state = getRuntimeContext().getState(new ValueStateDescriptor<>("myState", CountWithTimestamp.class));
}
/**
* 对于在输入流中接收到的每一个事件,此函数就会被调用以处理该事件
* @param value 输入元素
* @param context 上下文件环境
* @param out
* @throws Exception
*/
@Override
public void processElement(
Tuple2<String, Integer> value,
Context context,
Collector<Tuple2<String, Long>> out) throws Exception {
// 取得当前是哪个单词
// String currentKey = context.getCurrentKey();
// 从backend取得当前单词的myState状态
CountWithTimestamp current = state.value();
// 如果myState还从未没有赋值过,就在此初始化
if (current == null) {
current = new CountWithTimestamp();
current.key = value.f0;
}
// 先撤销之前的定时器
// context.timerService().deleteEventTimeTimer(current.lastModified + 10000);
context.timerService().deleteProcessingTimeTimer(current.lastModified + 10000);
// 更新状态计数值(单词数量加1)
current.count++;
// 将状态的时间戳设置为记录分配的事件时间戳
// 取当前元素的时间戳,作为该单词最后一次出现的时间
current.lastModified = context.timestamp();
// 重新保存到backend,包括该单词出现的次数,以及最后一次出现的时间
state.update(current);
// 为当前单词创建定时器,10秒后后触发
long timer = current.lastModified + 10000;
context.timerService().registerEventTimeTimer(timer);
// context.timerService().registerProcessingTimeTimer(timer);
// 打印所有信息,用于核对数据正确性
System.out.println(String.format("process, %s, %d, lastModified : %d (%s), timer : %d (%s)\n",
current.key,
current.count,
current.lastModified,
time(current.lastModified),
timer,
time(timer)));
}
/** 定时器触发后执行的方法
* 如果一分钟内没有新来的相同的单词,则发出 key/count对
*
* @param timestamp 这个时间戳代表的是该定时器的触发时间
* @param ctx
* @param out
* @throws Exception
*/
@Override
public void onTimer(
long timestamp,
OnTimerContext ctx,
Collector<Tuple2<String, Long>> out) throws Exception {
// 取得当前单词
String currentKey = ctx.getCurrentKey();
// 获取调度此计时器的key的状态(即此单词的状态)
CountWithTimestamp result = state.value();
// 当前元素是否已经连续10秒未出现的标志
boolean isTimeout = false;
// timestamp是定时器触发时间
// 超过10秒钟该单词(key)没有再出现,就发送给下游
if (timestamp >= result.lastModified + 10000) {
out.collect(new Tuple2<>(result.key, result.count));
isTimeout = true;
}
// 打印数据,用于核对是否符合预期
System.out.println(String.format("ontimer, %s, %d, lastModified : %d (%s), stamp : %d (%s), isTimeout : %s\n\n",
currentKey,
result.count,
result.lastModified,
time(result.lastModified),
timestamp,
time(timestamp),
String.valueOf(isTimeout)));
}
}
private static String time(long timeStamp) {
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date(timeStamp));
}
}
在processElement方法中,state.value()可以取得当前单词的状态,state.update(current)可以设置当前单词的状态执行以上代码。
请按以下步骤执行上面的程序:
1)在控制台执行命令nc -lk 9999,这样就可以从控制台向9999端口发送字符串了;
2)在IDEA上直接执行上同的程序代码,程序运行就开始监听本机的9999端口了;
3)在netcat的控制台输入aaa再回车,连续两次,中间间隔不要超过10秒;然后再输入ttt回车。如下图所示: