发布日期:2022-07-31 VIP内容

PySpark SQL分析案例:电影评分数据集分析

本节使用PySpark SQL实现对电影数据集进行分析。在这里使用推荐领域一个著名的开放测试数据集movielens。MovieLens数据集包括电影元数据信息和用户属性信息。本例将使用其中的users.dat和ratings.dat两个数据集。

【示例】使用PySpark DataFrame API统计看过电影“Lord of the Rings,The(1978)”的用户的年龄和性别分布(提示该影片的id是2116)。

请按以下步骤执行。

(1) 首先构造一个SparkSession实例,代码如下:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *

# 创建SparkSession的实例
spark = SparkSession.builder \
    .master("spark://localhost:7077") \
    .appName("pyspark sql demo") \
    .config("spark.hadoop.hive.exec.dynamic.partition", "true") \
    .config("spark.hadoop.hive.exec.dynamic.partition.mode","nonstrict") \
    .enableHiveSupport() \
    .getOrCreate()

(2) 读取用户数据集users.dat,加载到DataFrame中,代码如下:

from pyspark.sql.types import *

# 定义文件路径
usersFile = "/data/spark/ml-1m/users.dat"

# 指定一个Schema(模式)
fields = [
    StructField("userID", LongType(), True), 
    StructField("gender", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("occupation", StringType(), True),
    StructField("zipcode", StringType(), True)
] 
schema = StructType(fields)

# 加载数据文件到DataFrame,并指定schema
usersDF = spark.read \
	.options(sep="::",header="false") \
	.schema(schema) \
	.csv(usersFile)

# 查看用户数据
usersDF.printSchema()
usersDF.show(5)

执行以上代码,输出结果如下:

root
 |-- userID: long (nullable = false)
 |-- gender: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- occupation: string (nullable = true)
 |-- zipcode: string (nullable = true)

+------+------+---+----------+-------+
|userID|gender|age|occupation|zipcode|
+------+------+---+----------+-------+
|      1|      F|  1|          10|  48067|
|      2|      M| 56|          16|  70072|
|      3|      M| 25|          15|  55117|
|      4|      M| 45|           7|  02460|
|      5|      M| 25|          20|  55455|
+------+------+---+----------+-------+
only showing top 5 rows

(3) 读取评分数据集ratings.dat,加载到DataFrame中,代码如下:

# 定义文件路径
ratingsFile = "/data/spark/ml-1m/ratings.dat"

# 指定一个Schema(模式)
fields = [
    StructField("userID", LongType(), True), 
    StructField("movieID", LongType(), True),
    StructField("rating", IntegerType(), True),
    StructField("timestamp", LongType(), True)
] 
schema = StructType(fields)

# 加载数据文件到DataFrame,并指定schema
ratingsDF = spark.read \
	.options(sep="::",header="false") \
	.schema(schema) \
	.csv(ratingsFile)

# 查看用户数据
ratingsDF.printSchema()
ratingsDF.show(5)

执行以上代码,输出结果如下:

root
 |-- userID: long (nullable = false)
 |-- movieID: long (nullable = false)
 |-- rating: integer (nullable = true)
 |-- timestamp: long (nullable = false)

+------+-------+------+---------+
|userID|movieID|rating|timestamp|
+------+-------+------+---------+
|      1|    1193|     5|978300760|
|      1|     661|     3|978302109|
|      1|     914|     3|978301968|
|      1|    3408|     4|978300275|
|      1|    2355|     5|978824291|
+------+-------+------+---------+
only showing top 5 rows

(4) 将两个DataFrame注册为临时表,对应的表名分别为users和ratings,代码如下:

usersDF.createOrReplaceTempView("users")
ratingsDF.createOrReplaceTempView("ratings")

(5) 通过SQL处理临时表users和ratings中的数据,并输出最终结果。为了简单起见,避免三表连接操作,这里直接使用了movieID,代码如下:

MOVIE_ID = "2116"
sqlStr = f"""select age,gender,count(*) as total_peoples 
              from users as u 
		   join ratings as r on u.userid=r.userid 
              where movieid={MOVIE_ID} group by gender,age"""
resultDF = spark.sql(sqlStr)

# 显示resultDF的内容
resultDF.show()

执行以上代码,输出结果如下:

+---+------+-------------+
|age|gender|total_peoples|
+---+------+-------------+
| 18|      M|             72|
| 18|      F|              9|
| 56|      M|              8|
| 45|      M|             26|
| 45|      F|              3|
| 25|      M|            169|
| 56|      F|              2|
|  1|      M|             13|
|  1|      F|              4|
| 50|      F|              3|
| 50|      M|             22|
| 25|      F|             28|
| 35|      F|             13|
| 35|      M|             66|
+---+------+-------------+

(6) 以交叉表的形式统计不同年龄不同性别用户数,代码如下:

from pyspark.sql.functions import *

resultDF \
   .groupBy("age") \
   .pivot("gender") \
   .agg(sum("total_peoples").alias("cnt")) \
   .show()

执行以上代码,输出结果如下:

+---+---+---+
|age|  F|   M|
+---+---+---+
|  1|  4|  13|
| 35| 13| 66|
| 50|  3| 22|
| 45|  3| 26|
| 25| 28|169|
| 56|  2|  8|
| 18|  9| 72|
+---+---+---+

(7) 在Zeppelin中,支持查询结果的可视化显示。在Zeppelin Notebook的单元格中,执行以下SQL语句,可视化显示数据(注意,第一行必须输入%sql,用来指定使用SQL解释器),代码如下:

%sql
select age,
       gender,
       count(*) as total_peoples 
from users as u join ratings as r 
     on u.userid=r.userid 
where movieid=${MOVIE_ID=2116}
group by gender,age

执行以上代码,输出结果如下图所示: