侧输出流
除了DataStream操作产生的主流之外,还可以生成任意数量的附加侧输出(side output)结果流。
什么是侧输出流?
很多场景下都需要从一个Flink操作符输出一个以上的输出流,比如:
- 异常
- 格式不正确的事件
- 延迟事件
- 操作警报,例如到外部服务的超时连接
除了错误报告之外,侧输出也是实现流的N-路拆分的好方法。
每个侧输出通道都与一个OutputTag
当使用侧输出时,首先需要定义一个OutputTag,用于标识侧输出流:
Java代码:
// 这需要是一个匿名的内部类,以便我们可以分析类型 OutputTagoutputTag = new OutputTag ("side-output") {};
Scala代码:
val outputTag = OutputTag[String]("side-output")
请注意OutputTag是如何根据侧输出流包含的元素类型进行类型化的。
可以通过以下函数将数据发送到侧输出流:
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
可以使用上面函数中向用户公开的Context参数,将数据发送到OutputTag标识的侧输出。
要获得侧输出流,可以对DataStream操作的结果使用getSideOutput(OutputTag)。这将得到一个DataStream,这是输入到侧输出流的结果。
侧输出流应用示例
下面我们通过一个示例来理解侧输出流的应用方法。
【示例】下面是一个从ProcessFunction发出侧输出数据的例子,将数据集中的负数挑出来,输出到侧输出中。
Scala代码:
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.scala._
import org.apache.flink.util.Collector
/**
* 侧输出示例:将数据集中的负数挑出来,输出到侧输出中
*/
object SideOutputDemo {
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 侧输出
val numbers = List(1, 2, -3, 4, 5, -6, 7, 8, -9, 10)
val ds = env.fromElements(numbers:_*)
// 定义用来标记侧输出的标签,注意泛型参数是侧输出数据的类型
val outputTag = new OutputTag[Int]("side-output") {}
// 分流处理
val mainDataStream = ds.process(new ProcessFunction[Int, Int] {
override def processElement(value: Int,
ctx: ProcessFunction[Int, Int]#Context,
out: Collector[Int]): Unit = {
if (value > 0) out.collect(value) // 将数据发送到常规输出
else ctx.output(outputTag, value) // 向侧输出发送负数
}
})
// 获取侧输出结果
val sideOutputStream = mainDataStream.getSideOutput[Int](outputTag)
// 打印注数据流
mainDataStream.print("主数据流")
// 打印侧输出流
sideOutputStream.print("侧输出流")
// 执行
env.execute("flink transformatiion")
}
}
Java代码:
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* 侧输出示例:将数据集中的负数挑出来,输出到侧输出中
*/
public class SideOutputDemo {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 侧输出
DataStream<Integer> ds = env.fromElements(1,2,-3,4,5,-6,7,8,-9,10);
// 定义用来标记侧输出的标签,注意泛型参数是侧输出数据的类型
final OutputTag<Integer> outputTag = new OutputTag<Integer>("side-output"){};
// 分流处理
SingleOutputStreamOperator<Integer> mainDataStream = ds
.process(new ProcessFunction<Integer, Integer>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Integer> out)
throws Exception {
if(value > 0){
out.collect(value); // 将数据发送到常规输出
}else{
ctx.output(outputTag, value); // 向侧输出发送负数
}
}
});
// 获取侧输出结果
DataStream<Integer> sideOutputStream = mainDataStream.getSideOutput(outputTag);
// 打印主输出流
// mainDataStream.print("主数据流");
// 打印侧输出流
sideOutputStream.print("侧输出流");
// 执行
env.execute("flink transformatiion");
}
}
执行以上代码,输出结果如下所示:
侧输出流:8> -3 侧输出流:3> -6 侧输出流:6> -9