PySpark SQL集合元素处理函数

集合被设计用来处理复杂的数据类型,如arrays、maps和struts。本节将介绍两种特定类型的集合函数。第一种方法是使用数组数据类型,第二种方法是处理为JSON数据格式。

1. 数组处理函数

PySpark DataFrame支持复杂数据类型,也就是列值可以是一个集合。可以使用数组相关的集合函数来轻松获取数组大小、检查值的存在、或者对数组进行排序。下面的示例包含了处理各种数组的相关函数用法,代码如下:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

# 创建一个任务集DataFrame
tasksDF = spark.createDataFrame([("星期天", ["抽烟", "喝酒", "去烫头"])], ["day", "tasks"])

# tasksDF的schema
tasksDF.printSchema()
tasksDF.show()

执行以上代码,输出结果如下:

root
 |-- day: string (nullable = true)
 |-- tasks: array (nullable = true)
 |    |-- element: string (containsNull = true)

+------+--------------------+
|day    |        tasks         | 
+------+--------------------+
|星期天 |[抽烟, 喝酒, 去烫头]   |
+------+--------------------+

接下来获得该数组的大小,对其进行排序,并检查在该数组中是否存在一个指定的值。代码如下:

tasksDF \
      .select(
        "day",
        size("tasks").alias("size"),                             # 数组大小
        sort_array("tasks").alias("sorted_tasks"),     	    # 对数组排序
        array_contains("tasks", "去烫头").alias("是否去烫头")  # 是否包含
      ) \
      .show(truncate=False) 

执行以上代码,输出结果如下:

+------+----+--------------------+----------+
|day    |size|sorted_tasks         |  是否去烫头|
+------+----+--------------------+----------+
|星期天 |3    |[去烫头, 喝酒, 抽烟]   | true      |
+------+----+--------------------+----------+

使用explode()表函数将为数组中的每一个元素创建一个新行,代码如下:

tasksDF.select("day", explode("tasks").alias("task")).show()

执行以上代码,输出结果如下:

+------+------+
|   day|   task|
+------+------+
| 星期天|   抽烟|
| 星期天|   喝酒|
| 星期天| 去烫头|
+------+------+

2. JSON处理函数

许多非结构化数据集都是以JSON的形式存在的。对于JSON数据类型的列,使用相关的集合函数将JSON字符串转换成struct(结构体)数据类型。主要的函数是from_json()、get_json_object()和to_json()。一旦JSON字符串被转换为PySpark struct数据类型,就可以轻松地提取这些值。下面的代码演示了from_json()和to_json()函数的示例。

首先构造一个带有JSON字符串内容的DataFrame,代码如下:

from pyspark.sql import SparkSession

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

# 创建一个字符串,它包含有JSON格式的字符串内容
todos = """{"day": "星期天","tasks": ["抽烟", "喝酒", "去烫头"]}"""
todoStrDF = spark.createDataFrame([(todos,)], ["todos_str"])

# 查看schema和内容
todoStrDF.printSchema()
todoStrDF.show(truncate=False)

执行以上代码,输出结果如下:

root
 |-- todos_str: string (nullable = true)

+-----------------------------------------------------+
|todos_str                                                     |
+-----------------------------------------------------+
|    {"day": "星期天","tasks": ["抽烟", "喝酒", "去烫头"]}|
+-----------------------------------------------------+

为了将一个JSON字符串转换为一个PySpark结构体数据类型,需要将其结构描述给PySpark,为此需要定义一个Schema模式,并在from_json()函数中应用,代码如下:

from pyspark.sql.types import *

todoSchema = StructType([
    StructField("day", StringType(), True), 
    StructField("tasks",  ArrayType(StringType()), True)
])

# 使用from_json来转换JSON string
todosDF = todoStrDF \
	.select(from_json("todos_str",todoSchema).alias("todos"))

# todos是一个struct数据类型,包含两个字段:day 和 tasks
todosDF.printSchema()
todosDF.show()

执行以上代码,输出结果如下:

root
 |-- todos: struct (nullable = true)
 |    |-- day: string (nullable = true)
 |    |-- tasks: array (nullable = true)
 |    |    |-- element: string (containsNull = true)

+------------------------------+
|                              todos|
+------------------------------+
|    {星期天, [抽烟, 喝酒, 去烫头]}|
+------------------------------+

可以使用Column类的getItem()函数检索出结构体数据类型的值,代码如下:

todosDF \
      .select(
        col("todos").getItem("day"),
        col("todos").getItem("tasks"),
        col("todos").getItem("tasks")[0].alias("first_task")
      ) \
      .show(truncate=False)

执行以上代码,输出结果如下:

+---------+--------------------+----------+
|todos.day|todos.tasks           |first_task|
+---------+--------------------+----------+
|星期天     |[抽烟, 喝酒, 去烫头]   |抽烟        |
+---------+--------------------+----------+

也可以使用to_json()函数将一个PySpark结构体数据类型转换为JSON格式字符串,代码如下:

todosDF.select(to_json("todos")).show(truncate=False)

执行以上代码,输出结果如下:

+-------------------------------------------------+
|       to_json(todos)                                   |
+-------------------------------------------------+
|    {"day":"星期天","tasks":["抽烟","喝酒","去烫头"]}|
+-------------------------------------------------+

《Flink原理深入与编程实战》