Flink流处理程序编程模型
Flink程序的基本构建块是stream和transformation(流和转换)。从概念上讲,stream是数据记录的流(可能永远不会结束),transformation是一个运算,它接受一个或多个流作为输入,经过处理/计算后生成一个或多个输出流。
下面我们实现一个完整的、可工作的Flink流应用程序示例。
【示例】将有关人员的记录流作为输入,并从中筛选出未成年人信息。Scala实现
import org.apache.flink.streaming.api.scala._
/**
* 将有关人员的记录流作为输入,并从中筛选出未成年人信息。
*/
object StreamingJobDemo1 {
// 定义事件类
case class Person(name:String, age:Integer)
def main(args: Array[String]) {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 读取数据源,构造数据流
val peoples = env.fromElements(
Person("张三", 21),
Person("李四", 16),
Person("王老五", 35)
)
// 对数据流执行filter转换
val adults = peoples.filter(_.age>18)
// 输出结果
adults.print
// 执行
env.execute("Flink Streaming Job")
}
}
执行代码,输出结果如下:
7> Person(张三,21) 1> Person(王老五,35)
Java实现
(1)在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-java项目模板。
(2)设置依赖。在pom.xml中添加如下依赖:
<dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>1.13.2</version> <scope>provided</scope> </dependency>
(3)创建一个POJO类,用来表示流中的数据。代码如下。
// POJO类,表示人员信息实体
public class Person {
public String name; // 存储姓名
public Integer age; // 存储年龄
// 空构造器
public Person() {};
// 构造器,初始化属性
public Person(String name, Integer age) {
this.name = name;
this.age = age;
};
// 用于调试时输出信息
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
};
}
(4)打开项目中的StreamingJob对象文件,编辑流处理代码如下。
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.api.common.functions.FilterFunction;
public class StreamingJobDemo1 {
public static void main(String[] args) throws Exception {
// 获得流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据源,构造DataStream
DataStream personDS = env.fromElements(
new Person("张三", 21),
new Person("李四", 16),
new Person("王老五", 35)
);
// 执行转换运算(这里是过滤年龄不小于18岁的人)
// 注意,这里我们使用了匿名函数
DataStream<Person> adults = personDS.filter(new FilterFunction() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
// 将结果输出到控制台
adults.print();
// 触发流程序开始执行
env.execute("stream demo");
}
}
(5)执行以上程序,输出结果如下。
张三: age 21 王老五: age 35