发布日期:2022-11-13
VIP内容
查询数据
Hudi支持以下查询类型:
(1) 快照查询:查询查看到给定提交或压缩操作时表的最新快照。在对merge on read表进行合并的情况下,它会动态地合并最新文件片的基本文件和增量文件,从而公开近乎实时的数据(几分钟)。对于copy on write表,它提供了对现有parquet表的就地替换,同时提供了upsert/delete和其他写侧功能。
(2) 增量查询:查询只看到从给定的提交/压缩开始写入表的新数据。这有效地提供了变更流,以启用增量数据管道。
(3) 读优化查询:查询可以看到表自提交/压缩操作开始的最新快照。仅在最近的文件片中显示基文件/柱状文件并保证与非hudi柱状表相比具有相同的柱状查询性能。
不同查询类型之间的权衡见表12-5。
| 权衡 | 快照 | 读优化 |
|---|---|---|
| 数据延迟 | 低 | 高 |
| 查询延迟 | 高 (合并base/columnar文件 + 基于行的delta/log文件) | 低(原始的base/columnar文件性能) |
将数据文件加载到一个DataFrame中,代码如下:
// 从Hudi表中读取数据到DataFrame中
val basePath = "hdfs://xueai8:8020/hudi/hudi_trips_cow"
val tripsSnapshotDF = spark
.read
.format("hudi")
.load(basePath)
tripsSnapshotDF.printSchema()
tripsSnapshotDF.show(5)
在上面的代码中,load(basePath)使用“/partitionKey=partitionValue”文件夹结构用于Spark自动分区发现。
在下面的查询中提供对摄入数据的快照查询,代码如下:
// 将从Hudi表中查询的DataFrame注册到临时表中
tripsSnapshotDF.createOrReplaceTempView("hudi_trips_snapshot")
// 执行条件查询与字段投影
spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()
// 查询快照信息
spark.sql("""
select _hoodie_commit_time,
_hoodie_record_key,
_hoodie_partition_path,
rider,
driver,
fare
from hudi_trips_snapshot""")
.show()
自0.9.0以来,Hudi支持时间旅行查询。目前支持三种查询时间格式,代码如下:
// 查询快照信息
spark.read.
format("hudi").
option("as.of.instant", "20220317123157975").
load(basePath).
show()
spark.read.
format("hudi").
option("as.of.instant", "2022-03-17 12:31:57.975").
load(basePath).
show()
// 等价于 "as.of.instant = 2022-03-18 00:00:00"
spark.read.
format("hudi").
option("as.of.instant", "2022-03-18").
load(basePath).
show()
从0.9.0开始,hudi支持一个内置的hudi FileIndex: HoodieFileIndex来查询hudi表,它支持分区修剪和元表查询。这将有助于提高查询性能。它还支持非全局查询路径,这意味着用户可以通过基路径查询表,而无需在查询路径中指定“*”。默认情况下,该特性已为非全局查询路径启用。对于全局查询路径,hudi使用旧的查询路径。