发布日期:2022-11-13
VIP内容
增量查询与时间点查询
增量查询
Hudi还提供了获取自给定提交时间戳以来更改的记录流的功能。这可以通过使用Hudi的增量查询来实现,并提供一个开始时间,更改需要从该时间流开始。如果想要在提交之后进行的所有更改(这是常见的情况),则不需要指定endTime。代码如下:
// spark-shell
// 重新加载数据
spark.
read.
format("hudi").
load(basePath).
createOrReplaceTempView("hudi_trips_snapshot")
val commits = spark.sql("""
select distinct(_hoodie_commit_time) as commitTime
from hudi_trips_snapshot
order by commitTime
""").map(k => k.getString(0)).take(50)
// commits.foreach(println)
val beginTime = commits(commits.length - 2) // 我们感兴趣的提交时间
// 增量查询数据
val tripsIncrementalDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
load(basePath)
// 注册到临时表中
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
// 查询
spark.sql("""
select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
from hudi_trips_incremental
where fare > 20.0
""").show()
这将使用fare > 20.0的过滤器给出在beginTime提交之后发生的所有更改。查询结果如图12-5所示。
这个特性的独特之处在于,它现在允许在批处理数据上编写流管道。
时间点查询
接下来学习如何查询特定时间的数据。具体的时间可以通过将endTime指向特定的提交时间,将beginTime指向“000”(表示尽可能早的提交时间)来表示,代码如下:
val beginTime = "000" // 表示所有的提交大于这个时间
val endTime = commits(commits.length - 2) // 我们感兴趣的提交时间
// 增量查询数据
val tripsPointInTimeDF = spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
option(END_INSTANTTIME_OPT_KEY, endTime).
load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")
spark.sql("""
select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
from hudi_trips_point_in_time
where fare > 20.0
""").show()
可以看到,查询出来的是追加操作前的数据,如图12-6所示。