将流计算结果写出到CSV文件
在使用 Flink 进行数据处理时,数据经数据源流入,然后通过系列转换,最终可以通过Data Sink将计算结果进行输出,Flink Data Sinks(数据接收器)就是用于定义数据流最终的输出位置,它消费数据流并将它们转发到文件、套接字、外部系统或打印输出。
内置数据接收器
Flink提供了多种内置的Data Sink API 用于日常的开发,具体如下:
- writeAsText("/path/to/file"):用于将计算结果以字符串的方式并行地写入到指定文件夹下。这些字符串是通过调用每个元素的toString()方法获得的,使用的输出类是TextOutputFormat。这个方法除了路径参数是必选外,还可以通过指定第二个参数来定义输出模式。输出模式有以下两个可选值:
- WriteMode.NO_OVERWRITE:当指定路径上不存在任何文件时,才执行写出操作;
- WriteMode.OVERWRITE:不论指定路径上是否存在文件,都执行写出操作;如果原来已有文件,则进行覆盖。
- writeAsCsv("/path/to/file"):用于将计算结果以CSV的文件格式写出到指定目录。行和字段分隔符是可配置的。每个字段的值来自对象的toString()方法。使用的输出类是CsvOutputFormat。
- print()/printToErr():在标准输出/标准错误流上打印输出每个元素的toString()值。可选地,可以提供输出的前缀,这有助于区分不同的print调用。如果并行度大于1,输出也将以产生输出的任务的id作为前缀。print/printToErr是测试当中最常用的方式,用于将计算结果以标准输出流或错误输出流的方式打印到控制台上。
- writeUsingOutputFormat():自定义文件输出的基类和方法。支持自定义对象到字节的转换。在定义自定义格式时,需要继承自FileOutputFormat,它负责序列化和反序列化。上面介绍的 writeAsText 和 writeAsCsv 其底层调用的都是该方法。
- writeToSocket(host, port, SerializationSchema):用于将计算结果以指定的格式写出到指定的socket套接字。为了正确的序列化和格式化,需要定义SerializationSchema。
- addSink:调用自定义接收器函数。Flink与作为接收器函数实现的其他系统(如Apache Kafka)的连接器捆绑在一起。
注:以上方法中,以writeAs*开头的方法,在Flink API文档中,已经标识为"Deprecated",即弃用状态,在未来的版本中有可能被删除,因此使用时要慎重。原因请参见下一节。
【示例】分析流数据,并将分析结果写出到csv文件中。
下面我们通过示例来掌所致常用Flink Data Sink的用法。请按以下步骤执行。
1、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
2、设置依赖。在pom.xml中添加如下依赖:
Scala Maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>1.13.2</version>
<provided</scope>
</dependency>
Java Maven依赖:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.12</artifactId>
<version>1.13.2</version>
<scope>provided</scope>
</dependency>
3、创建流应用程序类。代码如下。
Scala代码:
import org.apache.flink.streaming.api.scala._
/**
* Data Sink:writeAsCSV方法,将结果写入到csv文件
*/
object DataSinkAsCSV {
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 得到输入数据,进行转换:
env.fromElements("Good good study", "Day day up")
.map(_.toLowerCase)
.flatMap(_.split("\\W+")) // 相当于先map,再flatten
.map((_,1)) // 转换为元组
.writeAsCsv("result.csv") // 写出到结果文件中
.setParallelism(1) // 结果写到单个文件中
env.execute("Data Sink Demo")
}
}
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
/**
* Data Sink:writeAsCSV方法,将结果写入到csv文件
*/
public class DataSinkDemo1 {
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 获得数据,执行map和flatMap转换
env.fromElements("Good good study","Day day up")
.map(String::toLowerCase)
.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String,Integer>> out) throws Exception {
for(String word : s.split("\\W+")){
out.collect(new Tuple2<>(word,1));
}
}
})
.writeAsCsv("result.csv") // 写出到指定的结果文件中
.setParallelism(1); // 写出到一个结果文件中
// 执行流程序
env.execute("Data Sink Demo");
}
}
4、执行以上程序,可以看到,在项目的根目录下生成了一个结果文件result。查看输出的结果文件result,内容如下所示:
good,1 good,1 study,1 day,1 day,1 up,1