数据转换-connect
DataStream的connect操作创建的是ConnectedStreams或BroadcastConnectedStream,它用了两个泛型,即不要求两个dataStream的element是同一类型。
union虽然可以合并多个数据流,但有一个限制,即多个数据流的数据类型必须相同。connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:
- connect只能连接两个数据流,union可以连接多个数据流。
- connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
- 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
两个输入流经过connect合并后,可以进一步使用CoProcessFunction、CoMap、CoFlatMap、KeyedCoProcessFunction等API 对两个流分别处理。
下面是进行connect转换的示例代码。
Scala代码:
import org.apache.flink.streaming.api.functions.co.{CoMapFunction, CoProcessFunction}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* connect转换操作
* connect和union都有一个共同的作用,就是将2个流或多个流合成一个流。
* 但是两者的区别是:union连接的2个流的类型必须一致,connect连接的流可以不一致,但是可以统一处理。
*/
object TransformerConnect {
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// connect
// 第一个数据集
val ds1:DataStream[(String, Int)] = env.fromElements("good good study").flatMap(_.split("\\W+")).map((_, 1))
// 第二个数据集
val ds2:DataStream[String] = env.fromElements("day day up").flatMap(_.split("\\W+"))
// 连接两个数据集
val ds:ConnectedStreams[(String, Int),String] = ds1.connect(ds2)
// 调用process方法
// CoProcessFunction泛型参数:[输入流1数据类型,输入流2数据类型,输出流数据类型]
ds.process(new CoProcessFunction[(String, Int), String, (String, Int)]{
// 处理输入流1的元素
override def processElement1(in1: (String, Int),
context: CoProcessFunction[(String, Int), String, (String, Int)]#Context,
out: Collector[(String, Int)]): Unit = {
out.collect(in1) // 发送给下游算子
}
// 处理输入流2的元素
override def processElement2(in2: String,
context: CoProcessFunction[(String, Int),
String, (String, Int)]#Context,
out: Collector[(String, Int)]): Unit = {
out.collect((in2,1)) // 将来自输入流2的元素转换为元组,再发送给下游算子
}
}).print
// map
ds.map(new CoMapFunction[(String, Int), String, (String, Int)] {
override def map1(in1: (String, Int)): (String, Int) = {
(in1._1.toUpperCase, in1._2)
}
override def map2(in2: String): (String, Int) = {
(in2, 1)
}
}).print
// 执行
env.execute("flink connect transformatiion")
}
}
Java代码
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;
/**
* connect转换操作
* connect和union都有一个共同的作用,就是将2个流或多个流合成一个流。
* 但是两者的区别是:union连接的2个流的类型必须一致,connect连接的流可以不一致,但是可以统一处理。
*/
public class TransformerConnect {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// connect转换操作
// 第一个数据集
DataStream<Tuple2<String, Integer>> ds1 = env.fromElements("good good study")
.map(String::toLowerCase)
.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
@Override
public void flatMap(String value, Collector<Tuple2<String, Integer>> out)
throws Exception {
for(String word: value.split("\\W+")){
out.collect(new Tuple2<>(word,1));
}
}
});
// 第二个数据集
DataStream<String> ds2 = env.fromElements("day day up")
.map(String::toLowerCase)
.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out) throws Exception {
for(String word: value.split("\\W+")){
out.collect(word);
}
}
});
// 连接两个数据集
ConnectedStreams<Tuple2<String, Integer>,String> ds = ds1.connect(ds2);
ds.process(new CoProcessFunction<Tuple2<String,Integer>, String, Tuple2<String,Integer>>() {
@Override
public void processElement1(Tuple2<String, Integer> t,
Context context,
Collector<Tuple2<String,Integer>> out) throws Exception {
out.collect(t);
}
@Override
public void processElement2(String s, Context context, Collector<Tuple2<String,Integer>> out)
throws Exception {
out.collect(new Tuple2<>(s,1));
}
}).print("process");
ds.map(new CoMapFunction<Tuple2<String,Integer>, String, Tuple2<String,Integer>>() {
@Override
public Tuple2<String, Integer> map1(Tuple2<String, Integer> t) throws Exception {
return new Tuple2<>(t.f0.toUpperCase(),t.f1);
}
@Override
public Tuple2<String, Integer> map2(String s) throws Exception {
return new Tuple2<>(s, 1);
}
}).print("map");
// 执行
env.execute("flink connect transformatiion");
}
}
执行以上代码,输出结果如下所示:
process:1> (day,1) process:1> (day,1) process:1> (up,1) map:1> (day,1) map:1> (day,1) map:1> (up,1) map:5> (GOOD,1) map:5> (GOOD,1) map:5>> (STUDY,1) process:5> (good,1) process:5> (good,1) process:5> (study,1)