将流计算结果保存到MySQL数据库中
Flink还支持使用自定义的Sink来满足多样化的输出需求。
想要实现自定义的Sink,需要直接或者间接实现SinkFunction接口。通常情况下,我们都是实现其抽象类RichSinkFunction,相比于 SinkFunction,其提供了更多的与生命周期相关的方法。
【示例】改写上一示例,将流计算结果保存到MySQL数据库中。
下面我们自定义一个 FlinkToMySQLSink,将流计算结果写出到指定的MySQL数据库中。
1、在MySQL的xueai8数据库中,创建一个数据表wc,用来存储Flink的计算结果。创建wc表的脚本代码如下:
mysql> create table wc(word varchar(30), cnt int);
2、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)
3、设置依赖。在pom.xml中添加依赖。
4、创建流应用程序类。代码如下:
Scala代码:
import java.sql.{Connection, DriverManager, PreparedStatement}
import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
import org.apache.flink.streaming.api.scala._
/**
* Created by www.xueai8.com
* Data Sink:使用addSink(...)方法,指定MySQL数据库作为 Data Sink
*/
object DataSinkToMysql {
// 自定义的 Sink类,继承自 RichSinkFunction
class FlinkToMySQLSink extends RichSinkFunction[(String, Int)] {
// 定义mysql数据库连接url和驱动程序及账号、密码
val url = "jdbc:mysql://192.168.190.133:3306/xueai8?characterEncoding=UTF-8&useSSL=false"
val driver = "com.mysql.jdbc.Driver"
val username = "root"
val userpwd = "admin"
// 声明数据库连接对象和SQL预编译语句
var conn: Connection = _
var stmt: PreparedStatement = _
// 重写open方法:加载驱动,建立连接,预编译SQL语句
// 先于invoker方法用,仅执行一次
override def open(parameters: Configuration): Unit = {
super.open(parameters)
// 加载驱动程序
Class.forName(driver)
// 连接数据库
conn = DriverManager.getConnection(url, username, userpwd)
// 执行SQL语句
val sql = "insert into wc(word, cnt) values(?, ?)"
stmt = conn.prepareStatement(sql)
}
// 重写事件驱动方法调用
override def invoke(value: (String, Int), context: Context): Unit = {
stmt.setString(1, value._1)
stmt.setInt(2, value._2)
stmt.executeUpdate
}
// 关闭资源
// 后于invoke方法调用,仅执行一次
override def close(): Unit = {
super.close()
if (stmt != null) stmt.close()
if (conn != null) conn.close()
}
}
// 测试
def main(args: Array[String]): Unit = {
// 设置流执行环境
val env = StreamExecutionEnvironment.getExecutionEnvironment
// 为流数据启用检查点
env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)
// 得到输入数据,进行转换:
val input = env.fromElements("Good good study", "Day day up")
.map(_.toLowerCase) // 转小写
.flatMap(_.split("\\W+")) // 相当于先map,再flatten
.map((_,1)) // 转换为元组
// 使用自定义的Sink
input.addSink(new FlinkToMySQLSink) // 写出到数据库中
// 触发流程序执行
env.execute("Data Sink Demo")
}
}
Java代码:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
/**
* Data Sink:使用addSink(...)方法,指定MySQL数据库作为 Data Sink
*/
public class DataSinkToMysql {
// 自定义的 Sink类,继承自 RichSinkFunction
public static class FlinkToMySQLSink extends RichSinkFunction<Tuple2<String,Integer>> {
// 定义mysql数据库连接url和驱动程序及账号、密码
private static String JDBC_URL = "jdbc:mysql://localhost:3306/xueai8" +
"?characterEncoding=UTF-8&useSSL=false";
private static String USER_NAME = "root";
private static String USER_PWD = "1234";
// 声明数据库连接对象和SQL预编译语句
private PreparedStatement stmt;
private Connection conn;
// 重写open方法:加载驱动,建立连接,预编译SQL语句
@Override
public void open(Configuration parameters) throws Exception {
Class.forName("com.mysql.jdbc.Driver");
conn = DriverManager.getConnection(JDBC_URL, USER_NAME, USER_PWD);
String sql = "insert into wc(word, cnt) values(?, ?)";
stmt = conn.prepareStatement(sql);
}
// 重写事件驱动方法调用
@Override
public void invoke(Tuple2<String,Integer> t, Context context) throws Exception {
stmt.setString(1, t.f0);
stmt.setInt(2, t.f1);
stmt.executeUpdate();
}
// 关闭资源
@Override
public void close() throws Exception {
super.close();
if (stmt != null) {
stmt.close();
}
if (conn != null) {
conn.close();
}
}
}
public static void main(String[] args) throws Exception {
// 设置流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 为流数据启用检查点
env.enableCheckpointing(2000);
// 获得数据,执行map和flatMap转换
DataStream<Tuple2<String,Integer>> stream = env
.fromElements("Good good study","Day day up")
.map(String::toLowerCase)
.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
@Override
public void flatMap(String s, Collector<Tuple2<String,Integer>> out) throws Exception {
for(String word : s.split("\\W+")){
out.collect(new Tuple2<>(word,1));
}
}
});
// 指定使用自定义的Sink
stream.addSink(new FlinkToMySQLSink());
// 执行流程序
env.execute("Data Sink Demo");
}
}
执行以上程序,然后在MySQL中查看表内容:
mysql> select * from wc;
可以看到,流程序计算的结果已经保存到数据库中了。
在上面的代码中,我们自定义Sink是继承自RichSinkFunction类,它是SinkFunction的“Rich”变体。与SinkFunction相比,它有一些额外的方法,包括:
- open(Configuration c)
- close()
- getRuntimeContext()
在操作符初始化期间调用Open()一次。例如,这是加载一些静态数据或打开到外部服务的连接的机会。而getRuntimeContext()提供了对底层环境的访问能力,例如创建和访问由Flink管理的状态。