PySpark SQL日期时间处理函数
为了帮助执行复杂的分析,PySpark SQL提供了一组强大而灵活的聚合函数、连接多个数据集的函数、一组内置的高性能函数和一组高级分析函数。
PySpark内置的日期时间函数大致可分为以下三个类别:
- 执行日期时间格式转换的函数。
- 执行日期时间计算的函数。
- 从日期时间戳中提取特定值(如年、月、日等)的函数。
日期和时间转换函数有助于将字符串转换为日期、时间戳或Unix时间戳,反之亦然。在内部,它使用Java日期格式模式语法。这些函数使用的默认的日期格式是yyyy-mm-dd HH:mm:ss。因此,如果日期或时间戳列的日期格式不同,那么需要向这些转换函数传入指定的模式。
1. 将字符串转换为日期或时间戳
例如,将字符串类型的日期和时间戳转换为PySpark SQL的date和timestamp类型,代码如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
spark = SparkSession.builder \
.master("spark://localhost:7077") \
.appName("pyspark demo") \
.getOrCreate()
# 1)日期和时间转换函数:这些函数使用的默认的日期格式是yyyy-mm-dd HH:mm:ss
# 构造一个简单的DataFrame,注意最后两列不遵循默认日期格式
testDate = [(1, "2019-01-01", "2019-01-01 15:04:58", "01-01-2019", "12-05-2018 45:50")]
testDateTSDF = spark.createDataFrame(testDate,schema=["id", "date", "timestamp", "date_str", "ts_str"])
# testDateTSDF.printSchema()
# testDateTSDF.show()
# 将这些字符串转换为date、timestamp和 unix timestamp,并指定一个自定义的date和timestamp 格式
testDateResultDF = testDateTSDF.select(
to_date('date').alias("date1"),
to_timestamp('timestamp').alias("ts1"),
to_date('date_str',"MM-dd-yyyy").alias("date2"),
to_timestamp('ts_str',"MM-dd-yyyy mm:ss").alias("ts2"),
unix_timestamp('timestamp').alias("unix_ts")
)
testDateResultDF.printSchema()
testDateResultDF.show(truncate=False)
执行以上代码,输出结果如下:
root |-- date1: date (nullable = true) |-- ts1: timestamp (nullable = true) |-- date2: date (nullable = true) |-- ts2: timestamp (nullable = true) |-- unix_ts: long (nullable = true) +----------+-------------------+----------+-------------------+----------+ |date1 |ts1 |date2 |ts2 |unix_ts | +----------+-------------------+----------+-------------------+----------+ |2019-01-01|2019-01-01 15:04:58|2019-01-01|2018-12-05 00:45:50|1546326298| +----------+-------------------+----------+-------------------+----------+
2. 将日期或时间戳转换为字符串
将日期或时间戳转换为时间字符串是很容易的,方法是使用date_format()函数和定制日期格式,或者使用from_unixtime()函数将Unix时间戳(以秒为单位)转换成字符串。请看日期和时间戳转换为格式字符串的转换例子,代码如下:
from pyspark.sql.functions import *
testDateResultDF.select(
date_format('date1', "dd-MM-yyyy").alias("date_str"),
date_format('ts1', "dd-MM-yyyy HH:mm:ss").alias("ts_str"),
from_unixtime('unix_ts',"dd-MM-yyyy HH:mm:ss").alias("unix_ts_str")
).show()
执行以上代码,输出结果如下:
+----------+-------------------+-------------------+ | date_str| ts_str| unix_ts_str| +----------+-------------------+-------------------+ |01-01-2019|01-01-2019 15:04:58|01-01-2019 15:04:58| +----------+-------------------+-------------------+
3. 日期计算函数
日期-时间计算函数有助于计算两个日期或时间戳的相隔时间,以及执行日期或时间算术运算。关于日期-时间计算的示例,代码如下:
from pyspark.sql.functions import *
# 2) 日期-时间(date-time)计算函数
data = [("黄渤", "2016-01-01", "2017-10-15"),
("王宝强", "2017-02-06", "2017-12-25")]
employeeData = spark.createDataFrame(data, schema=["name", "join_date", "leave_date"])
employeeData.show()
执行以上代码,输出内容如下:
+------+----------+----------+ | name| join_date|leave_date| +------+----------+----------+ | 黄渤|2016-01-01|2017-10-15| | 王宝强|2017-02-06|2017-12-25| +------+----------+----------+
执行date()和month()计算,代码如下:
from pyspark.sql.functions import *
employeeData.select(
'name',
datediff('leave_date', 'join_date').alias("days"),
months_between('leave_date', 'join_date').alias("months"),
last_day('leave_date').alias("last_day_of_mon")
).show()
执行以上代码,输出内容如下:
+------+----+-----------+---------------+ | name|days| months|last_day_of_mon| +------+----+-----------+---------------+ | 黄渤| 653| 21.4516129| 2017-10-31| | 王宝强| 322|10.61290323| 2017-12-31| +------+----+-----------+---------------+
执行日期加、减计算,代码如下:
from pyspark.sql.functions import *
oneDate = spark.createDataFrame([("2019-01-01",)],schema=["new_year"])
oneDate.select(
date_add('new_year', 14).alias("mid_month"),
date_sub('new_year', 1).alias("new_year_eve"),
next_day('new_year', "Mon").alias("next_mon")
).show()
执行上面的代码,输出内容如下:
+----------+------------+----------+ | mid_month|new_year_eve| next_mon| +----------+------------+----------+ |2019-01-15| 2018-12-31|2019-01-07| +----------+------------+----------+
4. 转换不规范的日期
有的时候,采集到的数据是不受控制的,得到的日期可能是不规范的,这就需要将这些不规范的日期转换为规范的表示。对不规范日期的转换,代码如下:
from pyspark.sql.functions import *
# 转换不规范的日期:
data = [("Nov 05, 2018 02:46:47 AM",),("Nov 5, 2018 02:46:47 PM",)]
df = spark.createDataFrame(data,schema=["times"])
df.withColumn(
"times2",
from_unixtime(
unix_timestamp("times", "MMM d, yyyy hh:mm:ss a"),
"yyyy-MM-dd HH:mm:ss.SSSSSS"
)
).show(truncate=False)
执行以上代码,输出结果如下:
+------------------------+--------------------------+ |times |times2 | +------------------------+--------------------------+ |Nov 05, 2018 02:46:47 AM|2018-11-05 02:46:47.000000| |Nov 5, 2018 02:46:47 PM |2018-11-05 14:46:47.000000| +------------------------+--------------------------+
5. 处理时间序列数据
在处理时间序列数据(time-series data)时,经常需要提取日期或时间戳值的特定字段(如年、月、小时、分钟和秒)。例如,当需要按季度、月或周对所有股票交易进行分组时,就可以从交易日期提取该信息,并按这些值分组。从日期或时间戳中提取字段,代码如下:
from pyspark.sql.functions import *
# 3)提取日期或时间戳值的特定字段(如年、月、小时、分钟和秒)
# 从一个日期值中提取指定的日期字段
valentimeDateDF = spark \
.createDataFrame([("2019-02-14 13:14:52",)],["date"])
valentimeDateDF.select(
year('date').alias("year"), # 年
quarter('date').alias("quarter"), # 季
month('date').alias("month"), # 月
weekofyear('date').alias("woy"), # 周
dayofmonth('date').alias("dom"), # 日
dayofyear('date').alias("doy"), # 天
hour('date').alias("hour"), # 小时
minute('date').alias("minute"), # 分
second('date').alias("second") # 秒
).show()
执行以上代码,输出结果如下:
+----+-------+-----+---+---+---+----+------+------+ |year|quarter|month|woy|dom|doy|hour|minute|second| +----+-------+-----+---+---+---+----+------+------+ |2019| 1| 2| 7| 14| 45| 13| 14| 52| +----+-------+-----+---+---+---+----+------+------+