发布日期:2021-11-06
数据转换-filter
数据转换使用操作符(operator)将一个或多个数据流转换为新的数据流。转换输入可以是一个或多个数据流,转换输出也可以是零个、一个或多个数据流。程序可以将多个转换组合成复杂的数据流拓扑。
filter转换
filter函数对条件进行评估,如果结果为true,则该条数据输出。filter函数可以输出零个记录。下面是进行filter转换的示例代码。
Scala代码:
import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
object TransformerFilter{
def main(args: Array[String]) {
// 设置批处理执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 得到输入数据,然后执行filter转换
env.fromElements("Good good study", "Day day up")
.map(_.toLowerCase)
.filter(_.contains("study"))
.print()
// 执行
env.execute("flink filter transformatiion")
}
}
Java代码:
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* filter转换
*/
public class TransformerFilter {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 首先从环境中获取一些数据,再执行map和filter转换:
env.fromElements("Good good study","Day day up")
.map(String::toLowerCase)
.filter((FilterFunction) s -> s.contains("study"))
.print();
// 对于流程序,只有执行了下面这个方法,流程序才真正开始执行
env.execute("flink map transformatiion");
}
}
输出结果如下所示:
good good study
课程章节 返回课程首页
-
Ch01 Flink架构与集群搭建
-
Ch02 Flink开发准备
-
Ch03 开发Flink实时处理程序
- Flink流处理程序编程模型
- Flink流应用程序剖析
- 读取Socket数据源
- 读取文件数据源
- 使用集合数据源
- 自定义数据源
- 数据转换-map
- 数据转换-flatMap
- 数据转换-filter
- 数据转换-keyBy
- 数据转换-reduce
- 数据转换-聚合转换
- 数据转换-union
- 数据转换-connect
- 数据转换-project
- Flink数据分区
- 自定义数据分区示例
- 将流计算结果写出到CSV文件
- 将流计算结果保存到MySQL数据库中
- 时间和水印
- 水印策略
- Flink窗口操作概念及示例
- Flink低级操作API_ProcessFunction
- 案例:服务器故障检测报警程序 VIP
- 侧输出流
- 使用Kafka连接器
- 使用JDBC连接器
-
Ch04 Flink流处理案例
-
ch05 状态和容错