关于combineByKey

Pair RDD的combineByKey转换与Hadoop MapReduce编程中的combiner非常相似。是一个宽依赖的操作,它在最后阶段需要shuffle数据。另外,它内部会按分区合并元素。

Pair RDD的combineByKey是一个通用函数,它使用一组自定义的聚合函数组合每个key的元素。内部combineByKey函数通过应用聚合函数有效地组合了Pair RDD分区的值。combineByKey转换的主要目标是将任何PairRDD[(K,V)]转换为RDD[(K,C)],其中C是键K下所有值的任何聚合的结果。

Pair RDD的combineByKey函数使用如下三个函数作为参数:

  • createCombiner:在第一次遇到Key时创建组合器函数,将RDD数据集中的V类型value值转换C类型值(V => C);
  • mergeValue:mergeValue:合并值函数,再次遇到相同的Key时,将createCombiner的C类型值与这次传入的V类型值合并成一个C类型值(C,V)=>C。
  • mergeCombiners:合并组合器函数,将C类型值两两合并成一个C类型值。

下面我们通一个示例来理解combineByKey的用法。

首先,假设我们有一个由studentName、subjectName和marks构成的RDD,我们想要得到每个学生的平均成绩。下面是用combineByKey变换求解的步骤。

【示例】给出每个学生每门功课的分数,请使用combinerByKey函数计算每个学生的平均成绩。

在这个例子中,因为要计算平均成绩,需要做sum和count聚合。所以这里的createCombiner函数应该用一个元组(sum, count)来初始化它。对于初始聚合,它应该是(value,1)。

在这个例子中,mergeValue有一个累加器元组(sum, count)。因此,每当我们得到一个新值,marks将被添加到第一个元素,而第二个值(即计数器)将增加1。

在这个例子中,mergeCombiners合并来自各个分区的结果(sum, count)。因此,对于同一个key,需要将各个元组中的sum和count都累加,得到每个学生的总成绩和总课目数。

代码实现如下所示:

# 创建PairRDD student_rdd
student_rdd = sc.parallelize([
  ("Joseph", "Maths", 83), ("Joseph", "Physics", 74), ("Joseph", "Chemistry", 91), 
    ("Joseph", "Biology", 82), ("Jimmy", "Maths", 69), ("Jimmy", "Physics", 62), 
    ("Jimmy", "Chemistry", 97), ("Jimmy", "Biology", 80), ("Tina", "Maths", 78), 
    ("Tina", "Physics", 73), ("Tina", "Chemistry", 68), ("Tina", "Biology", 87), 
    ("Thomas", "Maths", 87), ("Thomas", "Physics", 93), ("Thomas", "Chemistry", 91), 
    ("Thomas", "Biology", 74), ("Cory", "Maths", 56), ("Cory", "Physics", 65), 
    ("Cory", "Chemistry", 71), ("Cory", "Biology", 68), ("Jackeline", "Maths", 86), 
    ("Jackeline", "Physics", 62), ("Jackeline", "Chemistry", 75), ("Jackeline", "Biology", 83), 
    ("Juan", "Maths", 63), ("Juan", "Physics", 69), ("Juan", "Chemistry", 64), 
    ("Juan", "Biology", 60)], 3)
 
# 定义createCombiner, mergeValue 和mergeCombiner 函数
def createCombiner(tpl):
    return (tpl[1], 1)
    
def mergeValue(accumulator, element): 
    return (accumulator[0] + element[1], accumulator[1] + 1)
    
def mergeCombiner(accumulator1, accumulator2): 
    return (accumulator1[0] + accumulator2[0], accumulator1[1] + accumulator2[1])
 
comb_rdd = student_rdd.map(lambda t: (t[0], (t[1], t[2]))) \
                    .combineByKey(createCombiner, mergeValue, mergeCombiner) \
                    .map(lambda t: (t[0], t[1][0]/t[1][1]))

# 查看输出
for tpl in comb_rdd.collect():
    print(tpl)

执行以上代码,输出结果如下:

('Jimmy', 77.0)
('Tina', 76.5)
('Thomas', 86.25)
('Juan', 64.0)
('Joseph', 82.5)
('Cory', 65.0)
('Jackeline', 76.5)

combineByKey转换函数的特点总结如下:

  • combineByKey是一个通用的转换,而groupByKey、reduceByKey和aggregateByKey转换的内部实现使用了combineByKey;
  • combineByKey转换可灵活执行map或reduce端combine;
  • combineByKey转换的使用更加复杂;
  • 总是需要实现三个函数:createCombiner、mergeValue、mergeCombiner;
  • combineByKey是一个转换操作,因此它的计算是惰性的;
  • 它是一个宽依赖的操作,因为它在聚合的最后阶段shuffle数据并创建另一个RDD。

《Spark原理深入与编程实战》