发布日期:2022-11-13 VIP内容

删除数据

Hudi支持对存储在Hudi表中的数据实现两种类型的删除,允许用户指定不同的记录负载实现:

1) 软删除:保留记录键,并将所有其他字段的值清空。这可以通过确保适当的字段在表模式中为空,并在将这些字段设置为空后简单地upsert表来实现。

2) 硬删除:更强的删除形式是从表中物理地删除记录的任何痕迹。这可以通过3种不同的方式实现。

  • (1) 使用数据源,设置OPERATION_OPT_KEY为DELETE_OPERATION_OPT_VAL。这将删除被提交的数据集中的所有记录。
  • (2) 使用数据源,设置PAYLOAD_CLASS_OPT_KEY的值为org.apache.hudi.EmptyHoodieRecordPayload。这将删除被提交的数据集中的所有记录。
  • (3) 使用数据源或DeltaStreamer,将名为_hoodie_is_deleted的列添加到数据集。对于所有要删除的记录,该列的值必须设置为true,对于任何要被upsert的记录,该列的值要么设置为false,要么设置为null。

对于其中硬删除的第三种方式,假设有原始模式内容如下:

{
  "type":"record",
  "name":"example_tbl",
  "fields":[{
     "name": "uuid",
     "type": "String"
  }, {
     "name": "ts",
     "type": "string"
  },  {
     "name": "partitionPath",
     "type": "string"
  }, {
     "name": "rank",
     "type": "long"
  }
]}

确保添加了_hoodie_is_deleted列,内容如下:

{
  "type":"record",
  "name":"example_tbl",
  "fields":[{
     "name": "uuid",
     "type": "String"
  }, {
     "name": "ts",
     "type": "string"
  },  {
     "name": "partitionPath",
     "type": "string"
  }, {
     "name": "rank",
     "type": "long"
  }, {
    "name" : "_hoodie_is_deleted",
    "type" : "boolean",
    "default" : false
  }
]}

然后对于任何想要删除的记录,都可以将_hoodie_is_deleted标记为true,内容如下:

{
"ts": 0.0, 
"uuid": "19tdb048-c93e-4532-adf9-f61ce6afe10", 
"rank": 1045, 
"partitionpath": "americas/brazil/sao_paulo", 
"_hoodie_is_deleted" : true
}

要删除传入的HoodieKeys记录,代码如下:

// 获取总记录计数
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

// 获取两条要删除的记录
val ds = spark.sql("select uuid, partitionpath from hudi_trips_snapshot").limit(2)
// val ds = spark.sql("select uuid, partitionPath from hudi_ro_table where rider = 'rider-213'")

// 删除问题数据
val deletes = dataGen.generateDeletes(ds.collectAsList())
val df = spark.read.json(spark.sparkContext.parallelize(deletes, 2))

df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION_OPT_KEY,"delete").
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

// 重新加载表并验证记录已被删除
val roAfterDeleteViewDF = spark.
  read.
  format("hudi").
  load(basePath)

roAfterDeleteViewDF.registerTempTable("hudi_trips_snapshot")

// fetch应该返回(total - 2)条记录
spark.sql("select uuid, partitionpath from hudi_trips_snapshot").count()

注意: 删除操作只支持Append模式。