发布日期:2022-10-16 VIP内容

表批处理读写

Delta Lake支持Apache Spark DataFrame读写API提供的大部分选项,用于对表执行批量读写。

对于许多Delta Lake操作,可以在创建新的SparkSession时通过设置相应配置来启用与Apache Spark DataSourceV2和Catalog API(自3.0以来)的集成。设置配置的代码如下:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("...")
  .master("...")
  .config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .getOrCreate()

或者,可以在使用spark-submit提交Spark应用程序时添加配置,或者在启动spark-shell或pyspark时指定它们作为命令行参数,命令如下:

$ spark-submit 
--conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" 
--conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"  
...

1.创建表

Delta Lake支持创建两种类型的表:在metastore中定义的表和由路径定义的表。

要使用metastore定义的表,必须在创建新的SparkSession时通过设置配置来启用与Apache Spark DataSourceV2和Catalog API的集成。

要创建Delta表,可以先创建一个表定义并定义模式,或者像数据湖一样,简单地编写一个Spark DataFrame以Delta格式存储。

1) 使用SQL DDL命令

可以使用Apache Spark支持的标准SQL DDL命令(例如CREATE TABLE和REPLACE TABLE)来创建Delta表。代码如下:

CREATE TABLE IF NOT EXISTS default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA

CREATE OR REPLACE TABLE default.people10m (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA

SQL还支持在一个路径上创建一个表,而不需要在Hive metastore中创建一个条目。代码如下:

CREATE OR REPLACE TABLE delta.`/delta/people10m` (
  id INT,
  firstName STRING,
  middleName STRING,
  lastName STRING,
  gender STRING,
  birthDate TIMESTAMP,
  ssn STRING,
  salary INT
) USING DELTA

2)使用DataFrameWriter API

如果想创建一个表并同时从Spark DataFrame或Dataset中插入数据,可以使用DataFrameWriter API,代码如下:

// 使用DataFrame的模式在metastore中创建表并写入数据
df.write.format("delta").saveAsTable("default.people10m")

// 使用DataFrame的模式创建带有路径的表,并向其写入数据
df.write.format("delta").mode("overwrite").save("/delta/people10m")

3)使用DeltaTableBuilder API

也可以使用Delta Lake的DeltaTableBuilder API来创建表。与DataFrameWriter API相比,这个API更容易指定额外的信息,比如列注释、表属性和生成的列。代码如下:

// 在metastore中创建表
DeltaTable.createOrReplace(spark)
  .tableName("default.people10m")
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .execute()

// 使用路径创建表,并添加属性
DeltaTable.createOrReplace(spark)
  .addColumn("id", "INT")
  .addColumn("firstName", "STRING")
  .addColumn("middleName", "STRING")
  .addColumn(
    DeltaTable.columnBuilder("lastName")
      .dataType("STRING")
      .comment("surname")
      .build())
  .addColumn("lastName", "STRING", comment = "surname")
  .addColumn("gender", "STRING")
  .addColumn("birthDate", "TIMESTAMP")
  .addColumn("ssn", "STRING")
  .addColumn("salary", "INT")
  .property("description", "table with people data")
  .location("/delta/people10m")
  .execute()

当创建一个Delta表时,实际是将文件写入一些存储(例如文件系统,云对象存储)。所有文件一起存储在特定结构的目录中组成数据表。因此,当创建一个Delta表时,实际上是在将文件写入某个存储位置。例如,下面的Spark代码片段接受一个现有的Spark DataFrame,并使用Spark DataFrame API以Apache Parquet存储格式将其写入文件夹位置/data中,代码如下:

dataframe.write.format("parquet").save("/data")

只需对上面的代码进行简单修改,就可以创建Delta表,代码如下:

dataframe.write.format("delta").save("/data")

在下面的代码示例中,将首先创建Spark DataFrame数据,然后使用write()方法将表保存到存储中,代码如下:

// 创建数据DataFrame
val data = spark.range(0, 5)

// 将数据DataFrame写入到/delta位置
data.write.format("delta").save("/data_lake/delta_lake/delta")

务必注意,在大多数生产环境中,当处理大量数据时,对数据进行分区非常重要。例如,按日期字段date对数据进行分区,代码如下:

// 将Spark DataFrame写入到Delta表中,按date列分区
data.write.partitionBy("date").format("delta").save("/data_lake/delta_lake/delta")

如果已经有一个现有的Delta表,并且想要向该表追加或覆盖数据,则在语句中包含mode方法。代码如下:

// 向Delta表追加新数据
data.write.format("delta").mode("append").save("/data_lake/delta_lake/delta")

