PySpark SQL用户自定义函数(UDF)

尽管PySpark SQL为大多数常见用例提供了大量的内置函数,但总会有一些情况下,这些功能都不能提供用户的用例所需要的功能。PySpark SQL提供了一个相当简单的工具来编写用户定义的函数(UDF),并在PySpark数据处理逻辑或应用程序中使用它们,就像使用内置函数一样。

UDF用于扩展框架的函数,并在多个DataFrame上重用这些函数。UDF实际上是用户可以扩展PySpark的功能以满足特定需求的一种方式。

在PySpark中,使用UDF涉及有三个步骤:

  • (1) 第一步是用Python语法创建一个函数并进行测试。
  • (2) 第二步是通过将函数名传递给PySpark SQL的udf()函数来注册它。
  • (3) 第三步是在DataFrame代码或发出SQL查询时使用UDF。在SQL查询中使用UDF时,注册过程略有不同。

示例1

【示例】下面的示例用一个简单的UDF将数字等级转换为考查等级,它演示了前面提到的三个步骤。

首先创建一个包含学生成绩的DataFrame,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 创建学生成绩DataFrame
studentDF = spark.createDataFrame(
    [
      ("张三", 85),
      ("李四", 90),
      ("王老五", 55)
    ],["name","score"]
)

studentDF.printSchema()
studentDF.show()

执行以上代码,输出内容如下:

root
 |-- name: string (nullable = true)
 |-- score: integer (nullable = false)

+------+-----+
|  name|score|
+------+-----+
|   张三|    85|
|   李四|    90|
| 王老五|    55|
+------+-----+

将studentDF注册到名为students的临时视图,代码如下:

# 注册为视图
studentDF.createOrReplaceTempView("students")

# spark.sql("select * from students").show()

接下来创建一个普通的Python函数,用来将成绩转换到考察等级,代码如下:

# 创建一个函数(普通的Python函数)将成绩转换到考察等级
def convertGrade(score):
    if score > 100:
        return "作弊"
    elif score >= 90:
        return "优秀"
    elif score >= 80:
        return "良好"
    elif score >= 70:
        return "中等"
    else:
        return "不及格"


# 注册为一个UDF(在DataFrame API中使用时的注册方法)
convertGradeUDF = udf(convertGrade)

# 使用该UDF将成绩转换为字母等级
studentDF.select("name","score", convertGradeUDF(col("score")).alias("grade")).show()

最后,可以像使用普通PySpark内置函数一个使用该UDF,将成绩转换为字母等级,代码如下:

# 使用该UDF将成绩转换为字母等级
studentDF \
  .select("name","score",convertGradeUDF(col("score")).alias("grade")) \
  .show()

执行以上代码,输出结果如下:

+------+-----+------+
|  name|score| grade|
+------+-----+------+
|   张三|    85|   良好|
|   李四|    90|   优秀|
| 王老五|    55| 不及格|
+------+-----+------+

当在SQL查询中使用UDF时,注册过程与上面略有不同,代码如下:

# 注册为UDF,在SQL中使用
spark.udf.register("convertGrade", convertGrade)
spark.sql("""
    select name, score, convertGrade(score) as grade 
    from students"""
).show()

执行以上代码,输出结果如下:

+------+-----+------+
|  name|score| grade|
+------+-----+------+
|   张三|    85|  良好|
|   李四|    90|  优秀|
| 王老五|    55|不及格|
+------+-----+------+

示例2

【示例】把DataFrame中名字字符串中每个单词的第一个字母都转换成大写字母。

首先,创建一个DataFrame,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

columns = ["Seqno","Name"]
data = [
    ("1", "john jones"),
    ("2", "tracey smith"),
    ("3", "amy sanders")
]

df = spark.createDataFrame(data=data,schema=columns)

df.show(truncate=False)

执行以上代码,输出结果如下:

+-----+------------+
|Seqno|Name         |
+-----+------------+
|1     |john jones  |
|2     |tracey smith|
|3     |amy sanders |
+-----+------------+

接下来,创建一个普通的Python函数,它接受一个字符串参数并将每个单词的第一个字母转换为大写字母,代码如下:

def convertCase(str):
    resStr=""
    arr = str.split(" ")
    for x in arr:
       resStr= resStr + x[0:1].upper() + x[1:len(x)] + " "
    return resStr 

然后通过将函数传递给PySpark SQL的pyspark.sql.functions.udf()这个函数,将函数convertCase()注册为UDF,代码如下:

convertUDF = udf(lambda z: convertCase(z), StringType())

因为udf()函数的默认类型就是StringType,因此,也可以编写不带返回类型的上述语句,代码如下:

convertUDF = udf(lambda z: convertCase(z)) 

现在可以在DataFrame列上将convertUDF()作为常规内置函数来使用,代码如下:

df.select(col("Seqno"), \
    convertUDF(col("Name")).alias("Name") ) \
   .show(truncate=False)

执行以上代码,输出结果如下:

+-----+-------------+
|Seqno|Name           |
+-----+-------------+
|1     |John Jones   |
|2     |Tracey Smith |
|3     |Amy Sanders  |
+-----+-------------+

也可以在DataFrame的withColumn()函数上使用udf()函数。下面创建另一个upperCase()函数,它将输入字符串转换为大写,代码如下:

def upperCase(str):
    return str.upper()

将upperCase()这个Python函数转换为UDF,然后将其与DataFrame withColumn()一起使用。下面的例子将Name列的值转换为大写,并创建一个新列Curated Name,代码如下:

upperCaseUDF = udf(lambda z:upperCase(z),StringType())   

df.withColumn("Cureated Name", upperCaseUDF(col("Name"))).show(truncate=False)

执行以上代码,输出结果如下:

+-----+------------+-------------+
|Seqno|Name         | Cureated Name|
+-----+------------+-------------+
|1     |john jones  |JOHN JONES    |
|2     |tracey smith|TRACEY SMITH |
|3     |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

为了在PySpark SQL上使用convertCase()函数,需要使用spark.udf.register()在PySpark上注册这个函数,代码如下:

# 注册函数
spark.udf.register("convertUDF", convertCase,StringType())

# 创建临时视图
df.createOrReplaceTempView("NAME_TABLE")

# 执行SQL查询,在SQL语句中使用自定义函数
spark.sql("select Seqno, convertUDF(Name) as Name from NAME_TABLE").show(truncate=False)

执行以上代码,输出结果如下:

+-----+-------------+
|Seqno|Name          |
+-----+-------------+
|1    |John Jones    |
|2    |Tracey Smith |
|3    |Amy Sanders   |
+-----+-------------+

前面要创建UDF,需要两步处理:先创建一个Python函数,再将该函数注册为UDF。用户也可以通过注解来创建UDF,只需一步,代码如下:

@udf(returnType=StringType()) 
def upperCase(str):
    return str.upper()

df.withColumn("Cureated Name", upperCase(col("Name"))).show(truncate=False)

执行以上代码,输出结果如下:

+-----+------------+-------------+
|Seqno|Name         |Cureated Name|
+-----+------------+-------------+
|1     |john jones  |JOHN JONES    |
|2     |tracey smith|TRACEY SMITH |
|3     |amy sanders |AMY SANDERS  |
+-----+------------+-------------+

《Flink原理深入与编程实战》