临时视图与SQL查询

PySpark SQL支持直接应用标准SQL语句进行查询。当在PySpark SQL中编写SQL命令时,它们会被翻译为DataFrame上的关系操作。在SQL语句内,可以访问所有SQL表达式和内置函数。

这需要使用SparkSession的sql()函数执行给定的SQL查询,该查询会返回一个DataFrame。

1. 在PySpark程序中执行SQL语句

PySpark SQL支持使用基本SQL语法或HiveQL编写的SQL查询的执行。在pyspark shell或Zeppelin Notebook中,会自动导入spark.sql,所以可以直接使用该函数用来编写SQL命令,代码如下:

spark.sql("select current_date() as today , 1 + 100 as value").show()

SparkSession的sql()函数执行给定的SQL查询,该查询会返回一个DataFrame。

本节只讨论最后一个选项,即在PySpark应用程序中以编程方式运行SQL。例如,执行一个不带注册视图的SQL语句,代码如下:

from pyspark.sql import SparkSession

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

infoDF = spark.sql("select current_date() as today , 1 + 100 as value")
infoDF.show()

执行以上代码,输出内容如下:

+----------+-----+
|     today|value|
+----------+-----+
|2022-02-11|  101|
+----------+-----+

除了使用PySpark读API将文件加载到DataFrame并对其进行查询外,PySpark也可以使用SQL语句直接查询该数据文件,代码如下:

sqlDF = spark.sql("""
  SELECT * 
  FROM parquet.`/data/spark/resources/users.parquet`
""")
sqlDF.show()

执行以上代码,输出内容如下:

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

2. 注册临时视图并执行SQL查询

DataFrame本质上就像数据库中的表一样,可以通过SQL语句来查询它们。不过,在可以发出SQL查询来操纵它们之前,需要将它们注册为一个临时视图,然后,就可以使用SQL查询从临时表中查询数据了。每个临时视图都有一个名字,通过视图的名字来引用该DataFrame,该名字在select子句中用作表名。

例如,要查询电影数据集movies.parquet文件,代码如下:

from pyspark.sql import SparkSession

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 定义文件路径
parquetFile = "/data/spark/movies2/movies.parquet"

# 读取到DataFrame中
movies = spark.read.parquet(parquetFile)

# 现在将movies DataFrame注册为一个临时视图
movies.createOrReplaceTempView("movies")

# 从视图view查询
sql = """
    select * 
    from movies 
    where actor_name like '%Jolie%' and produced_year > 2009
"""
spark.sql(sql).show()

执行以上代码,输出内容如下:

+---------------+---------------+-------------+
|     actor_name|    movie_title|produced_year|
+---------------+---------------+-------------+
|Jolie, Angelina|           Salt|         2010|
|Jolie, Angelina|Kung Fu Panda 2|         2011|
|Jolie, Angelina|    The Tourist|         2010|
+---------------+---------------+-------------+

也可以在sql()函数中混合使用SQL语句和DataFrame转换API。例如,查询电影数据集,找出参演影片超过30部的演员,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 定义文件路径
parquetFile = "/data/spark/movies2/movies.parquet"

# 读取到DataFrame中
movies = spark.read.parquet(parquetFile)

# 现在将movies DataFrame注册为一个临时视图
movies.createOrReplaceTempView("movies")

# 从视图view查询
spark.sql("select actor_name, count(*) as count from movies group by actor_name") \
     .where('count > 30') \
     .orderBy(col("count").desc()) \
     .show()

执行以上代码,输出内容如下:

+------------------+-----+
|        actor_name|count|
+------------------+-----+
|  Tatasciore, Fred|   38|
|     Welker, Frank|   38|
|Jackson, Samuel L.|   32|
|     Harnell, Jess|   31|
+------------------+-----+

当SQL语句较长时,可以利用"""(三引号)来格式化多行SQL语句。例如,查询电影数据集,使用子查询来计算每年产生的电影数量,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 定义文件路径
parquetFile = "/data/spark/movies2/movies.parquet"

# 读取到DataFrame中
movies = spark.read.parquet(parquetFile)

# 现在将movies DataFrame注册为一个临时视图
movies.createOrReplaceTempView("movies")

# 使用子查询来计算每年产生的电影数量(利用"""来格式化多行SQL语句)
spark.sql("""select produced_year, count(*) as count
             from (select distinct movie_title, produced_year from movies)
             group by produced_year
        """) \
     .orderBy(col("count").desc()) \
     .show(5)

执行以上代码,输出内容如下:

+-------------+-----+
|produced_year|count|
+-------------+-----+
|         2011|   86|
|         2004|   86|
|         2006|   86|
|         2005|   85|
|         2008|   82|
+-------------+-----+
only showing top 5 rows

3. 直接使用数据源注册临时视图

在前面的示例中,都是先将数据加载到一个DataFrame中,然后将该DataFrame注册为临时视图或全局视图。除此之外,也可以使用SparkSession的sql()方法从注册的数据源直接加载数据注册临时视图。

例如,注册一个Parquet文件并加载它的内容,代码如下:

from pyspark.sql import SparkSession

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 从parquet数据源创建临时视图
spark.sql("create temporary view usersParquet "+
      "using org.apache.spark.sql.parquet "+
      "options(path '/data/spark/resources/users.parquet')")

# 查询临时视图
spark.sql("select * from usersParquet").show()

执行以上代码,输出结果如下:

+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

下面是另一个使用内置数据源的例子。从JDBC注册一个临时视图,然后使用SQL语句查询该临时视图(这里连接的是MySQL 5数据库,读者如果使用的是其他版本的MySQL,请自行修改为相应版本的连接参数),代码如下:

from pyspark.sql import SparkSession

# 构建SparkSession实例
spark = SparkSession.builder \
   .master("spark://localhost:7077") \
   .appName("pyspark sql demo") \
   .getOrCreate()

# 从jdbc数据源创建临时视图
spark.sql(
      """
        create temporary view peoplesjdbc
        using org.apache.spark.sql.jdbc
        options(
          url 'jdbc:mysql://localhost:3306/xueai8',
          dbtable 'peoples',
          user 'root',
          password 'admin'
        )
      """
    )

# 在临时视图上执行查询
spark.sql("select * from peoplesjdbc").show()

执行以上代码,输出结果如下:

+---+------+---+
| id|  name|age|
+---+------+---+
|  1|   张三| 23|
|  2|   李四| 18|
|  3| 王老五| 35|
+---+------+---+

《PySpark原理深入与编程实战》