ClickHouse集成Spark的几种方式
目前,Spark本身尚未对ClickHouse提供完善友好的支持。如果我们要使用Spark读写ClickHouse的话,可以使用ClickHouse官方提供的JDBC驱动实现,也可以使用第三方的JDBC驱动实现。
- 准备ClickHouse测试数据。
- 在Spark中使用ClickHouse官方提供的JDBC驱动。
- 在Spark中使用第三方housepower提供的JDBC驱动。
- 在Spark中使用第三方Clickhouse4j提供的JDBC驱动。
准备ClickHouse测试数据
创建一个名为test的数据库,并在该数据库中创建一个名为visit的表,用于跟踪网站访问时长。
1)先运行以下命令,启动一个客户端会话:
$ clickhouse-client --multiline
2)通过执行以下命令创建test数据库:
xueai8 :) CREATE DATABASE test;
3)确认要使用的数据库test:
xueai8 :) USE test;
4)运行下面这个命令创建visits表:
xueai8 :) CREATE TABLE visits ( id UInt64, duration Float64, url String, created DateTime ) ENGINE = MergeTree() PRIMARY KEY id ORDER BY id;
5)通过运行以下语句将几行示例网站访问数据插入到刚创建的visits表中:
xueai8 :) INSERT INTO visits VALUES (1, 10.5, 'http://example.com', '2019-01-01 00:01:01'); xueai8 :) INSERT INTO visits VALUES (2, 40.2, 'http://example1.com', '2019-01-03 10:01:01'); xueai8 :) INSERT INTO visits VALUES (3, 13, 'http://example2.com', '2019-01-03 12:01:01'); xueai8 :) INSERT INTO visits VALUES (4, 2, 'http://example3.com', '2019-01-04 02:01:01');
6)查询数据:
xueai8 :) SELECT * FROM visits;
一切正常的话,应该可以看到查询出来的表数据。
在Spark中使用ClickHouse官方提供的JDBC驱动
官方提供的JDBC Driver(yandex/clickhouse-jdbc)使用8123端口。 它基于HTTP实现,整体性能不太出色,大量数据写入时有可能出现超时的现象。
官方提供的yandex/clickhouse-jdbc,如果正能量来说,是比较丰富;如果负能量来说,就是比较杂乱。大约分为如下三个阶段:
- 0.3.1及之前的版本:驱动程序为ru.yandex.clickhouse.ClickHouseDriver。
- 0.3.2版本:驱动程序同时支持ru.yandex.clickhouse.ClickHouseDriver和com.clickhouse.jdbc.ClickHouseDriver两种。
- 0.4.x:驱动程序为com.clickhouse.jdbc.ClickHouseDriver。
所以,在使用时,请确认好你使用的jdbc驱动版本,并提供正确的驱动程序名称。
1. 在下面这个示例中,我们使用0.3.2版本。
Maven依赖
<dependency>
<groupId>ru.yandex.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.3.2</version>
</dependency>
在Spark中执行简单查询。代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
object ClickhouseSparkTest {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("clickhouse test")
.getOrCreate()
testCk1(spark) // 官方jdbc(0.3.2)
}
/** 官方连接方式:JDBC Driver */
// 使用官方的jdbc(0.3.2版本)
def testCk1(spark:SparkSession): Unit = {
// clickhouse驱动程序和连接信息
val ckDriver = "ru.yandex.clickhouse.ClickHouseDriver" // 驱动程序(已被弃用,从0.4.0开始会被移除)
val ckUrl = "jdbc:clickhouse://IP:8123" // 数据库连接url
val ckUser = "default"
val ckPassword = ""
val df = spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "test.visits")
.load
df.show
}
}
执行以上程序,输出内容如下:
+---+--------+-------------------+-------------------+ | id|duration| url| created| +---+--------+-------------------+-------------------+ | 1| 10.5| http://example.com|2019-01-01 00:01:01| | 2| 40.2|http://example1.com|2019-01-03 10:01:01| | 3| 13.0|http://example2.com|2019-01-03 12:01:01| | 4| 2.0|http://example3.com|2019-01-04 02:01:01| +---+--------+-------------------+-------------------+
2. 在下面这个示例中,我们使用0.4.2版本。
Maven依赖
<dependency>
<groupId>com.clickhouse</groupId>
<artifactId>clickhouse-jdbc</artifactId>
<version>0.4.2</version>
</dependency>
在Spark中执行简单查询。代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/*
ClickHouse和Spark集成测试
*/
object ClickhouseSparkTest {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("clickhouse test")
.getOrCreate()
testCk2(spark) // 官方jdbc(0.4.x)
}
// 使用官方的jdbc(0.4.x版本)
def testCk2(spark:SparkSession): Unit = {
// clickhouse驱动程序和连接信息
// 驱动程序(0.4.0版本后,必须使用这个驱动程序)
val ckDriver = "com.clickhouse.jdbc.ClickHouseDriver"
val ckUrl = "jdbc:ch://IP" // 数据库连接url
// val ckUrl = "jdbc:clickhouse:http://IP:8123" // 或
val ckUser = "default"
val ckPassword = ""
val df = spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "test.visits")
.load
df.show
}
}
执行以上程序,输出内容如下:
+---+--------+-------------------+-------------------+ | id|duration| url| created| +---+--------+-------------------+-------------------+ | 1| 10.5| http://example.com|2019-01-01 00:01:01| | 2| 40.2|http://example1.com|2019-01-03 10:01:01| | 3| 13.0|http://example2.com|2019-01-03 12:01:01| | 4| 2.0|http://example3.com|2019-01-04 02:01:01| +---+--------+-------------------+-------------------+
在Spark中使用第三方housepower提供的JDBC驱动
有一个比较活跃并且性能也比较好的第三方housepower,也提供了两种读写ClickHouse的方式:
- 1)使用ClickHouse-Native-JDBC。
- 2)使用Spark集成包。
1)使用ClickHouse-Native-JDBC
ClickHouse-Native-JDBC是一个Native JDBC库,用于在Java中访问ClickHouse。
ClickHouse-Native-JDBC与yandex/clickhouse-jdbc的区别如下:
- (1)数据按列组织和压缩。
- (2)基于TCP协议实现,支持高性能写入。
但是也存在一些限制:
- (1)不支持插入复杂值表达式,如insert INTO test_table values (toDate(123456)),但查询是可以的。
- (2)不支持插入非值格式,如TSV。
- (3)不支持更多的压缩方法,如ZSTD。
Maven依赖
<dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-native-jdbc-shaded</artifactId>
<version>2.6.5</version>
</dependency>
<dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-native-jdbc</artifactId>
<version>2.6.5</version>
</dependency>
在Spark中执行简单查询。代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/*
ClickHouse和Spark集成测试
*/
object ClickhouseSparkTest {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("clickhouse test")
.getOrCreate()
testCk3(spark) // 使用housepower的ClickHouse-Native-JDBC
}
// 使用housepower的ClickHouse-Native-JDBC
def testCk3(spark:SparkSession): Unit = {
// clickhouse驱动程序和连接信息
val ckDriver = "com.github.housepower.jdbc.ClickHouseDriver" // 驱动程序
val ckUrl = "jdbc:clickhouse://IP:9000" // 数据库连接url
val ckUser = "default"
val ckPassword = ""
val df = spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "test.visits")
.load
df.show
}
}
执行以上程序,输出内容如下:
+---+--------+-------------------+-------------------+ | id|duration| url| created| +---+--------+-------------------+-------------------+ | 1| 10.5| http://example.com|2019-01-01 00:01:01| | 2| 40.2|http://example1.com|2019-01-03 10:01:01| | 3| 13.0|http://example2.com|2019-01-03 12:01:01| | 4| 2.0|http://example3.com|2019-01-04 02:01:01| +---+--------+-------------------+-------------------+
2)使用Spark集成包
housepower还提供了一个库,用于与Apache Spark集成。目前,实现基于Spark的JDBC API,支持数据类型映射、自动创建表、截断表、写入、读取等。
Maven依赖:
<!-- 从Spark 2.4.0 可用 -->
<dependency>
<groupId>com.github.housepower</groupId>
<artifactId>clickhouse-integration-spark_2.12</artifactId>
<version>2.6.5</version>
</dependency>
确保在使用ClickHouseDialect之前注册它:
JdbcDialects.registerDialect(ClickHouseDialect)
在Spark中执行简单查询。代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.jdbc.{ClickHouseDialect,JdbcDialects}
/*
ClickHouse和Spark集成测试
*/
object ClickhouseSparkTest {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("clickhouse test")
.getOrCreate()
testCk4(spark) // 使用housepower的Spark集成包
}
// 使用housepower的Spark集成包
// 目前,实现基于Spark的JDBC API,支持数据类型映射、自动创建表、截断表、写入、读取等。
def testCk4(spark:SparkSession): Unit = {
// 注册ClickHouseDialect
JdbcDialects.registerDialect(ClickHouseDialect)
// clickhouse驱动程序和连接信息
val ckDriver = "com.github.housepower.jdbc.ClickHouseDriver" // 驱动程序
val ckUrl = "jdbc:clickhouse://IP:9000" // 数据库连接url
val ckUser = "default"
val ckPassword = ""
val df = spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "test.visits")
.load
df.show
}
}
执行以上程序,输出内容如下:
+---+--------+-------------------+-------------------+ | id|duration| url| created| +---+--------+-------------------+-------------------+ | 1| 10.5| http://example.com|2019-01-01 00:01:01| | 2| 40.2|http://example1.com|2019-01-03 10:01:01| | 3| 13.0|http://example2.com|2019-01-03 12:01:01| | 4| 2.0|http://example3.com|2019-01-04 02:01:01| +---+--------+-------------------+-------------------+
在Spark中使用第三方Clickhouse4j提供的JDBC驱动
Clickhouse4j提供的JDBC驱动基于HTTP,但是相比官方进行了大量优化,更加轻量级和快速。不过最近两年该项目似乎不太活跃了。
Maven依赖:
<dependency>
<groupId>cc.blynk.clickhouse</groupId>
<artifactId>clickhouse4j</artifactId>
<version>1.4.4</version>
</dependency>
在Spark中执行简单查询。代码如下:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
/*
ClickHouse和Spark集成测试
*/
object ClickhouseSparkTest {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例
val spark = SparkSession.builder()
.master("local[*]")
.appName("clickhouse test")
.getOrCreate()
testCk5(spark) // 使用Clickhouse4j
}
// 使用Clickhouse4j
// 基于HTTP,但是相比官方进行了大量优化,更加轻量级和快速。
def testCk5(spark:SparkSession): Unit = {
// clickhouse驱动程序和连接信息
val ckDriver = "cc.blynk.clickhouse.ClickHouseDriver" // 驱动程序
val ckUrl = "jdbc:clickhouse://IP:8123" // 数据库连接url
val ckUser = "default"
val ckPassword = ""
val df = spark.read
.format("jdbc")
.option("driver", ckDriver)
.option("url", ckUrl)
.option("user", ckUser)
.option("password", ckPassword)
.option("dbtable", "test.visits")
.load
df.show
}
}
执行以上程序,输出内容如下:
+---+--------+-------------------+-------------------+ | id|duration| url| created| +---+--------+-------------------+-------------------+ | 1| 10.5| http://example.com|2019-01-01 00:01:01| | 2| 40.2|http://example1.com|2019-01-03 10:01:01| | 3| 13.0|http://example2.com|2019-01-03 12:01:01| | 4| 2.0|http://example3.com|2019-01-04 02:01:01| +---+--------+-------------------+-------------------+