使用Kafka连接器
Flink为读写Kafka主题提供了专门的Kafka连接器,支持将Kafka做为Flink流应用程序的数据源和Data Sink。Flink Kafka消费者集成了Flink的检查点机制来提供精确一次的处理语义。为了实现这一点,Flink并不完全依赖Kafka的消费者组的偏移跟踪,而是在内部跟踪和检查这些偏移。
Kafka源
Kafka源提供了一个构建类来构造KafkaSource的实例。
下面的代码片段展示了如何构建一个KafkaSource来消费主题“input-topic”的最早偏移量的消息,带有消费组“my-group”,并且只将message的值反序列化为字符串。
KafkaSourcesource = KafkaSource. builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
【示例】使用Kafka自带的生产者脚本向Kafka的words主题写数据。编写一个Flink流应用程序,消费Kafka中words主题的数据。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中设置依赖。
3、创建流应用程序主类。代码如下。
Scala代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* Kafka Source使用(最新 1.13 版本)
*
* 读取 Kafka中 words 主题的数据
*/
object KafkaSourceDemo {
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 定义Kafka数据源
val source = KafkaSource.builder[String]
.setBootstrapServers("192.168.190.133:9092")
.setTopics("words")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest)
.setValueOnlyDeserializer(new SimpleStringSchema)
.build
env
// 指定Kafka数据源
.fromSource(source, WatermarkStrategy.noWatermarks[String], "Kafka Source")
.flatMap(new FlatMapFunction[String, String]() {
@throws[Exception]
override def flatMap(s: String, collector: Collector[String]): Unit = {
for (word <- s.split("\\W+")) {
collector.collect(word)
}
}
})
// 输出从Kafka words topic拉取的数据
.print
// 触发流程序执行
env.execute("Flink Kafka Source")
}
}
Java代码:
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Kafka Source使用(最新 1.13 版本)
*
* 读取 Kafka中 words 主题的数据
*/
public class KafkaSourceDemo {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 定义Kafka数据源
KafkaSource<String> source = KafkaSource.<String>builder()
.setBootstrapServers("192.168.190.133:9092")
.setTopics("words")
.setGroupId("group-test")
.setStartingOffsets(OffsetsInitializer.earliest())
.setValueOnlyDeserializer(new SimpleStringSchema())
.build();
env
// 指定Kafka数据源
.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source")
// 做flatMap转换
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector<String> collector) throws Exception {
for(String word : s.split("\\W+")){
collector.collect(word);
}
}
})
// 输出从Kafka words topic拉取的数据
.print();
// 触发流程序执行
env.execute("Flink Kafka Source");
}
}
4、执行程序。请按以下步骤执行。
1)启动zookeeper服务和kafka服务。打开一个终端窗口,启动ZooKeeper(不要关闭)
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
2)打开另一个终端窗口,启动Kafka服务(不要关闭)
$ ./bin/kafka-server-start.sh config/server.properties
3)打开第三个终端窗口,在Kafka中创建一个名为"words"的主题(topic)
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words
4)查看已经创建的Topic:
$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5)运行我们上面编写的流执行程序(相当于Kafka的消费者程序)。
6)在第三个终端窗口,执行Kafka自带的生产者脚本:
$ ./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic words
7)然后随意输入一些句子,单词之间以空格分隔开。如下:
good good study day day up
5、可以得到如下的输出结果:
8> good 8> good 8> study 8> day 8> day 8> up
Kafka Sink
Flink的Kafka生产者被称为FlinkKafkaProducer,通过它可以将记录流写入一个或多个Kafka主题,这时Kafka作为Flink流程序的Data Sink。
FlinkKafkaProducer的构造函数接受以下参数:
- 一个默认的输出主题,事件应该在其中写入;
- SerializationSchema / KafkaSerializationSchema用于将数据序列化到Kafka中;
- Kafka客户端的属性。以下属性是必需的:
- “bootstrap.servers” (用逗号分隔的Kafka broker列表)。
- 一个容错语义。
【示例】编写一个Flink流应用程序,将数据写入到Kafka中的指定主题words。使用Kafka自带的消费者脚本来查看写入的内容。
请按以下步骤操作。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、在pom.xml中设置依赖。
3、创建流应用程序主类。代码如下。
Scala代码:
import java.util.Properties
import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.util.Collector
/**
* 将流数据写入到 Kafka中 的 words 主题
*/
object KafkaProducerDemo {
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 启用检查点
env.enableCheckpointing(50000)
val properties = new Properties()
properties.setProperty("bootstrap.servers", "192.168.190.133:9092")
// 构造Flink Kafka Sink,默认使用FlinkKafkaProducer.Semantic.AT_LEAST_ONCE语义
val myProducer = new FlinkKafkaProducer[String](
"words", // 目标topic
new SimpleStringSchema, // 序列化schema
properties // producer配置
)
// 将数据写入到Kafka的"words"主题
env
.fromElements("good good study", "day day up")
.flatMap(new FlatMapFunction[String, String]() {
@throws[Exception]
override def flatMap(s: String, out: Collector[String]): Unit = {
for (word <- s.split("\\W+")) {
out.collect(word)
}
}
})
.addSink(myProducer)
// 触发流程序执行
env.execute("Flink Streaming Scala API Skeleton")
}
}
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
/**
* 将流数据写入到 Kafka中 的 words 主题
*/
public class KafkaProcuderDemo {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 启用检查点
env.enableCheckpointing(50000);
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.190.133:9092");
// 构造Flink Kafka Sink,默认使用FlinkKafkaProducer.Semantic.AT_LEAST_ONCE语义
FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(
"words", // 目标topic
new SimpleStringSchema(), // 序列化schema
properties // producer配置
);
// 将数据写入到Kafka的"words"主题
env
.fromElements("good good study","day day up")
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String s, Collector out) throws Exception {
for(String word : s.split("\\W+")){
out.collect(word);
}
}
})
.addSink(myProducer);
// 触发流程序执行
env.execute("Kafka Flink Producer Demo");
}
}
默认FlinkKafkaProducer使用的是AT_LEAST_ONCE语义,这要求启用检查点。随着Flink的检查点启用,Flink Kafka消费者将消费一个主题的记录,并定期检查点所有的Kafka偏移,以及其他操作的状态。如果检查点被禁用,Kafka消费者会定期向Zookeeper提交偏移量。
4、执行程序。请按以下步骤执行。
1)启动zookeeper服务和kafka服务。打开一个终端窗口,启动ZooKeeper(不要关闭)
$ ./bin/zookeeper-server-start.sh config/zookeeper.properties
2)打开另一个终端窗口,启动Kafka服务(不要关闭)
$ ./bin/kafka-server-start.sh config/server.properties
3)打开第三个终端窗口,在Kafka中创建一个名为“words”的主题(topic)
$ ./bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic words
4)查看已经创建的Topic:
$ ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092
5)在第三个终端窗口,执行Kafka自带的消费者脚本:
$ ./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic words
6)运行我们上面编写的流执行程序(相当于Kafka的生产者程序)。
5、在第三个终端窗口(运行Kafka消费者脚本的窗口),可以看到如下的输出内容:
day day up good good study
在上面的代码中,我们使用默认的Semantic.AT_LEAST_ONCE语义,这时可以简单地指定序列化模式为SimpleStringSchema。但是,如果我们要指定FlinkKafkaProducer使用EXACTLY_ONCE语义,那么就需要自定义序列化模式,它需要实现KafkaSerializationSchema