发布日期:2023-04-11 VIP内容

ClickHouse集成Spark的几种方式

目前,Spark本身尚未对ClickHouse提供完善友好的支持。如果我们要使用Spark读写ClickHouse的话,可以使用ClickHouse官方提供的JDBC驱动实现,也可以使用第三方的JDBC驱动实现。

准备ClickHouse测试数据

创建一个名为test的数据库,并在该数据库中创建一个名为visit的表,用于跟踪网站访问时长。

1)先运行以下命令,启动一个客户端会话:

$ clickhouse-client --multiline

2)通过执行以下命令创建test数据库:

xueai8 :) CREATE DATABASE test;

3)确认要使用的数据库test:

xueai8 :) USE test;

4)运行下面这个命令创建visits表:

xueai8 :) CREATE TABLE visits (
   id UInt64,
   duration Float64,
   url String,
   created DateTime
) ENGINE = MergeTree() 
PRIMARY KEY id 
ORDER BY id;

5)通过运行以下语句将几行示例网站访问数据插入到刚创建的visits表中:

xueai8 :) INSERT INTO visits VALUES (1, 10.5, 'http://example.com', '2019-01-01 00:01:01');
xueai8 :) INSERT INTO visits VALUES (2, 40.2, 'http://example1.com', '2019-01-03 10:01:01');
xueai8 :) INSERT INTO visits VALUES (3, 13, 'http://example2.com', '2019-01-03 12:01:01');
xueai8 :) INSERT INTO visits VALUES (4, 2, 'http://example3.com', '2019-01-04 02:01:01');

6)查询数据:

xueai8 :) SELECT * FROM visits;

一切正常的话,应该可以看到查询出来的表数据。

在Spark中使用ClickHouse官方提供的JDBC驱动

官方提供的JDBC Driver(yandex/clickhouse-jdbc)使用8123端口。 它基于HTTP实现,整体性能不太出色,大量数据写入时有可能出现超时的现象。

官方提供的yandex/clickhouse-jdbc,如果正能量来说,是比较丰富;如果负能量来说,就是比较杂乱。大约分为如下三个阶段:

  • 0.3.1及之前的版本:驱动程序为ru.yandex.clickhouse.ClickHouseDriver。
  • 0.3.2版本:驱动程序同时支持ru.yandex.clickhouse.ClickHouseDriver和com.clickhouse.jdbc.ClickHouseDriver两种。
  • 0.4.x:驱动程序为com.clickhouse.jdbc.ClickHouseDriver。

所以,在使用时,请确认好你使用的jdbc驱动版本,并提供正确的驱动程序名称。

1. 在下面这个示例中,我们使用0.3.2版本。

Maven依赖

<dependency>
      <groupId>ru.yandex.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.3.2</version>
</dependency>

在Spark中执行简单查询。代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object ClickhouseSparkTest {

  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("clickhouse test")
      .getOrCreate()

      testCk1(spark)   // 官方jdbc(0.3.2)
  }

  /** 官方连接方式:JDBC Driver */
  // 使用官方的jdbc(0.3.2版本)
  def testCk1(spark:SparkSession): Unit = {
    // clickhouse驱动程序和连接信息
    val ckDriver = "ru.yandex.clickhouse.ClickHouseDriver" // 驱动程序(已被弃用,从0.4.0开始会被移除)
    val ckUrl = "jdbc:clickhouse://IP:8123"  // 数据库连接url
    val ckUser = "default"
    val ckPassword = ""

    val df = spark.read
      .format("jdbc")
      .option("driver", ckDriver)
      .option("url", ckUrl)
      .option("user", ckUser)
      .option("password", ckPassword)
      .option("dbtable", "test.visits")
      .load

    df.show
  }
}

执行以上程序,输出内容如下:

+---+--------+-------------------+-------------------+
| id|duration|                url|            created|
+---+--------+-------------------+-------------------+
|  1|    10.5| http://example.com|2019-01-01 00:01:01|
|  2|    40.2|http://example1.com|2019-01-03 10:01:01|
|  3|    13.0|http://example2.com|2019-01-03 12:01:01|
|  4|     2.0|http://example3.com|2019-01-04 02:01:01|
+---+--------+-------------------+-------------------+

2. 在下面这个示例中,我们使用0.4.2版本。

