Hadoop MapReduce架构

一、什么是Hadoop中的MapReduce?

MapReduce是一种用于处理大量数据的软件框架和编程模型。 MapReduce程序工作分为两个阶段,即Map和Reduce。 Map任务并发地处理数据的分割和映射,而Reduce任务则是shuffle和Reduce数据。

Hadoop能够运行用各种语言编写的MapReduce程序:Java、Ruby、Python和c++。 云计算中的Map Reduce程序本质上是并行的,因此对于在集群中使用多台机器进行大规模数据分析非常有用。

每个阶段的输入是key-value对。此外,还需要开发人员来指定两个函数:map函数和reduce函数。

二、MapReduce架构

MapReduce计算模型的整个过程经历四个阶段:分割、映射、变换和还原。

让我们用一个MapReduce例子来理解。

考虑到我们的大数据程序中的MapReduce有以下输入数据:

Welcome to Hadoop Class
Hadoop is good
Hadoop is bad

我们使用MapReduce来统计每个单词出现的次数-单词计数。MapReduce计算的过程如下:

MapReduce任务的最终输出是:

单词 出现次数
bad 1
Class 1
good 1
Hadoop 3
is 2
to 1
Welcome 1

数据经过MapReduce的以下几个阶段:

  • 输入切片:在大数据作业中,MapReduce的输入被划分为固定大小的片段,称为输入切片。输入切片是单个map使用的输入块。
  • mapping:这是执行map-reduce程序的第一个阶段。在这个阶段中,每个分片中的数据被传递给一个map函数以产生输出值。 在我们的示例中,map阶段的工作是统计每个单词从输入分片中出现的次数,并准备一个<单词,频率>的列表。
  • shuffling:这个阶段使用Mapping阶段的输出。它的任务是合并Mapping阶段输出的相关记录。在我们的示例中,将相同的单词连同它们各自的频率组合在一起。
  • reducing:在这个阶段中,从shuffling阶段的输出值被聚合。这个阶段结合了来自shuffling阶段的值,并返回一个单独的输出值。 简而言之,这个阶段汇总了完整的数据集。 在我们的例子中,这个阶段汇总了来自Shuffling阶段的值,也就是说,计算每个单词的出现总数。

三、详细说明MapReduce架构

下面我们对MapReduce架构进行详细的说明:

  • 为每个分片创建一个map任务,然后为分片中的每个记录执行map函数。
  • 进行多个分片总是有益的,因为处理一个分片所花费的时间与处理整个输入所花费的时间相比要小。当分片较小时,处理更好地实现负载平衡,因为我们并行处理分片。
  • 然而,也不希望分片的尺寸太小。当分片过小时,管理分片和map任务创建的负担就会开始支配整个作业执行时间。
  • 对于大多数作业来说,最好将分片大小设置为HDFS块的大小(默认为128M)。
  • 执行map任务会将输出写到相应节点上的本地磁盘上,而不是写到HDFS上。
  • 选择本地磁盘而不是HDFS的原因是为了避免在HDFS存储操作时发生复制。
  • Map输出是中间输出,由reduce任务处理以产生最终输出。一旦作业完成,就可以丢弃map输出。因此,通过复制将其存储在HDFS中就显得过于夸张了。
  • 在节点故障的情况下,在reduce任务消费map输出之前,Hadoop会在另一个节点上重新运行map任务,并重新创建map输出。
  • Reduce任务不能处理数据局部性的概念。每个map任务的输出都被提供给reduce任务。Map输出被传输到reduce任务正在运行的机器上。
  • 在这台机器上,输出被合并,然后传递给用户定义的reduce函数。
  • 与map输出不同,reduce输出存储在HDFS中(第一个副本存储在本地节点上,其他副本存储在off-rack节点上)。

四、Hadoop MapReduce计算工作流

下图是一个典型的Hadoop MapReduce计算工作流。

其中,InputFormat从HDFS读取输入数据,并解析该数据,为map函数创建key-value对输入。InputFormat还执行数据的逻辑分区以创建计算的Map任务。

一个典型的MR计算会为每个输入的HDFS数据块创建一个Map任务。对于每个生成的key-value对,Hadoop会调用用户所提供的map函数来计算。

从Hadoop v2开始,对于map函数的输入数据,还可能会涉及到combiner这一步(这一步是可选的)。

到了Partitioner这一步,开始对Map任务的输出数据进行分区,以便将数据发送到对应的Reduce任务中。 使用Map任务输出的key-value对中的key字段进行分区,分区的数量等于Reduce任务的数量。

每个Reduce任务从Map任务抓取相应的输出数据分区(这称为shuffling),并基于key字段对数据执行一个归并排序(merge sort)。 在调用reduce函数之前,Hadoop还对输入数据分组(基于数据的key字段)。

从Reduce任务输出的key-value对将被写入到HDFS(基于OutputFormat类指定的格式)。

五、MapReduce是如何工作的?

Hadoop将作业(job)划分为多个任务(task)。有两种类型的任务:

  • Map tasks (Splits & Mapping)
  • Reduce tasks (Shuffling, Reducing)

Hadoop MapReduce作业提交和执行过程如下图所示:

  • 步骤1:用户向YARN中提交应用程序,其中包括ApplicationMaster程序、启动ApplicationMaster的命令、用户程序等。
  • 步骤2:ResourceManager为该应用程序分配第一个Container,并与对应的Node-Manager通信,要求它在这个Container中启动应用程序的ApplicationMaster。
  • 步骤3:ApplicationMaster首先向ResourceManager注册,这样用户可以直接通过ResourceManager查看应用程序的运行状态,然后它将为各个任务申请资源,并监控它的运行状态,直到运行结束,即重复步骤4~7。
  • 步骤4:ApplicationMaster采用轮询的方式通过RPC协议向ResourceManager申请和领取资源。
  • 步骤5:一旦ApplicationMaster申请到资源后,便与对应的NodeManager通信,要求它启动任务。
  • 步骤6:NodeManager为任务设置好运行环境(包括环境变量、JAR包、二进制程序等)后,将任务启动命令写到一个脚本中,并通过运行该脚本启动任务。
  • 步骤7:各个任务通过某个RPC协议向ApplicationMaster汇报自己的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务。在应用程序运行过程中,用户可随时通过RPC向ApplicationMaster查询应用程序的当前运行状态。
  • 步骤8:应用程序运行完成后,ApplicationMaster向ResourceManager注销并关闭自己。

《PySpark原理深入与编程实战》