SQL DDL - 创建和修改表
我们可以跨各种引擎使用SQL来创建和修改Hudi表。
使用SQL的一些限制
Hudi支持使用Spark SQL操作Hudi的DDL/DML的语法,使得所有用户(非工程师、分析师等)更容易访问和操作Hudi。
但是,在使用Spark SQL操作Hudi时,需要遵守一定的约束。
(1)支持在Hudi客户端执行Spark SQL操作Hudi。
(2)支持在Spark的JDBCServer中执行Spark SQL操作Hudi。
(3)不支持在Spark的客户端执行Spark SQL操作Hudi。
(4)不支持在Hive、Hetu引擎中写hudi表,以及修改hudi表结构,仅支持读。
(5)由于SQL的KeyGenerator默认是org.apache.hudi.keygen.ComplexKeyGenerator,要求DataSource方式写入时KeyGenerator与SQL设置的一致。
(6)对于MOR表的修改操作只允许在主表操作,ro和rt后缀的表仅供查询使用。
创建Hudi表-语法
可以使用标准的CREATE TABLE语法创建表,该语法支持分区和传递表属性。
CREATE TABLE [IF NOT EXISTS] [db_name.]table_name [(col_name data_type [COMMENT col_comment], ...)] [COMMENT table_comment] [PARTITIONED BY (col_name, ...)] [ROW FORMAT row_format] [STORED AS file_format] [LOCATION path] [TBLPROPERTIES (property_name=property_value, ...)] [AS select_statement];
CREATE TABLE命令通过指定带有表属性的字段列表来创建Hudi Table。
其中表属性(TBLPROPERTIES)中包含的参数和含义如下:
| 参数 | 描述 |
|---|---|
| type | 表类型。'cow' 表示 COPY-ON-WRITE 表,'mor' 表示 MERGE-ON-READ 表。未指定type的话,默认值为 'cow'。 |
| primaryKey | 主键名,多个字段用逗号分隔,该字段为必填字段。 |
| preCombineField | 表的Pre-Combine字段,该字段为必填字段。 |
创建Hudi表-示例
创建分区表
创建非分区表与创建普通表一样简单,示例如下:
-- create a Hudi table CREATE TABLE IF NOT EXISTS hudi_table ( id INT, name STRING, price DOUBLE ) USING hudi
创建非分区表
可以通过添加partitioned by子句来创建分区表。分区有助于根据分区列将数据组织到多个文件夹中。它还可以通过限制元数据、索引和扫描数据的数量来帮助加快查询和索引查找。示例如下:
CREATE TABLE IF NOT EXISTS hudi_table_partitioned ( id BIGINT, name STRING, dt STRING, hh STRING ) USING hudi TBLPROPERTIES ( type = 'cow' ) PARTITIONED BY (dt);
注意:还可以通过提供逗号分隔的字段名来创建由多个字段划分的表。例如,"partitioned by dt, hh"。
创建带有主键和排序字段的表
Hudi表使用记录键(record key)跟踪表中的每条记录。在目前的示例中,Hudi为每条新记录自动生成一个高度压缩的键。如果希望使用现有字段作为键,可以设置primaryKey选项。通常,这还伴随着配置preCombineField选项来处理无序数据和传入写操作中具有相同键的潜在重复记录。
注意,可以根据需要选择多个字段作为给定表的主键。例如,"primaryKey = 'id, name'",这将实现两个字段的复合键,这对于探索表非常有用。
下面是一个使用这两个选项创建表的示例。通常,表示事件或事实的时间的字段(例如,订单创建时间、事件生成时间等)用作preCombineField。当在表上运行查询时,Hudi通过基于此字段排序来解析同一记录的多个版本。
CREATE TABLE IF NOT EXISTS hudi_table_keyed ( id INT, name STRING, price DOUBLE, ts BIGINT ) USING hudi TBLPROPERTIES ( type = 'cow', primaryKey = 'id', preCombineField = 'ts' );
从外部位置创建表
通常,Hudi表是由流编写器(如streamer工具)创建的,稍后可能需要在这些表上运行一些SQL语句。可以使用location语句创建External表。
提示:不需要指定模式和除分区列(如果存在)之外的任何属性。Hudi可以自动识别模式和配置。
CREATE TABLE hudi_table_external USING hudi LOCATION 'file:///tmp/hudi_table/';
使用CTAS创建表
Hudi支持CTAS(Create table as select)来支持初始加载到Hudi表中。为了确保高效地完成此操作,即使对于大负载,CTAS也使用批量插入(bulk insert)作为写操作。例如,下面的示例中创建一个受托管的parquet表:
CREATE TABLE parquet_table USING parquet LOCATION 'file:///tmp/parquet_dataset/'; # CTAS - 加载数据表Hudi表中 CREATE TABLE hudi_table_ctas USING hudi TBLPROPERTIES ( type = 'cow', preCombineField = 'ts' ) PARTITIONED BY (dt) AS SELECT * FROM parquet_table;
也可以创建一个非分区表,代码如下:
# create managed parquet table CREATE TABLE parquet_table USING parquet LOCATION 'file:///tmp/parquet_dataset/'; # CTAS by loading data into Hudi table CREATE TABLE hudi_table_ctas USING hudi TBLPROPERTIES ( type = 'cow', preCombineField = 'ts' ) AS SELECT * FROM parquet_table;
如果希望显式设置记录键,可以通过在表属性中设置primaryKey配置项来实现。
CREATE TABLE hudi_table_ctas USING hudi TBLPROPERTIES ( type = 'cow', primaryKey = 'id' ) PARTITIONED BY (dt) AS SELECT 1 AS id, 'a1' AS name, 10 AS price, 1000 AS dt;
还可以使用CTAS跨外部位置复制数据,例如:
# 创建托管的parquet表 CREATE TABLE parquet_table USING parquet LOCATION 'file:///tmp/parquet_dataset/*.parquet'; # CTAS加载数据到hudi表中 CREATE TABLE hudi_table_ctas USING hudi LOCATION 'file:///tmp/hudi/hudi_tbl/' TBLPROPERTIES ( type = 'cow' ) AS SELECT * FROM parquet_table;
创建索引(实验性质)
Hudi支持在表上创建和删除索引,包括函数索引。
注意,通过SQL创建索引仅在1.0.0-beta版(即测试版)中处于预览状态。它将在1.0.0版本中普遍可用。其语法如下所示:
-- 创建索引 CREATE INDEX [IF NOT EXISTS] index_name ON [TABLE] table_name [USING index_type] (column_name1 [OPTIONS(key1=value1, key2=value2, ...)], column_name2 [OPTIONS(key1=value1, key2=value2, ...)], ...) [OPTIONS (key1=value1, key2=value2, ...)] -- 删除索引 DROP INDEX [IF EXISTS] index_name ON [TABLE] table_name
其中:
- index_name是要创建或删除的索引的名称。
- table_name是在其上创建或删除索引的表名。
- index_type是要创建的索引的类型。目前,只支持files、column_stats和bloom_filters。
- column_name是创建索引的列名。
- 创建索引的索引和列都可以使用键值对形式的一些选项进行限定。我们将在下面的函数索引示例中看到这一点。
创建函数索引
函数索引是对列的函数的索引。它是Hudi的多模态索引子系统的新添加,它提供了更快的访问方法,并且还吸收了分区作为索引系统的一部分。让我们看一些创建函数索引的例子。
-- 使用格式为'yyyy-MM-dd'的from_unixtime函数在表hudi_table的列`ts`(unix epoch)上创建函数索引, CREATE INDEX IF NOT EXISTS ts_datestr ON hudi_table USING column_stats(ts) OPTIONS(func='from_unixtime', format='yyyy-MM-dd'); -- 使用函数'hour'在表'hudi_table'的列'ts'(timestamp时间戳类型,格式为yyyy-MM-dd HH:mm:ss)上创建函数索引 CREATE INDEX ts_hour ON hudi_table USING column_stats(ts) options(func='hour');
请注意:
(1)func选项是创建函数索引所必需的,它应该是一个有效的Spark SQL函数。目前,只支持以单列作为输入的函数。下面列出了一些支持的有用函数。
- identity
- from_unixtime
- date_format
- to_date
- to_timestamp
- year
- month
- day
- hour
- lower
- upper
- substring
- regexp_extract
- regexp_replace
- concat
- length
(2)请在Spark SQL文档中查看上述函数的语法,并提供相应的选项。例如,from_unixtime函数需要format选项。
(3)不支持UDF。
下面是创建和使用函数索引的例子:
-- 创建一个hudi表
CREATE TABLE hudi_table_func_index (
ts STRING,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI
tblproperties (primaryKey = 'uuid')
PARTITIONED BY (city)
location 'file:///tmp/hudi_table_func_index';
-- 禁用小文件处理,以便每次插入都创建新文件 --
set hoodie.parquet.small.file.limit=0;
INSERT INTO hudi_table_func_index VALUES ('2023-09-20 03:58:59','334e26e9-8355-45cc-97c6-c31daf0df330','rider-A','driver-K',19.10,'san_francisco');
INSERT INTO hudi_table_func_index VALUES ('2023-09-19 08:46:34','e96c4396-3fad-413a-a942-4cb36106d721','rider-C','driver-M',27.70 ,'san_francisco');
INSERT INTO hudi_table_func_index VALUES ('2023-09-18 17:45:31','9909a8b1-2d15-4d3d-8ec9-efc48c536a00','rider-D','driver-L',33.90 ,'san_francisco');
INSERT INTO hudi_table_func_index VALUES ('2023-09-22 13:12:56','1dced545-862b-4ceb-8b43-d2a568f6616b','rider-E','driver-O',93.50,'san_francisco');
INSERT INTO hudi_table_func_index VALUES ('2023-09-24 06:15:45','e3cf430c-889d-4015-bc98-59bdce1e530c','rider-F','driver-P',34.15,'sao_paulo');
INSERT INTO hudi_table_func_index VALUES ('2023-09-22 15:21:36','7a84095f-737f-40bc-b62f-6b69664712d2','rider-G','driver-Q',43.40 ,'sao_paulo');
INSERT INTO hudi_table_func_index VALUES ('2023-09-20 12:35:45','3eeb61f7-c2b0-4636-99bd-5d7a5a1d2c04','rider-I','driver-S',41.06 ,'chennai');
INSERT INTO hudi_table_func_index VALUES ('2023-09-19 05:34:56','c8abbe79-8d89-47ea-b4ce-4d224bae5bfa','rider-J','driver-T',17.85,'chennai');
-- 当还没有索引时,使用hour函数过滤器进行查询 --
spark-sql> SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
san_francisco 93.5 rider-E driver-O
san_francisco 33.9 rider-D driver-L
sao_paulo 43.4 rider-G driver-Q
Time taken: 0.208 seconds, Fetched 3 row(s)
-- 查看执行计划 --
spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
== Optimized Logical Plan ==
Project [city#3465, fare#3464, rider#3462, driver#3463], Statistics(sizeInBytes=899.5 KiB)
+- Filter ((isnotnull(city#3465) AND isnotnull(ts#3460)) AND (NOT (city#3465 = chennai) AND (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12))), Statistics(sizeInBytes=2.5 MiB)
+- Relation default.hudi_table_func_index[_hoodie_commit_time#3455,_hoodie_commit_seqno#3456,_hoodie_record_key#3457,_hoodie_partition_path#3458,_hoodie_file_name#3459,ts#3460,uuid#3461,rider#3462,driver#3463,fare#3464,city#3465] parquet, Statistics(sizeInBytes=2.5 MiB)
== Physical Plan ==
*(1) Project [city#3465, fare#3464, rider#3462, driver#3463]
+- *(1) Filter (isnotnull(ts#3460) AND (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12))
+- *(1) ColumnarToRow
+- FileScan parquet default.hudi_table_func_index[ts#3460,rider#3462,driver#3463,fare#3464,city#3465] Batched: true, DataFilters: [isnotnull(ts#3460), (hour(cast(ts#3460 as timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/tmp/hudi_table_func_index], PartitionFilters: [isnotnull(city#3465), NOT (city#3465 = chennai)], PushedFilters: [IsNotNull(ts)], ReadSchema: struct
-- 创建函数索引 --
CREATE INDEX ts_hour ON hudi_table_func_index USING column_stats(ts) options(func='hour');
-- 在创建索引之后进行查询 --
spark-sql> SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
san_francisco 93.5 rider-E driver-O
san_francisco 33.9 rider-D driver-L
sao_paulo 43.4 rider-G driver-Q
Time taken: 0.202 seconds, Fetched 3 row(s)
spark-sql> EXPLAIN COST SELECT city, fare, rider, driver FROM hudi_table_func_index WHERE city NOT IN ('chennai') AND hour(ts) > 12;
== Optimized Logical Plan ==
Project [city#2970, fare#2969, rider#2967, driver#2968], Statistics(sizeInBytes=449.8 KiB)
+- Filter ((isnotnull(city#2970) AND isnotnull(ts#2965)) AND (NOT (city#2970 = chennai) AND (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12))), Statistics(sizeInBytes=1278.3 KiB)
+- Relation default.hudi_table_func_index[_hoodie_commit_time#2960,_hoodie_commit_seqno#2961,_hoodie_record_key#2962,_hoodie_partition_path#2963,_hoodie_file_name#2964,ts#2965,uuid#2966,rider#2967,driver#2968,fare#2969,city#2970] parquet, Statistics(sizeInBytes=1278.3 KiB)
== Physical Plan ==
*(1) Project [city#2970, fare#2969, rider#2967, driver#2968]
+- *(1) Filter (isnotnull(ts#2965) AND (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12))
+- *(1) ColumnarToRow
+- FileScan parquet default.hudi_table_func_index[ts#2965,rider#2967,driver#2968,fare#2969,city#2970] Batched: true, DataFilters: [isnotnull(ts#2965), (hour(cast(ts#2965 as timestamp), Some(Asia/Kolkata)) > 12)], Format: Parquet, Location: HoodieFileIndex(1 paths)[file:/tmp/hudi_table_func_index], PartitionFilters: [isnotnull(city#2970), NOT (city#2970 = chennai)], PushedFilters: [IsNotNull(ts)], ReadSchema: struct
设置Hudi配置
对于给定的hudi表,有几种不同的方法可以传递配置。
使用set 命令
可以使用set命令来设置Hudi的任何写配置。这将适用于整个spark session的操作。
例如:
set hoodie.insert.shuffle.parallelism = 100; set hoodie.upsert.shuffle.parallelism = 100; set hoodie.delete.shuffle.parallelism = 100;
使用表属性
还可以在创建表时配置表选项。这将只应用于表,并覆盖任何SET命令值。
-- 语法
CREATE TABLE IF NOT EXISTS tableName (
colName1 colType1,
colName2 colType2,
...
) USING hudi
TBLPROPERTIES (
primaryKey = '${colName1}',
type = 'cow',
${hoodie.config.key1} = '${hoodie.config.value1}',
${hoodie.config.key2} = '${hoodie.config.value2}',
....
);
-- 示例
CREATE TABLE IF NOT EXISTS hudi_table (
id BIGINT,
name STRING,
price DOUBLE
) USING hudi
TBLPROPERTIES (
primaryKey = 'id',
type = 'cow',
hoodie.cleaner.fileversions.retained = '20',
hoodie.keep.max.commits = '20'
);
表属性
用户可以在创建表时设置表属性。下面将讨论重要的表属性。
| 参数名称 | 默认值 | 参数描述 |
|---|---|---|
| type | cow | 要创建的表类型。type = 'cow'创建一个COPY-ON-WRITE表,而type = 'more'创建一个MERGE-ON-READ表。与hoodie.datasource.write.table.type相同。 |
| primaryKey | uuid | 用逗号分隔的表的主键字段名。与hoodie.datasource.write.recordkey.field相同。如果忽略此配置,hudi将自动生成主键。如果显式设置,主键生成将遵循用户配置。 |
| preCombineField | 表的预合并字段。它用于在多个版本中解析记录的最终版本。通常,事件时间或其他类似的列将用于排序目的。Hudi将能够使用预合并字段值处理无序数据。 |
注意:primaryKey、preCombineField、type以及其他属性是区分大小写的。
为并发写入器传递锁提供程序
当使用OCC和NBCC(非阻塞并发控制)并发模式时,Hudi要求锁提供程序支持并发写或异步表服务。对于NBCC模式,锁只用于写时间轴上的提交元数据文件。写操作按完成时间序列化。用户也可以将这些表属性传递给TBLPROPERTIES。下面是一个基于Zookeeper的配置示例。
-- 使用锁配置来支持多写入器的属性 TBLPROPERTIES( hoodie.write.lock.zookeeper.url = "zookeeper", hoodie.write.lock.zookeeper.port = "2181", hoodie.write.lock.zookeeper.lock_key = "tableName", hoodie.write.lock.provider = "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider", hoodie.write.concurrency.mode = "optimistic_concurrency_control", hoodie.write.lock.zookeeper.base_path = "/tableName" )
启用表的列统计/记录级别索引
Hudi提供了利用关于表的丰富元数据和索引的能力,加速了DML和查询。例如,可以启用列统计信息的集合来执行快速数据跳转,或者使用记录级索引来执行快速更新或使用以下表属性进行点查找。
TBLPROPERTIES( 'hoodie.metadata.index.column.stats.enable' = 'true' 'hoodie.metadata.record.index.enable' = 'true' )
修改表
语法:
-- Alter table name ALTER TABLE oldTableName RENAME TO newTableName; -- Alter table add columns ALTER TABLE tableIdentifier ADD COLUMNS(colAndType [, colAndType]);
示例:
-- 重命名表名: ALTER TABLE hudi_table RENAME TO hudi_table_renamed; -- 增加列: ALTER TABLE hudi_table ADD COLUMNS(remark STRING);
修改表属性
语法:
-- alter table ... set|unset ALTER TABLE tableIdentifier SET|UNSET TBLPROPERTIES (table_property = 'property_value');
示例:
ALTER TABLE hudi_table SET TBLPROPERTIES (hoodie.keep.max.commits = '10');
ALTER TABLE hudi_table SET TBLPROPERTIES ("note" = "don't drop this table");
ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS (hoodie.keep.max.commits);
ALTER TABLE hudi_table UNSET TBLPROPERTIES IF EXISTS ('note');
注意:目前,尝试更改列类型可能会抛出错误ALTER TABLE CHANGE COLUMN is not supported for changing column colName with oldColType to colName with newColType。
修改配置选项
还可以通过ALTER TABLE SET SERDEPROPERTIES命令修改表的写配置
语法:
-- alter table ... set|unset
ALTER TABLE tableName SET SERDEPROPERTIES ('property' = 'property_value');
示例:
ALTER TABLE hudi_table SET SERDEPROPERTIES ('key1' = 'value1');
显示和删除分区
语法:
-- 显示分区信息 SHOW PARTITIONS tableIdentifier; -- 删除分区 ALTER TABLE tableIdentifier DROP PARTITION ( partition_col_name = partition_col_val [ , ... ] );
示例:
-- 显示分区信息: SHOW PARTITIONS hudi_table; -- 删除分区: ALTER TABLE hudi_table DROP PARTITION (dt='2021-12-09', hh='10');
说明
当使用Spark SQL创建/修改表时,Hudi目前有以下限制。
(1)当使用AWS Glue Data Catalog作为hive metastore时不支持ALTER TABLE ... RENAME TO ...,因为Glue本身不支持表重命名。
(2)Spark SQL创建的新Hudi表默认设置为hoodie.datasource.write.hive_style_partitioning =true,方便使用。这可以使用表属性重写。