发布日期:2022-07-31
VIP内容
PySpark SQL分析案例:电影评分数据集分析
本节使用PySpark SQL实现对电影数据集进行分析。在这里使用推荐领域一个著名的开放测试数据集movielens。MovieLens数据集包括电影元数据信息和用户属性信息。本例将使用其中的users.dat和ratings.dat两个数据集。
【示例】使用PySpark DataFrame API统计看过电影“Lord of the Rings,The(1978)”的用户的年龄和性别分布(提示该影片的id是2116)。
请按以下步骤执行。
(1) 首先构造一个SparkSession实例,代码如下:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
# 创建SparkSession的实例
spark = SparkSession.builder \
.master("spark://localhost:7077") \
.appName("pyspark sql demo") \
.config("spark.hadoop.hive.exec.dynamic.partition", "true") \
.config("spark.hadoop.hive.exec.dynamic.partition.mode","nonstrict") \
.enableHiveSupport() \
.getOrCreate()
(2) 读取用户数据集users.dat,加载到DataFrame中,代码如下:
from pyspark.sql.types import *
# 定义文件路径
usersFile = "/data/spark/ml-1m/users.dat"
# 指定一个Schema(模式)
fields = [
StructField("userID", LongType(), True),
StructField("gender", StringType(), True),
StructField("age", IntegerType(), True),
StructField("occupation", StringType(), True),
StructField("zipcode", StringType(), True)
]
schema = StructType(fields)
# 加载数据文件到DataFrame,并指定schema
usersDF = spark.read \
.options(sep="::",header="false") \
.schema(schema) \
.csv(usersFile)
# 查看用户数据
usersDF.printSchema()
usersDF.show(5)
执行以上代码,输出结果如下:
root |-- userID: long (nullable = false) |-- gender: string (nullable = true) |-- age: integer (nullable = true) |-- occupation: string (nullable = true) |-- zipcode: string (nullable = true) +------+------+---+----------+-------+ |userID|gender|age|occupation|zipcode| +------+------+---+----------+-------+ | 1| F| 1| 10| 48067| | 2| M| 56| 16| 70072| | 3| M| 25| 15| 55117| | 4| M| 45| 7| 02460| | 5| M| 25| 20| 55455| +------+------+---+----------+-------+ only showing top 5 rows
(3) 读取评分数据集ratings.dat,加载到DataFrame中,代码如下:
# 定义文件路径
ratingsFile = "/data/spark/ml-1m/ratings.dat"
# 指定一个Schema(模式)
fields = [
StructField("userID", LongType(), True),
StructField("movieID", LongType(), True),
StructField("rating", IntegerType(), True),
StructField("timestamp", LongType(), True)
]
schema = StructType(fields)
# 加载数据文件到DataFrame,并指定schema
ratingsDF = spark.read \
.options(sep="::",header="false") \
.schema(schema) \
.csv(ratingsFile)
# 查看用户数据
ratingsDF.printSchema()
ratingsDF.show(5)
执行以上代码,输出结果如下:
root |-- userID: long (nullable = false) |-- movieID: long (nullable = false) |-- rating: integer (nullable = true) |-- timestamp: long (nullable = false) +------+-------+------+---------+ |userID|movieID|rating|timestamp| +------+-------+------+---------+ | 1| 1193| 5|978300760| | 1| 661| 3|978302109| | 1| 914| 3|978301968| | 1| 3408| 4|978300275| | 1| 2355| 5|978824291| +------+-------+------+---------+ only showing top 5 rows
(4) 将两个DataFrame注册为临时表,对应的表名分别为users和ratings,代码如下:
usersDF.createOrReplaceTempView("users")
ratingsDF.createOrReplaceTempView("ratings")
(5) 通过SQL处理临时表users和ratings中的数据,并输出最终结果。为了简单起见,避免三表连接操作,这里直接使用了movieID,代码如下:
MOVIE_ID = "2116"
sqlStr = f"""select age,gender,count(*) as total_peoples
from users as u
join ratings as r on u.userid=r.userid
where movieid={MOVIE_ID} group by gender,age"""
resultDF = spark.sql(sqlStr)
# 显示resultDF的内容
resultDF.show()
执行以上代码,输出结果如下:
+---+------+-------------+ |age|gender|total_peoples| +---+------+-------------+ | 18| M| 72| | 18| F| 9| | 56| M| 8| | 45| M| 26| | 45| F| 3| | 25| M| 169| | 56| F| 2| | 1| M| 13| | 1| F| 4| | 50| F| 3| | 50| M| 22| | 25| F| 28| | 35| F| 13| | 35| M| 66| +---+------+-------------+
(6) 以交叉表的形式统计不同年龄不同性别用户数,代码如下:
from pyspark.sql.functions import *
resultDF \
.groupBy("age") \
.pivot("gender") \
.agg(sum("total_peoples").alias("cnt")) \
.show()
执行以上代码,输出结果如下:
+---+---+---+ |age| F| M| +---+---+---+ | 1| 4| 13| | 35| 13| 66| | 50| 3| 22| | 45| 3| 26| | 25| 28|169| | 56| 2| 8| | 18| 9| 72| +---+---+---+
(7) 在Zeppelin中,支持查询结果的可视化显示。在Zeppelin Notebook的单元格中,执行以下SQL语句,可视化显示数据(注意,第一行必须输入%sql,用来指定使用SQL解释器),代码如下:
%sql
select age,
gender,
count(*) as total_peoples
from users as u join ratings as r
on u.userid=r.userid
where movieid=${MOVIE_ID=2116}
group by gender,age
执行以上代码,输出结果如下图所示: