PySpark SQL数据集join连接操作

PySpark SQL支持对两个或多个DataFrame执行各种类型的join连接操作。

执行两个数据集的连接需要指定两个内容:

  • (1) 第一个是连接表达式,它指定来自每个数据集的哪些列应该用于确定来自两个数据集的哪些行将被包含在连接后的数据集中(确定连接列/等值列)。
  • (2) 第二种是连接类型,它决定了连接后的数据集中应该包含哪些内容。

为了演示如何在PySpark SQL中使用join连接,需要先准备两个小型的DataFrame。第一个DataFrame代表一个员工列表,每一行包含员工姓名和所属部门。第二个DataFrame包含一个部门列表,每一行包含一个部门ID和部门名称。

为此,首先创建两个测试DataFrame,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 员工
employeeDF = spark.createDataFrame([
    ("刘宏明", 31),
    ("赵薇", 33),
    ("黄海波", 33),
    ("杨幂", 34),
    ("楼一萱", 34),
    ("龙梅子", None)
], ["name","dept_no"])

# 部门
deptDF = spark.createDataFrame([
    (31, "销售部"),
    (33, "工程部"),
    (34, "财务部"),
    (35, "市场营销部")
], ["id","name"])

将这两个DataFrame注册为临时视图,代码如下:

employeeDF.createOrReplaceTempView("employees")
deptDF.createOrReplaceTempView("departments")

然后就可以使用SQL来执行join连接测试了。

1. 内连接

这是最常用的连接类型,它使用相等比较的连接表达式,包含来自两个数据集与连接条件相匹配的列。连接的数据集只有当连接表达式结果为真时才包含行。没有匹配列值的行将被排除在连接数据集之外。在PySpark SQL中,内联接是默认连接类型。

对employeeDF和deptDF这两个数据集按id执行内连接,代码如下:

# 执行该join
employeeDF.join(deptDF, col("dept_no") == col("id"), "inner").show()

# 不需要指定该join类型,因为"inner"是默认的
# employeeDF.join(deptDF, col("dept_no") == col("id")).show()

# 使用SQL
# spark.sql("select * from employees JOIN departments on dept_no == id").show()

执行上面的代码,输出结果如下:

+--------+-------+---+------+
|emp_name|dept_no| id|  name|
+--------+-------+---+------+
|    刘宏明|     31| 31|  销售部|
|      赵薇|     33| 33|  工程部|
|    黄海波|     33| 33|  工程部|
|      杨幂|     34| 34|  财务部|
|    楼一萱|     34| 34|  财务部|
+--------+-------+---+------+

连接表达式可以在join()转换中指定,也可以使用where()变换。如果列名是唯一的,则可以使用简写引用join表达式中的列。如果没有,则需要通过使用col()函数指定特定列来自哪个DataFrame,代码如下:

# join表达式的简写版本
employeeDF.join(deptDF, col("dept_no") == col("id")).show()

# 指定特定列来自哪个DataFrame
employeeDF.join(deptDF, employeeDF.dept_no == deptDF.id).show()

# 使用where transformation指定join表达式
employeeDF.join(deptDF).where('dept_no == id').show()

执行上面的代码,输出结果相同,均为如下内容:

+--------+-------+---+------+
|emp_name|dept_no| id|  name|
+--------+-------+---+------+
|    刘宏明|     31| 31|  销售部|
|      赵薇|     33| 33|  工程部|
|    黄海波|     33| 33|  工程部|
|      杨幂|     34| 34|  财务部|
|    楼一萱|     34| 34|  财务部|
+--------+-------+---+------+

2. 左外连接

这个join类型的连接后的数据集包括来自内连接的所有行加上来自左边数据集的连接表达式的计算结果为False的所有行。对于那些不匹配的行,它将为右边的数据集的列填充NULL值。例如,对employeeDF和deptDF执行左外连接,代码如下:

# 连接类型既可以是"left_outer",也可以是"leftouter"
employeeDF \
	.join(deptDF, col("dept_no") == col("id"), "left_outer") \
	.show()

# 或者使用SQL
spark.sql("""
    select * 
    from employees 
        LEFT OUTER JOIN departments 
        on dept_no == id
""").show()

执行上面的代码,输出结果相同,均为如下内容:

+--------+-------+----+------+
|emp_name|dept_no|  id|  name|
+--------+-------+----+------+
|    刘宏明|     31|  31|  销售部|
|      赵薇|     33|  33|  工程部|
|    黄海波|     33|  33|  工程部|
|      杨幂|     34|  34|  财务部|
|    楼一萱|     34|  34|  财务部|
|    龙梅子|      0|null|   null|
+--------+-------+----+------+

