Spark RDD编程案例
Top N 问题
【示例】给出一个员工信息名单,找出收入最高的前10名员工(Top N问题)。
样本数据 employees.csv内容如下:
ename,title,department,Full or Part-Time,Salary or Hourly,Typical Hours,Annual Salary,Hourly Rate 张三,paramedic i/c,fire,f,salary,,91080.00, 李四,lieutenant,fire,f,salary,,114846.00, 王老五,sergeant,police,f,salary,,104628.00, 赵六,police officer,police,f,salary,,96060.00, 钱七,clerk iii,police,f,salary,,53076.00, 周扒皮,firefighter,fire,f,salary,,87006.00, 吴用,law clerk,law,f,hourly,35,,14.51
实现代码如下。
// RDD实现
val inputPath = "file:///home/hduser/data/spark_demo/employees.csv"
val rdd = sc.textFile(inputPath);
// def sortBy[K](f: (T) ⇒ K, ascending: Boolean = true, numPartitions: Int = this.partitions.length)
val sortedData = rdd.map(_.split(","))
.sortBy(t => if(t(6).length>0) t(6).toFloat else 0.0, false)
val top = sortedData.take(10)
top.foreach(emp => println(emp.toList.mkString(",")))
执行以上代码,得到如下的结果:
李四,lieutenant,fire,f,salary,,114846.00 王老五,sergeant,police,f,salary,,104628.00 赵六,police officer,police,f,salary,,96060.00 张三,paramedic i/c,fire,f,salary,,91080.00 周扒皮,firefighter,fire,f,salary,,87006.00 钱七,clerk iii,police,f,salary,,53076.00 吴用,law clerk,law,f,hourly,35,,14.51
合并小文件
【例】实现对HDFS上小文件的合并。
使用SparkContext的wholeTextFiles方法和colleasc方法,可以实现对小文件的合并。
import org.apache.spark.sql.SparkSession
/**
*
* 加载整个目录中的文件,使用wholeTextFiles
*/
object WordCount3 {
def main(args: Array[String]): Unit = {
// 创建SparkSession实例 - 入口
val spark = SparkSession.builder.master("local[*]").appName("HelloWorld").getOrCreate
// 加载数据源,构造RDD
val textFiles = spark.sparkContext.wholeTextFiles("input/files")
textFiles.map(_._2).coalesce(1).saveAsTextFile("output/one")
}
}
二次排序
【示例】使用Spark RDD实现二次排序。
什么是二次排序?二次排序就是对于
假设我们有以下输入文件data.txt,其中逗号分割的分别是年、月和总数:
2018,5,22
2019,1,24
2018,2,128
2019,3,56
2019,1,3
2019,2,-43
2019,4,5
2019,3,46
2018,2,64
2019,1,4
2019,1,21
2019,2,35
2019,2,0
我们想要对这些数据排序,期望的输出结果如下:
2018-2 64,128
2018-5 22
2019-1 3,4,21,24
2019-2 -43,0,35
2019-3 46,56
2019-4 5
Spark 二次排序解决方案如下:需要将年和月组合起来构成一个Key,将第三列作为value,并使用 groupByKey 函数将同一个Key的所有Value全部分组到一起,然后对同一个Key的所有Value进行排序即可。
// 加载数据集
val inputPath = "file:///home/hduser/data/spark/data.txt"
val inputRDD = sc.textFile(inputPath)
// 实现二次排序
val sortedRDD = inputRDD
.map(line => {
val arr = line.split(",")
val key = arr(0) + "-" + arr(1)
val value = arr(2)
(key,value)
})
.groupByKey()
.map(t => (t._1, t._2.toList.sortWith(_.toInt < _.toInt).mkString(",")))
.sortByKey(true) // true:升序,false:降序
// 结果输出
sortedRDD.collect.foreach(t => println(t._1 + "\t" + t._2))