水印策略
Flink提供了用于处理事件时间、时间戳和水印的API。
什么是水印策略?
为了处理事件时间,Flink流程序需要知道事件的时间戳,这意味着流中的每个元素都需要分配其事件时间戳。这通常是通过使用TimestampAssigner从元素中的某个字段访问/提取时间戳来实现的。
时间戳分配与生成水印密切相关,生成水印是告诉系统事件时间进度的一种方式。可以通过指定WatermarkGenerator来配置水印。 TimestampAssigner和WatermarkGenerator形成了一个水印策略,它定义了如何在流源中生成水印。Flink API需要一个同时包含TimestampAssigner和WatermarkGenerator的WatermarkStrategy。
使用水印策略
在Flink应用程序中有两个地方可以使用WatermarkStrategy:
- 1)直接在源上使用;
- 2)在非源操作之后使用。
第一种选择更可取,因为它允许源利用关于水印逻辑中的分片/分区/分割的知识。源通常可以在更细的水平上跟踪水印,源产生的整体水印将更准确。直接在源上指定WatermarkStrategy通常意味着必须使用一个源特定的接口。
第二个选项(在任意操作后设置WatermarkStrategy)应该只在不能直接在源上设置策略的情况下使用。
内置水印生成器
Flink提供了抽象,允许程序员分配他们自己的时间戳并发出他们自己的水印。更具体地说,可以通过实现WatermarkGenerator接口来实现。
不过,为了进一步简化此类任务的编程工作,Flink附带了一些预先实现的时间戳赋值器,包括:
- 单调递增时间戳
- 固定的迟到时间
分配时间戳和水印示例
下面通过一个示例来掌握如何为数据流中的事件分配时间戳和水印。
【示例】(Scala实现)为数据流中的事件分配时间戳和水印。
请按以下步骤执行。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中添加依赖。
3、创建流应用程序类。代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.streaming.api.scala._
/**
* 功能:为数据流中的事件分配时间戳和水印
*/
object AssignerDemo2 {
// case类,表示流元素数据类型
case class MessageInfo(hostname: String, status: String)
// main方法
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 数据流源
val stream = env.fromElements(
MessageInfo("host1", "1234"),
MessageInfo("host2", "2234"),
MessageInfo("host3", "1234")
)
// 因为模拟数据没有时间戳,所以用此方法添加递增时间戳和水印
// 为数据流中的元素分配时间戳,并生成标记以指示事件时间进展
val withTimestampsAndWatermarks = stream.assignTimestampsAndWatermarks(
WatermarkStrategy.forMonotonousTimestamps[MessageInfo]()
.withTimestampAssigner(new SerializableTimestampAssigner[MessageInfo]() {
// 取当前时间戳为事件时间戳
override def extractTimestamp(t: MessageInfo, ts: Long): Long = {
System.currentTimeMillis()
}
})
)
// 将结果输出到控制台
withTimestampsAndWatermarks.print
// 执行
env.execute("Flink Watermark Strategy ")
}
}
【示例】(Java实现)为数据流中的事件分配时间戳和水印。
请按以下步骤执行。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中添加依赖。
3、创建事件数据结构。代码如下:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 功能:为数据流中的事件分配时间戳和水印
*/
public class AssignerDemo {
public static class MessageInfo {
public String hostname;
public String status;
public MessageInfo() {
}
public MessageInfo(String hostname, String status){
this.hostname = hostname;
this.status = status;
}
@Override
public String toString() {
return "MessageInfo{" +
"hostname='" + hostname + '\'' +
", status='" + status + '\'' +
'}';
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 触发流程序执行
env.execute("Flink Watermark Strategy");
}
}
4、创建流应用程序类。代码如下:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* 功能:为数据流中的事件分配时间戳和水印
*/
public class AssignerDemo {
public static class MessageInfo {
public String hostname;
public String status;
public MessageInfo() {
}
public MessageInfo(String hostname, String status){
this.hostname = hostname;
this.status = status;
}
@Override
public String toString() {
return "MessageInfo{" +
"hostname='" + hostname + '\'' +
", status='" + status + '\'' +
'}';
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 源数据流
DataStream<MessageInfo> stream = env.fromElements(
new MessageInfo("host1","1234"),
new MessageInfo("host2","2234"),
new MessageInfo("host3","1234")
);
// 因为模拟数据没有时间戳,所以用此方法添加单调增加时间戳和水印
DataStream<MessageInfo> withTimestampsAndWatermarks = stream
.assignTimestampsAndWatermarks(WatermarkStrategy
.<MessageInfo>forMonotonousTimestamps()
.withTimestampAssigner((MessageInfo, ts) -> System.currentTimeMillis()));
// 将结果输出到控制台
withTimestampsAndWatermarks.print();
// 触发流程序执行
env.execute("Flink Watermark Strategy");
}
}