发布日期:2022-11-13 VIP内容

查询数据

Hudi支持以下查询类型:

(1) 快照查询:查询查看到给定提交或压缩操作时表的最新快照。在对merge on read表进行合并的情况下,它会动态地合并最新文件片的基本文件和增量文件,从而公开近乎实时的数据(几分钟)。对于copy on write表,它提供了对现有parquet表的就地替换,同时提供了upsert/delete和其他写侧功能。

(2) 增量查询:查询只看到从给定的提交/压缩开始写入表的新数据。这有效地提供了变更流,以启用增量数据管道。

(3) 读优化查询:查询可以看到表自提交/压缩操作开始的最新快照。仅在最近的文件片中显示基文件/柱状文件并保证与非hudi柱状表相比具有相同的柱状查询性能。

不同查询类型之间的权衡见表12-5。

权衡 快照 读优化
数据延迟
查询延迟 高 (合并base/columnar文件 + 基于行的delta/log文件) 低(原始的base/columnar文件性能)

将数据文件加载到一个DataFrame中,代码如下:

// 从Hudi表中读取数据到DataFrame中
val basePath = "hdfs://xueai8:8020/hudi/hudi_trips_cow"

val tripsSnapshotDF = spark
  .read
  .format("hudi")
  .load(basePath)

tripsSnapshotDF.printSchema()
tripsSnapshotDF.show(5)

在上面的代码中,load(basePath)使用“/partitionKey=partitionValue”文件夹结构用于Spark自动分区发现。

在下面的查询中提供对摄入数据的快照查询,代码如下:

// 将从Hudi表中查询的DataFrame注册到临时表中
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")

// 执行条件查询与字段投影
spark.sql("select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0").show()

// 查询快照信息
spark.sql("""
    select _hoodie_commit_time, 
           _hoodie_record_key, 
           _hoodie_partition_path, 
           rider, 
           driver, 
           fare 
    from  hudi_trips_snapshot""")
   .show()

自0.9.0以来,Hudi支持时间旅行查询。目前支持三种查询时间格式,代码如下:

// 查询快照信息
spark.read.
  format("hudi").
  option("as.of.instant", "20220317123157975").
  load(basePath).
  show()

spark.read.
  format("hudi").
  option("as.of.instant", "2022-03-17 12:31:57.975").     
  load(basePath).
  show()

// 等价于 "as.of.instant = 2022-03-18 00:00:00"
spark.read.
  format("hudi").
  option("as.of.instant", "2022-03-18").
  load(basePath).
  show()

从0.9.0开始,hudi支持一个内置的hudi FileIndex: HoodieFileIndex来查询hudi表,它支持分区修剪和元表查询。这将有助于提高查询性能。它还支持非全局查询路径,这意味着用户可以通过基路径查询表,而无需在查询路径中指定“*”。默认情况下,该特性已为非全局查询路径启用。对于全局查询路径,hudi使用旧的查询路径。