表批处理读写
Delta Lake支持Apache Spark DataFrame读写API提供的大部分选项,用于对表执行批量读写。
对于许多Delta Lake操作,可以在创建新的SparkSession时通过设置相应配置来启用与Apache Spark DataSourceV2和Catalog API(自3.0以来)的集成。设置配置的代码如下:
import org.apache.spark.sql.SparkSession
val spark = SparkSession
.builder()
.appName("...")
.master("...")
.config("spark.sql.extensions","io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
.getOrCreate()
或者,可以在使用spark-submit提交Spark应用程序时添加配置,或者在启动spark-shell或pyspark时指定它们作为命令行参数,命令如下:
$ spark-submit --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" ...
1.创建表
Delta Lake支持创建两种类型的表:在metastore中定义的表和由路径定义的表。
要使用metastore定义的表,必须在创建新的SparkSession时通过设置配置来启用与Apache Spark DataSourceV2和Catalog API的集成。
要创建Delta表,可以先创建一个表定义并定义模式,或者像数据湖一样,简单地编写一个Spark DataFrame以Delta格式存储。
1) 使用SQL DDL命令
可以使用Apache Spark支持的标准SQL DDL命令(例如CREATE TABLE和REPLACE TABLE)来创建Delta表。代码如下:
CREATE TABLE IF NOT EXISTS default.people10m ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT ) USING DELTA CREATE OR REPLACE TABLE default.people10m ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT ) USING DELTA
SQL还支持在一个路径上创建一个表,而不需要在Hive metastore中创建一个条目。代码如下:
CREATE OR REPLACE TABLE delta.`/delta/people10m` ( id INT, firstName STRING, middleName STRING, lastName STRING, gender STRING, birthDate TIMESTAMP, ssn STRING, salary INT ) USING DELTA
2)使用DataFrameWriter API
如果想创建一个表并同时从Spark DataFrame或Dataset中插入数据,可以使用DataFrameWriter API,代码如下:
// 使用DataFrame的模式在metastore中创建表并写入数据
df.write.format("delta").saveAsTable("default.people10m")
// 使用DataFrame的模式创建带有路径的表,并向其写入数据
df.write.format("delta").mode("overwrite").save("/delta/people10m")
3)使用DeltaTableBuilder API
也可以使用Delta Lake的DeltaTableBuilder API来创建表。与DataFrameWriter API相比,这个API更容易指定额外的信息,比如列注释、表属性和生成的列。代码如下:
// 在metastore中创建表
DeltaTable.createOrReplace(spark)
.tableName("default.people10m")
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.execute()
// 使用路径创建表,并添加属性
DeltaTable.createOrReplace(spark)
.addColumn("id", "INT")
.addColumn("firstName", "STRING")
.addColumn("middleName", "STRING")
.addColumn(
DeltaTable.columnBuilder("lastName")
.dataType("STRING")
.comment("surname")
.build())
.addColumn("lastName", "STRING", comment = "surname")
.addColumn("gender", "STRING")
.addColumn("birthDate", "TIMESTAMP")
.addColumn("ssn", "STRING")
.addColumn("salary", "INT")
.property("description", "table with people data")
.location("/delta/people10m")
.execute()
当创建一个Delta表时,实际是将文件写入一些存储(例如文件系统,云对象存储)。所有文件一起存储在特定结构的目录中组成数据表。因此,当创建一个Delta表时,实际上是在将文件写入某个存储位置。例如,下面的Spark代码片段接受一个现有的Spark DataFrame,并使用Spark DataFrame API以Apache Parquet存储格式将其写入文件夹位置/data中,代码如下:
dataframe.write.format("parquet").save("/data")
只需对上面的代码进行简单修改,就可以创建Delta表,代码如下:
dataframe.write.format("delta").save("/data")
在下面的代码示例中,将首先创建Spark DataFrame数据,然后使用write()方法将表保存到存储中,代码如下:
// 创建数据DataFrame
val data = spark.range(0, 5)
// 将数据DataFrame写入到/delta位置
data.write.format("delta").save("/data_lake/delta_lake/delta")
务必注意,在大多数生产环境中,当处理大量数据时,对数据进行分区非常重要。例如,按日期字段date对数据进行分区,代码如下:
// 将Spark DataFrame写入到Delta表中,按date列分区
data.write.partitionBy("date").format("delta").save("/data_lake/delta_lake/delta")
如果已经有一个现有的Delta表,并且想要向该表追加或覆盖数据,则在语句中包含mode方法。代码如下:
// 向Delta表追加新数据
data.write.format("delta").mode("append").save("/data_lake/delta_lake/delta")
// 覆盖Delta表
data.write.format("delta").mode("overwrite").save("/data_lake/delta_lake/delta")
2.读取表
与写Delta表类似,可以使用DataFrame API从Delta表读取相同的文件。可以通过指定表名或路径来将Delta表加载为DataFrame,代码如下:
spark.table("default.people10m") // metastore中的查询表
spark.read.format("delta").load("/delta/people10m") // 按路径创建的表
import io.delta.implicits._
spark.read.delta("/delta/people10m") // 简洁写法
例如,读取前面写入的Delta Lake表数据,代码如下:
// 从/delta位置读取数据到DataFrame
spark.read.format("delta").load("/data_lake/delta_lake/delta").show()
返回的DataFrame为任何查询自动读取表的最新快照。当查询中有适用的谓词时,Delta Lake会自动使用分区和统计来读取最小数量的数据。 还可以使用SQL读取表,方法是在指定delta之后指定文件位置,代码如下:
%sql SELECT * FROM delta.`/data_lake/delta_lake/delta`
前面是直接从文件系统读取了Delta表。但是怎么读取一个metastore中定义的Delta表呢?为此,需要首先使用saveAsTable()方法或CREATE TABLE语句在metastore中定义表,代码如下:
// 将数据DataFrame写入metastore,定义为myTable
data.write.format("delta").saveAsTable("myTable")
注意,当使用saveAsTable()方法时,将把Delta表文件保存到metastore管理的位置(例如/user/hive/warehouse/myTable)。如果想使用SQL或控制Delta表的位置,那么首先使用save方法在指定位置的地方保存它(例如/delta),然后使用SQL语句创建表。代码如下:
%sql -- 在metastore中创建表 CREATE TABLE myTable ( id INTEGER ) USING DELTA LOCATION "/delta"
如前所述,对大型表进行分区非常重要。例如,按date列分区,代码如下:
%sql CREATE TABLE id ( date DATE id INTEGER ) USING DELTA PARTITION BY date LOCATION "/delta"
注意,LOCATION属性指向构成Delta表的底层文件的。
什么是metastore?
什么是metastore? 通常在Hadoop生态系统中,通过SQL(Spark SQL, Hive, Presto, Impala等)引用数据集,需要在metastore(通常是Hive metastore)中定义表。这个表定义是一个元数据条目,它将向Apache Spark等数据处理框架描述数据位置、存储格式、表模式以及其他属性。
3.写入表
写入表时,可以指定不同的模式。
1)Append模式
要自动向现有的Delta表添加新数据,请使用append模式,代码如下:
df.write.format("delta").mode("append").save("/delta/people10m")
df.write.format("delta").mode("append").saveAsTable("default.people10m")
import io.delta.implicits._
df.write.mode("append").delta("/delta/people10m")
或者使用如下的SQL语句:
INSERT INTO default.people10m SELECT * FROM morePeople
2)Overwrite模式
如果需要自动替换表中的所有数据,使用overwrite模式,代码如下:
df.write.format("delta").mode("overwrite").save("/delta/people10m")
df.write.format("delta").mode("overwrite").saveAsTable("default.people10m")
import io.delta.implicits._
df.write.mode("overwrite").delta("/delta/people10m")
或者使用如下的SQL语句:
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople
使用DataFrame,还可以选择性地只覆盖与任意表达式匹配的数据。此功能在Delta Lake 1.1.0及以上版本中可用。例如,使用df中的数据自动替换目标表中按start_date分区的一月份的事件,代码如下:
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
.save("/delta/events")
此示例代码写出df中的数据,验证数据是否与谓词匹配,并执行原子替换。如果想写出不完全匹配谓词的数据,替换目标表中匹配的行,那么可以通过相应的设置来禁用约束检查,代码如下:
spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)
在Delta Lake 1.0.0及以下版本中,replaceWhere选项的意思是只覆盖分区列上匹配谓词的数据。例如,使用df中的数据自动替换目标表中按birthDate分区的一月份的数据,代码如下:
df.write
.format("delta")
.mode("overwrite")
.option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
.save("/delta/people10m")
在Delta Lake 1.1.0及以上版本中,如果想恢复到以前的行为,可以禁用spark.databricks.delta.replaceWhere.dataColumns.enabled 标志,代码如下:
spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)
注意: 与Apache Spark中的文件API不同,Delta Lake会记住并强制表的模式。这意味着在默认情况下,覆盖不会替换现有表的模式。
可以设置用户自定义提交元数据,指定用户定义的字符串作为这些操作提交中的元数据,这可以通过使用DataFrameWriter选项userMetadata或SparkSession配置spark.databricks.delta.commitInfo.userMetadata来实现。如果两者都已指定,则选项option优先。用户定义的元数据在history操作中是可读的。代码如下:
df.write.format("delta")
.mode("overwrite")
.option("userMetadata", "overwritten-for-fixing-incorrect-data")
.save("/delta/people10m")