3. 右外连接

这种join类型的行为类似于左外连接类型的行为,除了将相同的处理应用于右边的数据集之外。换句话说,连接后的数据集包括来自内连接的所有行加上来自右边数据集的连接表达式的计算结果为False的所有行。对于那些不匹配的行,它将为左边数据集的列填充NULL值。例如,对employeeDF和deptDF执行右外连接,代码如下:

# 连接类型既可以是"right_outer",也可以是"rightouter"
employeeDF \
	.join(deptDF, col("dept_no") == col("id"), "right_outer") \
	.show()

# 或者使用SQL
spark.sql("""
    select * 
    from employees 
        RIGHT OUTER JOIN departments 
        on dept_no == id
""").show()

执行上面的代码,输出结果相同,均为如下内容:

+--------+-------+---+----------+
|emp_name|dept_no| id|      name|
+--------+-------+---+----------+
|    刘宏明|     31| 31|      销售部|
|    黄海波|     33| 33|      工程部|
|      赵薇|     33| 33|      工程部|
|    楼一萱|     34| 34|      财务部|
|      杨幂|     34| 34|      财务部|
|     null|   null| 35|  市场营销部|
+--------+-------+---+----------+

4. 全外连接

这种join类型的行为实际上与将左外连接和右外连接的结果结合起来是一样的。例如,对employeeDF和deptDF执行全外连接,代码如下:

# 使用join转换
employeeDF.join(deptDF, col("dept_no") == col("id"), "outer").show()

# 或者使用SQL
spark.sql("""
    select * 
    from employees 
        FULL OUTER JOIN departments 
        on dept_no == id
""").show()

执行上面的代码,输出结果相同,均为如下内容:

+--------+-------+----+----------+
|emp_name|dept_no|  id|       name|
+--------+-------+----+----------+
|    龙梅子|      0| null|      null|
|      杨幂|     34|  34|      财务部|
|    楼一萱|     34|  34|      财务部|
|    刘宏明|     31|  31|      销售部|
|      赵薇|     33|  33|      工程部|
|    黄海波|     33|  33|      工程部|
|     null|   null|  35|  市场营销部|
+--------+-------+----+----------+

5. 左反连接

这种join类型能够发现来自左边数据集的哪些行在右边的数据集上没有任何匹配的行,而连接后的数据集只包含来自左边数据集的列。例如,对employeeDF和deptDF执行左反连接,代码如下:

# 使用join转换
employeeDF.join(deptDF, col("dept_no") == col("id"), "left_anti").show()

# 或者使用SQL
spark.sql("""
    select * 
    from employees 
        LEFT ANTI JOIN departments 
        on dept_no == id
""").show()

执行上面的代码,输出结果相同,均为如下内容:

+--------+-------+
|emp_name|dept_no|
+--------+-------+
|    龙梅子|       0|
+--------+-------+

6. 左半连接

这种join类型的行为类似于内连接类型,除了连接后的数据集不包括来自右边数据集的列。可以将这种join类型看作与左反连接类型相反,在这里,连接后的数据集只包含匹配的行。例如,对employeeDF和deptDF执行左半连接,代码如下:

# 使用join转换
employeeDF.join(deptDF, col("dept_no") == col("id"), "left_semi").show()

# 使用SQL
spark.sql("""
    select * 
    from employees 
        LEFT SEMI JOIN departments 
        on dept_no == id
""").show()

执行上面的代码,输出结果相同,均为如下内容:

+--------+-------+
|emp_name|dept_no|
+--------+-------+
|   刘宏明|      31|
|     赵薇|      33|
|   黄海波|      33|
|     杨幂|      34|
|   楼一萱|      34|
+--------+-------+

7. 交叉连接

又称笛卡尔连接。例如,对employeeDF和deptDF执行交叉连接,代码如下:

# 使用crossJoin transformation 并显示该count
print(employeeDF.crossJoin(deptDF).count())       # 24

# 使用SQL,并显示前30行以观察连接后的数据集中所有的行
spark.sql("select * from employees CROSS JOIN departments").show(30)

执行上面的代码,输出结果如下:

