发布日期:2022-11-13 VIP内容

插入数据与插入覆盖

插入数据

Hudi的hudi-spark模块提供了DataSource API来将一个Spark DataFrame写入(和读取)到一个Hudi表。读写时可以指定多个选项:

  • (1) HoodieWriteConfig:TABLE_NAME,必须项。
  • (2) DataSourceWriteOptions:数据源写入选项。

每次向Hudi数据集写入DataFrame时,必须指定DataSourceWriteOptions。这些选项在写操作之间可能是相同的。

下面生成一些新的行程数据,将它们加载到DataFrame中,然后再将DataFrame写入Hudi表中,代码如下:

// 将数据加载到DataFrame
val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.printSchema()
df.show(5)

// 然后将该DataFrame写入Hudi表中
df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(TABLE_NAME, tableName)
  .mode(SaveMode.Overwrite)
  .save(basePath)

注意上面指定写入模式为SaveMode.Overwrite,含义是如果表已经存在,则会覆盖并重新创建表。这里提供了一个记录键(模式中的uuid)、分区字段(region/country/city)和组合逻辑(模式中的ts),以确保每个分区中的行程记录是唯一的。这里使用默认的写操作:upsert。如果工作负载没有更新,还可以执行insert或bulk_insert操作,这样会更快。

插入覆盖

生成一些新的行程数据,覆盖输入中出现的所有分区。对于批处理ETL作业,此操作可以比upsert更快,后者是一次性重新计算整个目标分区(而不是增量地更新目标表)。这是因为,能够完全绕过upsert写路径中的索引、预组合和其他重新分区步骤。代码如下:

spark
  .read.format("hudi")
  .load(basePath)
  .select("uuid","partitionpath")
  .sort("partitionpath","uuid")
  .show(100, false)

// 生成要插入的新数据
val inserts = convertToStringList(dataGen.generateInserts(10))

// 将这些数据构造为DataFrame
val df = spark.
    read.json(spark.sparkContext.parallelize(inserts, 2)).
    filter("partitionpath = 'americas/united_states/san_francisco'") // 挑出特定的分区数据

// 将新的数据插入并覆盖表数据
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(OPERATION.key(),"insert_overwrite").
  option(PRECOMBINE_FIELD.key(), "ts").
  option(RECORDKEY_FIELD.key(), "uuid").
  option(PARTITIONPATH_FIELD.key(), "partitionpath").
  option(TBL_NAME.key(), tableName).
  mode(SaveMode.Append).
  save(basePath)

// 与之前的查询相比,现在San Francisco应该有不同的key
spark.
  read.format("hudi").
  load(basePath).
  select("uuid","partitionpath").
  sort("partitionpath","uuid").
  show(100, false)

执行以上代码,输出内容如下图所示。