发布日期:2022-11-13 VIP内容

增量查询与时间点查询

增量查询

Hudi还提供了获取自给定提交时间戳以来更改的记录流的功能。这可以通过使用Hudi的增量查询来实现,并提供一个开始时间,更改需要从该时间流开始。如果想要在提交之后进行的所有更改(这是常见的情况),则不需要指定endTime。代码如下:

// spark-shell
// 重新加载数据
spark.
  read.
  format("hudi").
  load(basePath).
  createOrReplaceTempView("hudi_trips_snapshot")

val commits = spark.sql("""
    select distinct(_hoodie_commit_time) as commitTime 
    from  hudi_trips_snapshot 
    order by commitTime
""").map(k => k.getString(0)).take(50)

// commits.foreach(println)

val beginTime = commits(commits.length - 2) 	// 我们感兴趣的提交时间

// 增量查询数据
val tripsIncrementalDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  load(basePath)

// 注册到临时表中
tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")

// 查询
spark.sql("""
    select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts 
    from hudi_trips_incremental 
    where fare > 20.0
""").show()

这将使用fare > 20.0的过滤器给出在beginTime提交之后发生的所有更改。查询结果如图12-5所示。

这个特性的独特之处在于,它现在允许在批处理数据上编写流管道。

时间点查询

接下来学习如何查询特定时间的数据。具体的时间可以通过将endTime指向特定的提交时间,将beginTime指向“000”(表示尽可能早的提交时间)来表示,代码如下:

val beginTime = "000" 			// 表示所有的提交大于这个时间
val endTime = commits(commits.length - 2) 	// 我们感兴趣的提交时间

// 增量查询数据
val tripsPointInTimeDF = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, beginTime).
  option(END_INSTANTTIME_OPT_KEY, endTime).
  load(basePath)
tripsPointInTimeDF.createOrReplaceTempView("hudi_trips_point_in_time")

spark.sql("""
    select `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts 
    from hudi_trips_point_in_time 
    where fare > 20.0
""").show()

可以看到,查询出来的是追加操作前的数据,如图12-6所示。