数据转换-聚合转换
DataStream API支持各种聚合,比如min、max、sum等等。这些函数可以应用于KeyedDataStream,以获得滚动聚合。下面是进行聚合转换的示例代码。
Scala代码:
import org.apache.flink.streaming.api.scala._
/**
* DataStream API支持各种聚合,比如min、max、sum等等。
* 这些函数可以应用于KeyedDataStream,以获得滚动聚合。
*/
object TransformerAgg{
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 首先从环境中获取一些数据,并使用操作符转换 DataStream[String]。比如:
val ds_keyed = env.fromElements(("good",1),("good",2),("study",1)).keyBy(_._1)
ds_keyed.sum(1).print
ds_keyed.min(1).print
ds_keyed.max(1).print
ds_keyed.minBy(1).print
ds_keyed.maxBy(1).print
env.execute("flink aggregation transformatiion")
}
}
Java代码:
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* DataStream API支持各种聚合,比如min、max、sum等等。
* 这些函数可以应用于KeyedDataStream,以获得滚动聚合。
*/
public class TransformerAgg {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 首先从环境中获取一些数据,再执行map和flatMap转换:
DataStream<Tuple2<String,Integer>> ds = env.fromElements(
new Tuple2<>("good",1),
new Tuple2<>("good",2),
new Tuple2<>("study",1));
ds.keyBy(t -> t.f0).sum(1).print(); // 参数也可以是字段名
ds.keyBy(t -> t.f0).min(1).print();
ds.keyBy(t -> t.f0).max(1).print();
ds.keyBy(t -> t.f0).minBy(1).print();
ds.keyBy(t -> t.f0).maxBy(1).print();
// 对于流程序,只有执行了下面这个方法,流程序才真正开始执行
env.execute("flink aggregation transformatiion");
}
}