数据转换-reduce
数据转换使用操作符(operator)将一个或多个数据流转换为新的数据流。转换输入可以是一个或多个数据流,转换输出也可以是零个、一个或多个数据流。程序可以将多个转换组合成复杂的数据流拓扑。
reduce转换作用于按键分组后的数据流(KeyedStream,即keyBy之后的结果),它将当前元素与上一次归约(reduce)得到的值合并,并发出合并后的新值。下面是进行reduce转换的示例代码(对DataStream中按单词分组后的计数进行滚动求和)。
Scala代码:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
/**
  * Example of the `reduce` transformation: keeps a running count per word
  * over a small, in-memory stream of sentences.
  */
object TransformerReduce {

  def main(args: Array[String]): Unit = {
    // Obtain the streaming execution environment
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Source: two fixed sentences
    val sentences = env.fromElements("Good good study", "Day day up")

    // Normalise to lower case, then split every sentence into its words
    // (flatMap behaves like map followed by flatten)
    val words = sentences
      .map(_.toLowerCase)
      .flatMap(_.split("\\W+"))

    // Pair each word with an initial count of 1
    val wordOnes = words.map(word => (word, 1))

    // Group by the word, then fold each incoming pair into the previously
    // reduced pair for the same word by adding the counts
    val runningCounts = wordOnes
      .keyBy(_._1)
      .reduce((acc, next) => (acc._1, acc._2 + next._2))

    // Sink: print every updated (word, count) pair
    runningCounts.print()

    // Nothing runs until execute is called
    env.execute("flink reduce transformatiion")
  }
}
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
 * Example of the reduce transformation: keeps a running count per word
 * over a small, in-memory stream of sentences.
 */
public class TransformerReduce {

    /** Splits a line on runs of non-word characters and emits one (word, 1) pair per token. */
    private static final class WordPairEmitter
            implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] tokens = line.split("\\W+");
            for (int i = 0; i < tokens.length; i++) {
                out.collect(new Tuple2<>(tokens[i], 1));
            }
        }
    }

    /** Merges two pairs of the same key: keeps the first pair's word and adds the counts. */
    private static final class CountAdder implements ReduceFunction<Tuple2<String, Integer>> {
        @Override
        public Tuple2<String, Integer> reduce(Tuple2<String, Integer> acc, Tuple2<String, Integer> next)
                throws Exception {
            return new Tuple2<>(acc.f0, acc.f1 + next.f1);
        }
    }

    public static void main(String[] args) throws Exception {
        // Obtain the streaming execution environment
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Source: two fixed sentences, normalised to lower case
        DataStream<String> lowered = env
                .fromElements("Good good study","Day day up")
                .map(String::toLowerCase);

        // Tokenise each sentence into (word, 1) pairs
        DataStream<Tuple2<String, Integer>> ds = lowered.flatMap(new WordPairEmitter());

        // Key by the word (field f0) using a key selector; the positional
        // keyBy(0) form is deprecated. Then sum the counts per key.
        DataStream<Tuple2<String, Integer>> runningCounts = ds
                .keyBy(t -> t.f0)
                .reduce(new CountAdder());

        // Sink: print every updated (word, count) pair
        runningCounts.print();

        // A streaming program only starts running once execute is called
        env.execute("flink reduce transformatiion");
    }
}
执行结果如下(每行开头的"N>"是输出该记录的并行子任务编号;实际编号以及不同单词之间的输出顺序可能因运行环境而异):
6> (good,1)
4> (up,1)
6> (good,2)
7> (day,1)
7> (day,2)
5> (study,1)