Maven依赖

<dependency>
      <groupId>com.clickhouse</groupId>
      <artifactId>clickhouse-jdbc</artifactId>
      <version>0.4.2</version>
</dependency>

在Spark中执行简单查询。代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/*
  ClickHouse和Spark集成测试

 */
object ClickhouseSparkTest {

  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("clickhouse test")
      .getOrCreate()

      testCk2(spark)  // 官方jdbc(0.4.x)
  }

  // 使用官方的jdbc(0.4.x版本)
  def testCk2(spark:SparkSession): Unit = {
    // clickhouse驱动程序和连接信息
    // 驱动程序(0.4.0版本后,必须使用这个驱动程序)
    val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
    val ckUrl = "jdbc:ch://IP"  // 数据库连接url
    // val ckUrl = "jdbc:clickhouse:http://IP:8123"  // 或
    val ckUser = "default"
    val ckPassword = ""

    val df = spark.read
      .format("jdbc")
      .option("driver", ckDriver)
      .option("url", ckUrl)
      .option("user", ckUser)
      .option("password", ckPassword)
      .option("dbtable", "test.visits")
      .load

    df.show
  }
}

执行以上程序,输出内容如下:

+---+--------+-------------------+-------------------+
| id|duration|                url|            created|
+---+--------+-------------------+-------------------+
|  1|    10.5| http://example.com|2019-01-01 00:01:01|
|  2|    40.2|http://example1.com|2019-01-03 10:01:01|
|  3|    13.0|http://example2.com|2019-01-03 12:01:01|
|  4|     2.0|http://example3.com|2019-01-04 02:01:01|
+---+--------+-------------------+-------------------+

在Spark中使用第三方housepower提供的JDBC驱动

有一个比较活跃并且性能也比较好的第三方housepower,也提供了两种读写ClickHouse的方式:

  • 1)使用ClickHouse-Native-JDBC。
  • 2)使用Spark集成包。

1)使用ClickHouse-Native-JDBC

ClickHouse-Native-JDBC是一个Native JDBC库,用于在Java中访问ClickHouse。

ClickHouse-Native-JDBC与yandex/clickhouse-jdbc的区别如下:

  • (1)数据按列组织和压缩。
  • (2)基于TCP协议实现,支持高性能写入。

但是也存在一些限制:

  • (1)不支持插入复杂值表达式,如insert INTO test_table values (toDate(123456)),但查询是可以的。
  • (2)不支持插入非值格式,如TSV。
  • (3)不支持更多的压缩方法,如ZSTD。

Maven依赖


<dependency>
    <groupId>com.github.housepower</groupId>
    <artifactId>clickhouse-native-jdbc-shaded</artifactId>
    <version>2.6.5</version>
</dependency>


<dependency>
    <groupId>com.github.housepower</groupId>
    <artifactId>clickhouse-native-jdbc</artifactId>
    <version>2.6.5</version>
</dependency>

在Spark中执行简单查询。代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/*
  ClickHouse和Spark集成测试

 */
object ClickhouseSparkTest {

  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("clickhouse test")
      .getOrCreate()

      testCk3(spark)  // 使用housepower的ClickHouse-Native-JDBC
  }

  // 使用housepower的ClickHouse-Native-JDBC
  def testCk3(spark:SparkSession): Unit = {
    // clickhouse驱动程序和连接信息
    val ckDriver = "com.github.housepower.jdbc.ClickHouseDriver" // 驱动程序
    val ckUrl = "jdbc:clickhouse://IP:9000"  // 数据库连接url
    val ckUser = "default"
    val ckPassword = ""

    val df = spark.read
      .format("jdbc")
      .option("driver", ckDriver)
      .option("url", ckUrl)
      .option("user", ckUser)
      .option("password", ckPassword)
      .option("dbtable", "test.visits")
      .load

    df.show
  }
}

执行以上程序,输出内容如下:

+---+--------+-------------------+-------------------+
| id|duration|                url|            created|
+---+--------+-------------------+-------------------+
|  1|    10.5| http://example.com|2019-01-01 00:01:01|
|  2|    40.2|http://example1.com|2019-01-03 10:01:01|
|  3|    13.0|http://example2.com|2019-01-03 12:01:01|
|  4|     2.0|http://example3.com|2019-01-04 02:01:01|
+---+--------+-------------------+-------------------+

