PySpark RDD编程案例_电影数据集分析
下面的示例使用Spark RDD实现对电影数据集进行分析。在这里我们使用推荐领域一个著名的开放测试数据集movielens。我们将使用其中的电影评分数据集ratings.csv以及电影数据集movies.csv。这两个数据集已经放在了PBLP平台的/home/hduser/data/spark/movielens/目录下。
【例】请找出平均评分超过4.0的电影,列表显示。
实现过程和代码如下。
1)加载数据,构造RDD:
from pyspark.sql import SparkSession
# 构建SparkSession和SparkContext实例
spark = SparkSession.builder \
.master("spark://xueai8:7077") \
.appName("pyspark demo") \
.getOrCreate()
sc = spark.sparkContext
# 加载数据,构造RDD
ratings = "file:///home/hduser/data/spark/movielens/ratings.csv" # 评分数据集
movies = "file:///home/hduser/data/spark/movielens/movies.csv" # 电影数据集
ratingsRDD = sc.textFile(ratings)
moviesRDD = sc.textFile(movies)
print("评分数据集中数据总记录数量:",ratingsRDD.count()) # 评分数据集中数据总记录数量
ratingsRDD.cache() # 缓存评分数据集
print("电影数据集中数据总记录数量:",moviesRDD.count()) # 电影数据集中数据总记录数量
moviesRDD.cache() # 缓存电影数据集
执行以上代码,输出结果如下:
评分数据集中数据总记录数量: 100837 电影数据集中数据总记录数量: 9743
可以看到,评分数据集ratingsRDD中总共有100837条评论记录,电影数据集moviesRDD中总共有9743部电影信息。
2)对评分数据集和电影数据集进行简单探索,了解数据:
# 对评分数据集进行简单探索,了解数据:
for row in ratingsRDD.take(5):
print(row)
print() # 换行
# 对电影数据集进行简单探索,了解数据:
for row in moviesRDD.take(5):
print(row)
执行以上代码,输出结果如下所示:
userId,movieId,rating,timestamp 1,1,4.0,964982703 1,3,4.0,964981247 1,6,4.0,964982224 1,47,5.0,964983815 movieId,title,genres 1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy 2,Jumanji (1995),Adventure|Children|Fantasy 3,Grumpier Old Men (1995),Comedy|Romance 4,Waiting to Exhale (1995),Comedy|Drama|Romance
可以看到,这两个数据集的第1行都是标题行。
3)处理评分数据集,包括:忽略标题行,并抽取(movieId,rating)字段。代码如下所示:
# 自定义map转换函数
def ratingsMapFun(line):
fileds = line.split(",")
return (int(fileds[1]), float(fileds[2]))
# 删除标题行,并抽取(movieId,rating)字段
rating = ratingsRDD \
.filter(lambda line: not line.startswith("userId")) \
.map(ratingsMapFun)
# 查看前5条
for row in rating.take(5):
print(row)
执行以上代码,输出结果如下:
(1, 4.0) (3, 4.0) (6, 4.0) (47, 5.0) (50, 5.0)
可以看到,已经去掉了标题行,并在rating RDD中只保留了movieId(电影ID)和rating(评分)这两个字段。
4)对rating RDD进行转换,按key(即movieId)进行分组,并计算每一组的平均值(也就是每部电影的平均评分)。代码如下所示:
# 定义计算平均评分的函数
def avgFun(t):
avg = sum(t[1]) / len(t[1])
return (t[0], avg)
# 获得(movieid,ave_rating)
movieScores = rating.groupByKey().map(avgFun)
# 查看前5条数据
for row in movieScores.take(5):
print(row)
执行以上代码,输出结果如下:
(6, 3.946078431372549) (50, 4.237745098039215) (70, 3.5090909090909093) (110, 4.031645569620253) (216, 3.326530612244898)
可以看出,经过这一步处理后,返回的是一个元组,元组元素为(movieId,平均得分)。但是这个结果对用户来说并不友好,因为它只显示了电影的ID。下一步显示对应的电影名称及其平均得分。
5)处理电影数据集,包括:忽略标题行;抽取(movieId,movieName)字段。代码如下所示:
# 自定义map转换函数
def movieMapFun(line):
fileds = line.split(",")
return (int(fileds[0]), fileds[1])
# 删除标题行,并抽取出(MovieID,MovieName)
movieskey = moviesRDD \
.filter(lambda line: not line.startswith("movieId")) \
.map(movieMapFun)
# 查看前5条
for row in movieskey.take(5):
print(row)
执行以上代码,输出结果如下所示:
('1', 'Toy Story (1995)')
('2', 'Jumanji (1995)')
('3', 'Grumpier Old Men (1995)')
('4', 'Waiting to Exhale (1995)')
('5', 'Father of the Bride Part II (1995)')
6)最后一步,将movieScores RDD和movieskey RDD进行join连接,从而得到每部电影的名称及其得分。代码如下所示:
result = movieScores \
.join(movieskey) \
.filter(lambda f: f[1][0]>4.0) \
.map(lambda f: (f[0],f[1][1],f[1][0]))
# 查看前5条
for row in result.take(5):
print(row)
执行以上代码,输出结果如下:
(260, 'Star Wars: Episode IV - A New Hope (1977)', 4.231075697211155) (296, 'Pulp Fiction (1994)', 4.197068403908795) (356, 'Forrest Gump (1994)', 4.164133738601824) (608, 'Fargo (1996)', 4.116022099447513) (1136, 'Monty Python and the Holy Grail (1975)', 4.161764705882353)