PySpark SQL日期时间处理函数

为了帮助执行复杂的分析,PySpark SQL提供了一组强大而灵活的聚合函数、连接多个数据集的函数、一组内置的高性能函数和一组高级分析函数。

PySpark内置的日期时间函数大致可分为以下三个类别:

  • 执行日期时间格式转换的函数。
  • 执行日期时间计算的函数。
  • 从日期时间戳中提取特定值(如年、月、日等)的函数。

日期和时间转换函数有助于将字符串转换为日期、时间戳或Unix时间戳,反之亦然。在内部,它使用Java日期格式模式语法。这些函数使用的默认的日期格式是yyyy-mm-dd HH:mm:ss。因此,如果日期或时间戳列的日期格式不同,那么需要向这些转换函数传入指定的模式。

1. 将字符串转换为日期或时间戳

例如,将字符串类型的日期和时间戳转换为PySpark SQL的date和timestamp类型,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

# 1)日期和时间转换函数:这些函数使用的默认的日期格式是yyyy-mm-dd HH:mm:ss
# 构造一个简单的DataFrame,注意最后两列不遵循默认日期格式
testDate = [(1, "2019-01-01", "2019-01-01 15:04:58", "01-01-2019", "12-05-2018 45:50")]
testDateTSDF = spark.createDataFrame(testDate,schema=["id", "date", "timestamp", "date_str", "ts_str"])

# testDateTSDF.printSchema()
# testDateTSDF.show()

# 将这些字符串转换为date、timestamp和 unix timestamp,并指定一个自定义的date和timestamp 格式

testDateResultDF = testDateTSDF.select(
      to_date('date').alias("date1"),
      to_timestamp('timestamp').alias("ts1"),
      to_date('date_str',"MM-dd-yyyy").alias("date2"),
      to_timestamp('ts_str',"MM-dd-yyyy mm:ss").alias("ts2"),
      unix_timestamp('timestamp').alias("unix_ts")
    )

testDateResultDF.printSchema()
testDateResultDF.show(truncate=False)

执行以上代码,输出结果如下:

root
 |-- date1: date (nullable = true)
 |-- ts1: timestamp (nullable = true)
 |-- date2: date (nullable = true)
 |-- ts2: timestamp (nullable = true)
 |-- unix_ts: long (nullable = true)

+----------+-------------------+----------+-------------------+----------+
|date1     |ts1                |date2     |ts2                |unix_ts     |
+----------+-------------------+----------+-------------------+----------+
|2019-01-01|2019-01-01 15:04:58|2019-01-01|2018-12-05 00:45:50|1546326298|
+----------+-------------------+----------+-------------------+----------+

2. 将日期或时间戳转换为字符串

将日期或时间戳转换为时间字符串是很容易的,方法是使用date_format()函数和定制日期格式,或者使用from_unixtime()函数将Unix时间戳(以秒为单位)转换成字符串。请看日期和时间戳转换为格式字符串的转换例子,代码如下:

from pyspark.sql.functions import *

testDateResultDF.select(
      date_format('date1', "dd-MM-yyyy").alias("date_str"),
      date_format('ts1', "dd-MM-yyyy HH:mm:ss").alias("ts_str"),
      from_unixtime('unix_ts',"dd-MM-yyyy HH:mm:ss").alias("unix_ts_str")
).show()

执行以上代码,输出结果如下:

+----------+-------------------+-------------------+
|  date_str|                ts_str|          unix_ts_str|
+----------+-------------------+-------------------+
|01-01-2019|01-01-2019 15:04:58|01-01-2019 15:04:58|
+----------+-------------------+-------------------+

3. 日期计算函数

日期-时间计算函数有助于计算两个日期或时间戳的相隔时间,以及执行日期或时间算术运算。关于日期-时间计算的示例,代码如下:

from pyspark.sql.functions import *

# 2) 日期-时间(date-time)计算函数
data = [("黄渤", "2016-01-01", "2017-10-15"),
("王宝强", "2017-02-06", "2017-12-25")]
employeeData = spark.createDataFrame(data, schema=["name", "join_date", "leave_date"])

employeeData.show()

执行以上代码,输出内容如下:

+------+----------+----------+
|  name| join_date|leave_date|
+------+----------+----------+
|   黄渤|2016-01-01|2017-10-15|
| 王宝强|2017-02-06|2017-12-25|
+------+----------+----------+

执行date()和month()计算,代码如下:

from pyspark.sql.functions import *

employeeData.select(
      'name',
      datediff('leave_date', 'join_date').alias("days"),
      months_between('leave_date', 'join_date').alias("months"),
      last_day('leave_date').alias("last_day_of_mon")
).show()

执行以上代码,输出内容如下:

+------+----+-----------+---------------+
|  name|days|      months|last_day_of_mon|
+------+----+-----------+---------------+
|   黄渤| 653| 21.4516129|      2017-10-31|
| 王宝强| 322|10.61290323|      2017-12-31|
+------+----+-----------+---------------+

执行日期加、减计算,代码如下:

from pyspark.sql.functions import *

oneDate = spark.createDataFrame([("2019-01-01",)],schema=["new_year"])
oneDate.select(
      date_add('new_year', 14).alias("mid_month"),
      date_sub('new_year', 1).alias("new_year_eve"),
      next_day('new_year', "Mon").alias("next_mon")
).show()

执行上面的代码,输出内容如下:

+----------+------------+----------+
| mid_month|new_year_eve|  next_mon|
+----------+------------+----------+
|2019-01-15|  2018-12-31|2019-01-07|
+----------+------------+----------+

4. 转换不规范的日期

有的时候,采集到的数据是不受控制的,得到的日期可能是不规范的,这就需要将这些不规范的日期转换为规范的表示。对不规范日期的转换,代码如下:

from pyspark.sql.functions import *

# 转换不规范的日期:
data = [("Nov 05, 2018 02:46:47 AM",),("Nov 5, 2018 02:46:47 PM",)]
df = spark.createDataFrame(data,schema=["times"])

df.withColumn(
      "times2",
      from_unixtime(
        unix_timestamp("times", "MMM d, yyyy hh:mm:ss a"),
        "yyyy-MM-dd HH:mm:ss.SSSSSS"
      )
).show(truncate=False)

执行以上代码,输出结果如下:

+------------------------+--------------------------+
|times                       |times2                        |
+------------------------+--------------------------+
|Nov 05, 2018 02:46:47 AM|2018-11-05 02:46:47.000000|
|Nov 5, 2018 02:46:47 PM |2018-11-05 14:46:47.000000|
+------------------------+--------------------------+

5. 处理时间序列数据

在处理时间序列数据(time-series data)时,经常需要提取日期或时间戳值的特定字段(如年、月、小时、分钟和秒)。例如,当需要按季度、月或周对所有股票交易进行分组时,就可以从交易日期提取该信息,并按这些值分组。从日期或时间戳中提取字段,代码如下:

from pyspark.sql.functions import *

# 3)提取日期或时间戳值的特定字段(如年、月、小时、分钟和秒)
# 从一个日期值中提取指定的日期字段
valentimeDateDF = spark \
	.createDataFrame([("2019-02-14 13:14:52",)],["date"])

valentimeDateDF.select(
      year('date').alias("year"),               	# 年
      quarter('date').alias("quarter"),         	# 季
      month('date').alias("month"),             	# 月
      weekofyear('date').alias("woy"),          	# 周
      dayofmonth('date').alias("dom"),          	# 日
      dayofyear('date').alias("doy"),           	# 天
      hour('date').alias("hour"),               	# 小时
      minute('date').alias("minute"),           	# 分
      second('date').alias("second")          	# 秒
).show()

执行以上代码,输出结果如下:

+----+-------+-----+---+---+---+----+------+------+
|year|quarter|month|woy|dom|doy|hour|minute|second|
+----+-------+-----+---+---+---+----+------+------+
|2019|       1|     2|  7| 14| 45|   13|    14|     52|
+----+-------+-----+---+---+---+----+------+------+

《Spark原理深入与编程实战》