发布日期:2022-03-14 VIP内容

PySpark RDD编程案例_电影数据集分析

下面的示例使用Spark RDD实现对电影数据集进行分析。在这里我们使用推荐领域一个著名的开放测试数据集movielens。我们将使用其中的电影评分数据集ratings.csv以及电影数据集movies.csv。这两个数据集已经放在了PBLP平台的/home/hduser/data/spark/movielens/目录下。

【例】请找出平均评分超过4.0的电影,列表显示。

实现过程和代码如下。

1)加载数据,构造RDD:

from pyspark.sql import SparkSession

# 构建SparkSession和SparkContext实例
spark = SparkSession.builder \
   .master("spark://xueai8:7077") \
   .appName("pyspark demo") \
   .getOrCreate()

sc = spark.sparkContext

# 加载数据,构造RDD
ratings = "file:///home/hduser/data/spark/movielens/ratings.csv"    	# 评分数据集
movies = "file:///home/hduser/data/spark/movielens/movies.csv"   	# 电影数据集

ratingsRDD = sc.textFile(ratings)
moviesRDD = sc.textFile(movies)

print("评分数据集中数据总记录数量:",ratingsRDD.count())			# 评分数据集中数据总记录数量
ratingsRDD.cache()			# 缓存评分数据集

print("电影数据集中数据总记录数量:",moviesRDD.count())			# 电影数据集中数据总记录数量
moviesRDD.cache()			# 缓存电影数据集

执行以上代码,输出结果如下:

评分数据集中数据总记录数量: 100837
电影数据集中数据总记录数量: 9743

可以看到,评分数据集ratingsRDD中总共有100837条评论记录,电影数据集moviesRDD中总共有9743部电影信息。

2)对评分数据集和电影数据集进行简单探索,了解数据:

# 对评分数据集进行简单探索,了解数据:
for row in ratingsRDD.take(5):
    print(row)
    
print()   	 # 换行

# 对电影数据集进行简单探索,了解数据:
for row in moviesRDD.take(5):
    print(row)

执行以上代码,输出结果如下所示:

userId,movieId,rating,timestamp
1,1,4.0,964982703
1,3,4.0,964981247
1,6,4.0,964982224
1,47,5.0,964983815

movieId,title,genres
1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy
2,Jumanji (1995),Adventure|Children|Fantasy
3,Grumpier Old Men (1995),Comedy|Romance
4,Waiting to Exhale (1995),Comedy|Drama|Romance

可以看到,这两个数据集的第1行都是标题行。

3)处理评分数据集,包括:忽略标题行,并抽取(movieId,rating)字段。代码如下所示:

# 自定义map转换函数
def ratingsMapFun(line):
    fileds = line.split(",")
    return (int(fileds[1]), float(fileds[2]))

# 删除标题行,并抽取(movieId,rating)字段
rating = ratingsRDD \
    .filter(lambda line: not line.startswith("userId")) \
    .map(ratingsMapFun)  

# 查看前5条
for row in rating.take(5):
    print(row)

执行以上代码,输出结果如下:

(1, 4.0)
(3, 4.0)
(6, 4.0)
(47, 5.0)
(50, 5.0)

可以看到,已经去掉了标题行,并在rating RDD中只保留了movieId(电影ID)和rating(评分)这两个字段。

4)对rating RDD进行转换,按key(即movieId)进行分组,并计算每一组的平均值(也就是每部电影的平均评分)。代码如下所示:

# 定义计算平均评分的函数
def avgFun(t):
    avg = sum(t[1]) / len(t[1])
    return (t[0], avg)

# 获得(movieid,ave_rating) 
movieScores = rating.groupByKey().map(avgFun) 

# 查看前5条数据
for row in movieScores.take(5):
    print(row)

执行以上代码,输出结果如下:

(6, 3.946078431372549)
(50, 4.237745098039215)
(70, 3.5090909090909093)
(110, 4.031645569620253)
(216, 3.326530612244898)

可以看出,经过这一步处理后,返回的是一个元组,元组元素为(movieId,平均得分)。但是这个结果对用户来说并不友好,因为它只显示了电影的ID。下一步显示对应的电影名称及其平均得分。

5)处理电影数据集,包括:忽略标题行;抽取(movieId,movieName)字段。代码如下所示:

# 自定义map转换函数
def movieMapFun(line):
    fileds = line.split(",")
    return (int(fileds[0]), fileds[1])

# 删除标题行,并抽取出(MovieID,MovieName)
movieskey = moviesRDD \
    .filter(lambda line: not line.startswith("movieId")) \
    .map(movieMapFun)

# 查看前5条
for row in movieskey.take(5):
    print(row)

执行以上代码,输出结果如下所示:

('1', 'Toy Story (1995)')
('2', 'Jumanji (1995)')
('3', 'Grumpier Old Men (1995)')
('4', 'Waiting to Exhale (1995)')
('5', 'Father of the Bride Part II (1995)')

6)最后一步,将movieScores RDD和movieskey RDD进行join连接,从而得到每部电影的名称及其得分。代码如下所示:

result = movieScores \
    .join(movieskey) \
    .filter(lambda f: f[1][0]>4.0) \
    .map(lambda f: (f[0],f[1][1],f[1][0]))

# 查看前5条
for row in result.take(5):
    print(row)

执行以上代码,输出结果如下:

(260, 'Star Wars: Episode IV - A New Hope (1977)', 4.231075697211155)
(296, 'Pulp Fiction (1994)', 4.197068403908795)
(356, 'Forrest Gump (1994)', 4.164133738601824)
(608, 'Fargo (1996)', 4.116022099447513)
(1136, 'Monty Python and the Holy Grail (1975)', 4.161764705882353)