// 覆盖Delta表
data.write.format("delta").mode("overwrite").save("/data_lake/delta_lake/delta")

2.读取表

与写Delta表类似,可以使用DataFrame API从Delta表读取相同的文件。可以通过指定表名或路径来将Delta表加载为DataFrame,代码如下:

spark.table("default.people10m")      		// metastore中的查询表

spark.read.format("delta").load("/delta/people10m")  	// 按路径创建的表

import io.delta.implicits._
spark.read.delta("/delta/people10m")		// 简洁写法

例如,读取前面写入的Delta Lake表数据,代码如下:

// 从/delta位置读取数据到DataFrame
spark.read.format("delta").load("/data_lake/delta_lake/delta").show()

返回的DataFrame为任何查询自动读取表的最新快照。当查询中有适用的谓词时,Delta Lake会自动使用分区和统计来读取最小数量的数据。 还可以使用SQL读取表,方法是在指定delta之后指定文件位置,代码如下:

%sql
SELECT * FROM delta.`/data_lake/delta_lake/delta`

前面是直接从文件系统读取了Delta表。但是怎么读取一个metastore中定义的Delta表呢?为此,需要首先使用saveAsTable()方法或CREATE TABLE语句在metastore中定义表,代码如下:

// 将数据DataFrame写入metastore,定义为myTable
data.write.format("delta").saveAsTable("myTable")

注意,当使用saveAsTable()方法时,将把Delta表文件保存到metastore管理的位置(例如/user/hive/warehouse/myTable)。如果想使用SQL或控制Delta表的位置,那么首先使用save方法在指定位置的地方保存它(例如/delta),然后使用SQL语句创建表。代码如下:

%sql
-- 在metastore中创建表
CREATE TABLE myTable (
  id INTEGER
)
USING DELTA
LOCATION "/delta"

如前所述,对大型表进行分区非常重要。例如,按date列分区,代码如下:

%sql
CREATE TABLE id (
  date DATE
  id INTEGER
)
USING DELTA
PARTITION BY date
LOCATION "/delta"

注意,LOCATION属性指向构成Delta表的底层文件的。

什么是metastore?

什么是metastore? 通常在Hadoop生态系统中,通过SQL(Spark SQL, Hive, Presto, Impala等)引用数据集,需要在metastore(通常是Hive metastore)中定义表。这个表定义是一个元数据条目,它将向Apache Spark等数据处理框架描述数据位置、存储格式、表模式以及其他属性。

3.写入表

写入表时,可以指定不同的模式。

1)Append模式

要自动向现有的Delta表添加新数据,请使用append模式,代码如下:

df.write.format("delta").mode("append").save("/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("append").delta("/delta/people10m")

或者使用如下的SQL语句:

INSERT INTO default.people10m SELECT * FROM morePeople

2)Overwrite模式

如果需要自动替换表中的所有数据,使用overwrite模式,代码如下:

df.write.format("delta").mode("overwrite").save("/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")

import io.delta.implicits._
df.write.mode("overwrite").delta("/delta/people10m")

或者使用如下的SQL语句:

INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople

使用DataFrame,还可以选择性地只覆盖与任意表达式匹配的数据。此功能在Delta Lake 1.1.0及以上版本中可用。例如,使用df中的数据自动替换目标表中按start_date分区的一月份的事件,代码如下:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/delta/events")

此示例代码写出df中的数据,验证数据是否与谓词匹配,并执行原子替换。如果想写出不完全匹配谓词的数据,替换目标表中匹配的行,那么可以通过相应的设置来禁用约束检查,代码如下:

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

在Delta Lake 1.0.0及以下版本中,replaceWhere选项的意思是只覆盖分区列上匹配谓词的数据。例如,使用df中的数据自动替换目标表中按birthDate分区的一月份的数据,代码如下:

df.write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/delta/people10m")

在Delta Lake 1.1.0及以上版本中,如果想恢复到以前的行为,可以禁用spark.databricks.delta.replaceWhere.dataColumns.enabled 标志,代码如下:

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

注意: 与Apache Spark中的文件API不同,Delta Lake会记住并强制表的模式。这意味着在默认情况下,覆盖不会替换现有表的模式。

可以设置用户自定义提交元数据,指定用户定义的字符串作为这些操作提交中的元数据,这可以通过使用DataFrameWriter选项userMetadata或SparkSession配置spark.databricks.delta.commitInfo.userMetadata来实现。如果两者都已指定,则选项option优先。用户定义的元数据在history操作中是可读的。代码如下:

df.write.format("delta")
  .mode("overwrite")
  .option("userMetadata", "overwritten-for-fixing-incorrect-data")
  .save("/delta/people10m")