RDD案例_电影评分数据集分析
任务描述:请使用Spark RDD实现对电影数据集进行分析,找出平均评分超过4.0的电影列表。
- 数据集说明:
- 这里我们使用推荐领域一个著名的开放测试数据集movielens。我们将使用其中的电影评分数据集ratings.csv以及电影数据集movies.csv。
- 数据集位于PBLP平台的“~/data/spark/movielens/”目录下。
【例】请找出平均评分超过4.0的电影,列表显示,并将查找结果保存到文件中。
请按以下步骤执行。
1) 加载数据,构造RDD。
val ratingsFile = "/home/hduser/data/spark/movielens/ratings.csv" // 评分数据集 val moviesFile = "/home/hduser/data/spark/movielens/movies.csv" // 电影数据集 val ratingsRDD = sc.textFile(ratingsFile) val moviesRDD = sc.textFile(moviesFile)
2) 简单探索。
println("电影评分数据集中记录数量:" + ratingsRDD.count())
println("电影数据集中记录数量:" + moviesRDD.count())
3) 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回。
val movieAvgScore = ratingsRDD
// 因为第一行是标题行,所以需要过滤掉
.filter(line => !line.startsWith("userId"))
// 转换为(movieid, rating)元组形式
.map(line => {
val fields = line.split(",")
(fields(1).trim.toInt, fields(2).trim.toDouble)
})
// 按电影id分组
.groupByKey()
// 计算每部 电影的平均评分,格式为元组(movieid, avg_rating)
.map(t => (t._1, t._2.sum/t._2.size))
// 过滤出平均评分超过4.0的
.filter(t => t._2 > 4.0)
4) 从电影数据集中抽取电影名称,以(movieId, title)的形式返回。
val moviesInfo = moviesRDD
// 因为第一行是标题行,所以需要过滤掉
.filter(line => !line.startsWith("movieId"))
// 转换为(movieid, title)元组形式
.map(line => {
val fields = line.split(",")
(fields(0).toInt, fields(1))
})
5) 将两个数据集连接起来,得到(movieId, title, avgScore)类型结果。
val result = movieAvgScore.join(moviesInfo)
.map(f => (f._2._1,(f._1, f._2._2, f._2._1)))
.sortByKey(ascending = false)
.map(t => t._2)
6) 结果列表显示。
result.collect.foreach(println)
7) 将查询结果保存到HDFS文件系统中。
result.saveAsTextFile("/data/spark/movielens/result")
完整示例代码
完整示例代码如下:
import org.apache.spark.sql.SparkSession
object MoviesDemo {
def main(args: Array[String]): Unit = {
// 创建一个SparkContext来初始化Spark
// 2.0 以前的用法
// val conf = new SparkConf().setMaster("local").setAppName("movie demo")
// val sc = new SparkContext(conf)
// 2.0 以后的用法
val spark = SparkSession.builder().master("local[*]").appName("movie demo").getOrCreate()
val sc = spark.sparkContext
// 加载数据,构造RDD(注:这里数据集放在项目的src/data/movielens/目录下)
val ratingsFile = "src/data/movielens/ratings.csv" // 评分数据集
val moviesFile = "src/data/movielens/movies.csv" // 电影数据集
val ratingsRDD = sc.textFile(ratingsFile)
val moviesRDD = sc.textFile(moviesFile)
// 从评分数据集中抽取每部电影的评分,以(movieid, rating)的形式返回
// 因为第一行是标题行,所以过滤掉
val movieAvgScore = ratingsRDD
.filter(line => !line.startsWith("userId"))
.map(line => {val fields = line.split(","); (fields(1).trim.toInt, fields(2).trim.toDouble)})
.groupByKey()
.map(t => (t._1, t._2.sum/t._2.size))
.filter(t => t._2 > 4.0)
// 从电影数据集中抽取电影名称,以(movieId, movieName)的形式返回
// 因为第一行是标题行,所以过滤掉
val moviesInfo = moviesRDD
.filter(line => !line.startsWith("movieId"))
.map(line => {val fields = line.split(","); (fields(0).toInt, fields(1))})
// 将两个数据集连接起来,得到(movieId, movieName, avgScore)
val result = movieAvgScore.join(moviesInfo)
.map(f => (f._2._1,(f._1, f._2._2, f._2._1)))
.sortByKey(ascending = false)
.map(t => t._2)
// 列表显示
result.collect.foreach(println)
// 将查询结果保存到HDFS文件系统中
result.saveAsTextFile("/data/spark/movielens/result")
}
}