2)使用Spark集成包

housepower还提供了一个库,用于与Apache Spark集成。目前,实现基于Spark的JDBC API,支持数据类型映射、自动创建表、截断表、写入、读取等。

Maven依赖:

<!-- 从Spark 2.4.0 可用 -->
<dependency>
    <groupId>com.github.housepower</groupId>
    <artifactId>clickhouse-integration-spark_2.12</artifactId>
    <version>2.6.5</version>
</dependency>

确保在使用ClickHouseDialect之前注册它:

JdbcDialects.registerDialect(ClickHouseDialect)

在Spark中执行简单查询。代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.jdbc.{ClickHouseDialect,JdbcDialects}

/*
  ClickHouse和Spark集成测试
 */
object ClickhouseSparkTest {

  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("clickhouse test")
      .getOrCreate()

      testCk4(spark)  // 使用housepower的Spark集成包
  }

  // 使用housepower的Spark集成包
  // 目前,实现基于Spark的JDBC API,支持数据类型映射、自动创建表、截断表、写入、读取等。
  def testCk4(spark:SparkSession): Unit = {
    // 注册ClickHouseDialect
    JdbcDialects.registerDialect(ClickHouseDialect)

    // clickhouse驱动程序和连接信息
    val ckDriver = "com.github.housepower.jdbc.ClickHouseDriver" // 驱动程序
    val ckUrl = "jdbc:clickhouse://IP:9000"  // 数据库连接url
    val ckUser = "default"
    val ckPassword = ""

    val df = spark.read
      .format("jdbc")
      .option("driver", ckDriver)
      .option("url", ckUrl)
      .option("user", ckUser)
      .option("password", ckPassword)
      .option("dbtable", "test.visits")
      .load

    df.show
  }
}

执行以上程序,输出内容如下:

+---+--------+-------------------+-------------------+
| id|duration|                url|            created|
+---+--------+-------------------+-------------------+
|  1|    10.5| http://example.com|2019-01-01 00:01:01|
|  2|    40.2|http://example1.com|2019-01-03 10:01:01|
|  3|    13.0|http://example2.com|2019-01-03 12:01:01|
|  4|     2.0|http://example3.com|2019-01-04 02:01:01|
+---+--------+-------------------+-------------------+

在Spark中使用第三方Clickhouse4j提供的JDBC驱动

Clickhouse4j提供的JDBC驱动基于HTTP,但是相比官方进行了大量优化,更加轻量级和快速。不过最近两年该项目似乎不太活跃了。

Maven依赖:

<dependency>
     <groupId>cc.blynk.clickhouse</groupId>
     <artifactId>clickhouse4j</artifactId>
     <version>1.4.4</version>
</dependency>

在Spark中执行简单查询。代码如下:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/*
  ClickHouse和Spark集成测试
 */
object ClickhouseSparkTest {

  def main(args: Array[String]): Unit = {
    // 创建SparkSession实例
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("clickhouse test")
      .getOrCreate()

      testCk5(spark)  // 使用Clickhouse4j
  }

  // 使用Clickhouse4j
  // 基于HTTP,但是相比官方进行了大量优化,更加轻量级和快速。
  def testCk5(spark:SparkSession): Unit = {
    // clickhouse驱动程序和连接信息
    val ckDriver = "cc.blynk.clickhouse.ClickHouseDriver" // 驱动程序
    val ckUrl = "jdbc:clickhouse://IP:8123"  // 数据库连接url
    val ckUser = "default"
    val ckPassword = ""

    val df = spark.read
      .format("jdbc")
      .option("driver", ckDriver)
      .option("url", ckUrl)
      .option("user", ckUser)
      .option("password", ckPassword)
      .option("dbtable", "test.visits")
      .load

    df.show
  }
}

执行以上程序,输出内容如下:

+---+--------+-------------------+-------------------+
| id|duration|                url|            created|
+---+--------+-------------------+-------------------+
|  1|    10.5| http://example.com|2019-01-01 00:01:01|
|  2|    40.2|http://example1.com|2019-01-03 10:01:01|
|  3|    13.0|http://example2.com|2019-01-03 12:01:01|
|  4|     2.0|http://example3.com|2019-01-04 02:01:01|
+---+--------+-------------------+-------------------+