24
+--------+-------+---+----------+
|emp_name|dept_no| id|       name|
+--------+-------+---+----------+
|    刘宏明|     31| 31|       销售部|
|    刘宏明|     31| 33|       工程部|
|    刘宏明|     31| 34|       财务部|
|    刘宏明|     31| 35|   市场营销部|
|      赵薇|     33| 31|       销售部|
|      赵薇|     33| 33|       工程部|
|      赵薇|     33| 34|       财务部|
|      赵薇|     33| 35|   市场营销部|
|    黄海波|     33| 31|       销售部|
|    黄海波|     33| 33|       工程部|
|    黄海波|     33| 34|       财务部|
|    黄海波|     33| 35|   市场营销部|
|      杨幂|     34| 31|       销售部|
|      杨幂|     34| 33|       工程部|
|      杨幂|     34| 34|       财务部|
|      杨幂|     34| 35|   市场营销部|
|    楼一萱|     34| 31|       销售部|
|    楼一萱|     34| 33|       工程部|
|    楼一萱|     34| 34|       财务部|
|    楼一萱|     34| 35|   市场营销部|
|    龙梅子|      0| 31|       销售部|
|    龙梅子|      0| 33|       工程部|
|    龙梅子|      0| 34|       财务部|
|    龙梅子|      0| 35|   市场营销部|
+--------+-------+---+----------+

8. 处理重复列名

有时,在join两个具有同名列的DataFrame之后,会出现一个意想不到的问题。当这种情况发生时,连接后的DataFrame会有多个同名的列。在这种情况下,在对连接后的DataFrame进行某种转换时,就不太方便引用其中一列。

例如,向deptDF增加一个新的列,列名为dept_no,值来自于id列,代码如下:

# 将deptDF的列名dept_id修改为dept_no
deptDF2 = deptDF.withColumn("dept_no", "id")

deptDF2.printSchema()

执行上面的代码,输出结果如下:

root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- dept_no: long (nullable = false)

现在,使用employeeDF连接deptDF2,基于dept_no列进行连接,代码如下:

dupNameDF = employeeDF.join(deptDF2, employeeDF.dept_no == deptDF2.dept_no)

dupNameDF.printSchema()

执行上面的代码,输出结果如下:

root
 |-- emp_name: string (nullable = true)
 |-- dept_no: long (nullable = false)
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- dept_no: long (nullable = false)

注意,dupNameDF现在有两个名称相同的列,都叫dept_no。当试图在dupNameDF中投影dept_no列时,PySpark会抛出一个错误。例如,选择dept_no列,代码如下:

dupNameDF.select("dept_no")

执行上面的代码,会抛出异常信息,内容如下:

AnalysisException:Reference 'dept_no' is ambiguous, could be: dept_no, dept_no.
......

这个异常信息的意思是,因为dept_no列是模糊的(即不知道应该引用哪个DataFrame中的dept_no列),所以无法正确执行,抛出异常。

要解决这个问题,可以有以下几种方法。

(1) 使用原始的DataFrame。

# 解决方法一:明确来自哪个DataFrame
# dupNameDF.select(employeeDF.dept_no).show()
dupNameDF.select(deptDF2.dept_no).show()

执行上面的代码,会发现可以正常执行而没有异常抛出。

(2) 在join之前重命名列。

为了避免列名称的模糊性问题,另一种方法是使用withColumnRenamed()转换来重命名其中一个DataFrames中的列,代码如下:

# 解决方法二:join之前重命名列,使用withColumnRenamed转换
deptDF3 = deptDF2.withColumnRenamed("dept_no","dept_id")
deptDF3.printSchema()

dupNameDF3 = employeeDF.join(deptDF3, col('dept_no') == col('dept_id'))
dupNameDF3.printSchema()

dupNameDF3.select("dept_no").show()

执行上面的代码,会发现可以正常执行而没有异常抛出,因为在join时已经不存在重名的列了。

(3) 使用一个连接后的列名。

在两个DataFrams中,当连接的列名是相同的时,在join()函数中指定一个连接列名即可,这会自动从连接后的DataFrame中删除重复列名。但是,如果这是一个自连接,也就是说连接一个DataFrame本身,那么就没有办法引用其他重复的列名。在这种情况下,需要使用第一个技术来重命名一个DataFrame的列,代码如下:

noDupNameDF = employeeDF.join(deptDF2, "dept_no")

noDupNameDF.printSchema()
noDupNameDF.show()

执行上面的代码,输出结果如下:

root
 |-- dept_no: long (nullable = false)
 |-- emp_name: string (nullable = true)
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)

+-------+--------+---+------+
|dept_no|emp_name| id|  name|
+-------+--------+---+------+
|      31|    刘宏明| 31| 销售部|
|      33|      赵薇| 33| 工程部|
|      33|    黄海波| 33| 工程部|
|      34|      杨幂| 34| 财务部|
|      34|    楼一萱| 34| 财务部|
+-------+--------+---+------+

《Spark原理深入与编程实战》