发布日期:2022-11-13
VIP内容
删除数据
Hudi支持对存储在Hudi表中的数据实现两种类型的删除,允许用户指定不同的记录负载实现:
1) 软删除:保留记录键,并将所有其他字段的值清空。这可以通过确保适当的字段在表模式中为空,并在将这些字段设置为空后简单地upsert表来实现。
2) 硬删除:更强的删除形式是从表中物理地删除记录的任何痕迹。这可以通过3种不同的方式实现。
- (1) 使用数据源,设置OPERATION_OPT_KEY为DELETE_OPERATION_OPT_VAL。这将删除被提交的数据集中的所有记录。
- (2) 使用数据源,设置PAYLOAD_CLASS_OPT_KEY的值为org.apache.hudi.EmptyHoodieRecordPayload。这将删除被提交的数据集中的所有记录。
- (3) 使用数据源或DeltaStreamer,将名为_hoodie_is_deleted的列添加到数据集。对于所有要删除的记录,该列的值必须设置为true,对于任何要被upsert的记录,该列的值要么设置为false,要么设置为null。
对于其中硬删除的第三种方式,假设有原始模式内容如下:
{
"type":"record",
"name":"example_tbl",
"fields":[{
"name": "uuid",
"type": "String"
}, {
"name": "ts",
"type": "string"
}, {
"name": "partitionPath",
"type": "string"
}, {
"name": "rank",
"type": "long"
}
]}
确保添加了_hoodie_is_deleted列,内容如下:
{
"type":"record",
"name":"example_tbl",
"fields":[{
"name": "uuid",
"type": "String"
}, {
"name": "ts",
"type": "string"
}, {
"name": "partitionPath",
"type": "string"
}, {
"name": "rank",
"type": "long"
}, {
"name" : "_hoodie_is_deleted",
"type" : "boolean",
"default" : false
}
]}
然后对于任何想要删除的记录,都可以将_hoodie_is_deleted标记为true,内容如下:
{
"ts": 0.0,
"uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10",
"rank": 1045,
"partitionpath": "americas/brazil/sao_paulo",
"_hoodie_is_deleted" : true
}
要删除传入的HoodieKeys记录,代码如下:
// 获取总记录计数
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
// 获取两条要删除的记录
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
// val ds = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")
// 删除问题数据
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))
df.write.format("hudi").
options(getQuickstartWriteConfigs).
option(OPERATION_OPT_KEY,"delete").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)
// 重新加载表并验证记录已被删除
val roAfterDeleteViewDF = spark.
read.
format("hudi").
load(basePath)
roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")
// fetch应该返回(total - 2)条记录
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()
注意: 删除操作只支持Append模式。