将流计算结果保存到MySQL数据库中

Flink还支持使用自定义的Sink来满足多样化的输出需求。

想要实现自定义的Sink,需要直接或者间接实现SinkFunction接口。通常情况下,我们都是实现其抽象类RichSinkFunction,相比于 SinkFunction,其提供了更多的与生命周期相关的方法。

【示例】改写上一示例,将流计算结果保存到MySQL数据库中。

下面我们自定义一个 FlinkToMySQLSink,将流计算结果写出到指定的MySQL数据库中。

1、在MySQL的xueai8数据库中,创建一个数据表wc,用来存储Flink的计算结果。创建wc表的脚本代码如下:

mysql> create table wc(word varchar(30), cnt int);

2、在IntelliJ IDEA中创建一个Flink项目,使用flink-quickstart-scala/flink-quickstart-java项目模板。(Flink项目创建过程,请参见2.2节)

3、设置依赖。在pom.xml中添加依赖。

4、创建流应用程序类。代码如下:

Scala代码:

import java.sql.{Connection, DriverManager, PreparedStatement}

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction.Context
import org.apache.flink.streaming.api.scala._

/**
  * Created by www.xueai8.com
  * Data Sink:使用addSink(...)方法,指定MySQL数据库作为 Data Sink
  */
object DataSinkToMysql {

  // 自定义的 Sink类,继承自 RichSinkFunction
  class FlinkToMySQLSink extends RichSinkFunction[(String, Int)] {
    // 定义mysql数据库连接url和驱动程序及账号、密码
    val url = "jdbc:mysql://192.168.190.133:3306/xueai8?characterEncoding=UTF-8&useSSL=false"
    val driver = "com.mysql.jdbc.Driver"
    val username = "root"
    val userpwd = "admin"

    // 声明数据库连接对象和SQL预编译语句
    var conn: Connection = _
    var stmt: PreparedStatement = _

    // 重写open方法:加载驱动,建立连接,预编译SQL语句
    // 先于invoker方法用,仅执行一次
    override def open(parameters: Configuration): Unit = {
      super.open(parameters)
      // 加载驱动程序
      Class.forName(driver)
      // 连接数据库
      conn = DriverManager.getConnection(url, username, userpwd)
      // 执行SQL语句
      val sql = "insert into wc(word, cnt) values(?, ?)"
      stmt = conn.prepareStatement(sql)
    }

    // 重写事件驱动方法调用
    override def invoke(value: (String, Int), context: Context): Unit = {
      stmt.setString(1, value._1)
      stmt.setInt(2, value._2)
      stmt.executeUpdate
    }

    // 关闭资源
    // 后于invoke方法调用,仅执行一次
    override def close(): Unit = {
      super.close()
      if (stmt != null) stmt.close()
      if (conn != null) conn.close()
    }
  }

  // 测试
  def main(args: Array[String]): Unit = {
    // 设置流执行环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // 为流数据启用检查点
    env.enableCheckpointing(2000, CheckpointingMode.EXACTLY_ONCE)

    // 得到输入数据,进行转换:
    val input = env.fromElements("Good good study", "Day day up")
        .map(_.toLowerCase)                 // 转小写
        .flatMap(_.split("\\W+"))           // 相当于先map,再flatten
        .map((_,1))                         // 转换为元组

    // 使用自定义的Sink
    input.addSink(new FlinkToMySQLSink)     // 写出到数据库中

    // 触发流程序执行
    env.execute("Data Sink Demo")
  }
}

Java代码:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Data Sink:使用addSink(...)方法,指定MySQL数据库作为 Data Sink
 */
public class DataSinkToMysql {

	// 自定义的 Sink类,继承自 RichSinkFunction
	public static class FlinkToMySQLSink extends RichSinkFunction<Tuple2<String,Integer>> {
		// 定义mysql数据库连接url和驱动程序及账号、密码
		private static String JDBC_URL = "jdbc:mysql://localhost:3306/xueai8" +
				"?characterEncoding=UTF-8&useSSL=false";
		private static String USER_NAME = "root";
		private static String USER_PWD = "1234";

		// 声明数据库连接对象和SQL预编译语句
		private PreparedStatement stmt;
		private Connection conn;

		// 重写open方法:加载驱动,建立连接,预编译SQL语句
		@Override
		public void open(Configuration parameters) throws Exception {
			Class.forName("com.mysql.jdbc.Driver");
			conn = DriverManager.getConnection(JDBC_URL, USER_NAME, USER_PWD);
			String sql = "insert into wc(word, cnt) values(?, ?)";
			stmt = conn.prepareStatement(sql);
		}

		// 重写事件驱动方法调用
		@Override
		public void invoke(Tuple2<String,Integer> t, Context context) throws Exception {
			stmt.setString(1, t.f0);
			stmt.setInt(2, t.f1);
			stmt.executeUpdate();
		}

		// 关闭资源
		@Override
		public void close() throws Exception {
			super.close();
			if (stmt != null) {
				stmt.close();
			}
			if (conn != null) {
				conn.close();
			}
		}
	}

	public static void main(String[] args) throws Exception {
		// 设置流执行环境
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// 为流数据启用检查点
		env.enableCheckpointing(2000);

		// 获得数据,执行map和flatMap转换
		DataStream<Tuple2<String,Integer>> stream = env
				.fromElements("Good good study","Day day up")
				.map(String::toLowerCase)
				.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
					@Override
					public void flatMap(String s, Collector<Tuple2<String,Integer>> out) throws Exception {
						for(String word : s.split("\\W+")){
							out.collect(new Tuple2<>(word,1));
						}
					}
				});

		// 指定使用自定义的Sink
		stream.addSink(new FlinkToMySQLSink());

		// 执行流程序
		env.execute("Data Sink Demo");
	}
}

执行以上程序,然后在MySQL中查看表内容:

mysql> select * from wc;

可以看到,流程序计算的结果已经保存到数据库中了。

在上面的代码中,我们自定义Sink是继承自RichSinkFunction类,它是SinkFunction的“Rich”变体。与SinkFunction相比,它有一些额外的方法,包括:

  • open(Configuration c)
  • close()
  • getRuntimeContext()

在操作符初始化期间调用Open()一次。例如,这是加载一些静态数据或打开到外部服务的连接的机会。而getRuntimeContext()提供了对底层环境的访问能力,例如创建和访问由Flink管理的状态。


《PySpark原理深入与编程实战》