PySpark RDD编程案例_数据聚合计算
【示例】给定一些销售数据,数据采用键值对的形式(公司,收入),求出每个公司的总收入和平均收入。
提示:可直接用sc.parallelize在内存中生成数据,在求每个公司总收入时,先分三个分区进行求和,然后再把三个分区进行合并。只需要编写RDD combineByKey函数的前三个参数的实现。
from pyspark.sql import SparkSession
# 构建SparkSession和SparkContext实例
spark = SparkSession.builder \
.master("spark://xueai8:7077") \
.appName("pyspark demo") \
.getOrCreate()
sc = spark.sparkContext
# 构造RDD
data = sc.parallelize([("company-1",92),("company-1",85),("company-1",82),
("company-2",78),("company-2",96),("company-2",85),
("company-3",88),("company-3",94),("company-3",80)], 3)
# 定义createCombiner, mergeValue 和mergeCombiner 函数
def createCombiner(income):
return (income, 1)
def mergeValue(acc, income):
return (acc[0] + income, acc[1] + 1)
def mergeCombiner(acc1, acc2):
return (acc1[0] + acc2[0], acc1[1] + acc2[1])
cbk = data \
.combineByKey(createCombiner, mergeValue, mergeCombiner) \
.map(lambda t: (t[0], t[1][0], t[1][0]/float(t[1][1])))
# 查看输出
for row in cbk.collect():
print(row)
执行以上代码,输出结果如下:
('company-1', 259, 86.33333333333333)
('company-3', 262, 87.33333333333333)
('company-2', 259, 86.33333333333333)