更新数据

这类似于插入新数据。使用数据生成器生成对现有行程的更新,加载到DataFrame中,并将DataFrame写入hudi表。代码如下:

// 生成对现有行程进行更新的数据
val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))

// 以Append模式将新的数据写入到Hudi表中
df.write.format("hudi")
  .options(getQuickstartWriteConfigs)
  .option(PRECOMBINE_FIELD_OPT_KEY, "ts")
  .option(RECORDKEY_FIELD_OPT_KEY, "uuid")
  .option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath")
  .option(TABLE_NAME, tableName)
  .mode(SaveMode.Append)	// 注意这里,Append
  .save(basePath)

注意,保存模式现在是SaveMode.Append。一般来说,除非第一次创建表,否则总是使用append模式。再次查询数据将显示更新的行程。每个写操作都会生成一个由时间戳表示的新提交。代码如下:

// 再次从Hudi表中读取数据到DataFrame中
val tripsSnapshotDF2 = spark
  .read
  .format("hudi")
  .load(basePath)

tripsSnapshotDF2.show(5)

println(tripsSnapshotDF.count)  ......
          

......

抱歉,只有登录会员才可浏览!会员登录


《PySpark原理深入与编程实战》