使用集合数据源
也可以直接在内存中将一个数据集合读取为DataStream。Flink提供了如下几个方法。
- fromCollection(Seq):从Java Java.util. collection创建一个数据流。集合中的所有元素必须具有相同的类型。
- fromCollection(Iterator):从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- fromElements(elements: _*):根据给定的对象序列创建数据流。所有对象必须具有相同的类型。
- fromParallelCollection(SplittableIterator):并行地从迭代器创建数据流。该类指定迭代器返回的元素的数据类型。
- generateSequence(from, to):并行地生成给定区间内的数字序列。
【示例】使用集合数据源。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。
2、打开项目中的StreamingJob对象文件,编辑流处理代码。
Scala代码:
import org.apache.flink.streaming.api.scala._
object CollectionSourceDemo {
// 定义事件类
case class Person(name:String, age:Integer)
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 首先从环境中获取一些数据,比如:
val peoples = List(Person("张三", 21),Person("李四", 16),Person("王老五", 35))
val personStream = env.fromCollection(peoples)
// 对DataStream执行转换操作,并输出计算结果
personStream.filter(_.age>18).print()
// 触发流程序执行
env.execute("Flink Collection Source Demo")
}
}
Java代码:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.util.ArrayList;
import java.util.List;
public class CollectionSourceDemo {
// POJO类
public static class Person {
public String name;
public Integer age;
public Person() {}
public Person(String name, Integer age) {
this.name = name;
this.age = age;
}
public String toString() {
return this.name.toString() + ": age " + this.age.toString();
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 读取数据源,构造DataStream
List peoples = new ArrayList<>();
peoples.add(new Person("张三", 21));
peoples.add(new Person("李四", 16));
peoples.add(new Person("王老五", 35));
DataStream personStream = env.fromCollection(peoples);
// 对DataStream执行filter转换操作
DataStream adults = personStream.filter(new FilterFunction() {
@Override
public boolean filter(Person person) throws Exception {
return person.age >= 18;
}
});
// 输出流计算结果
adults.print();
// 触发流程序执行
env.execute("Flink File Source");
}
}
3、执行以上程序,输出结果如下:
张三: age 21 王老五: age 35