Hadoop 中 MapReduce 最核心的思想就是分而治之,通过 MapReduce 这个名字就可以看出,MapReduce 包含有 Map 和 Reduce 两个部分。它将一个大型的计算问题分解成一个个小的,简单的计算任务,交给 MapReduce 中的 Map 部分执行,随后 Reduce 部分会对 Map 部分输出的中间结果进行聚合计算,输出最终的统计结果。
为了方便大家理解,可以看下 MapReduce 的简要模型图:
每个子任务在框架中都是高度并行计算的,然后 MapReduce 框架将各个计算子任务的计算结果进行合并,得出最终的计算结果。
每个子任务在 MapReduce 内部都是高度并行计算的,子任务的高度并行化极大地提高了 Hadoop 处理海量数据的性能。MapReduce 的并行计算模型如图所示:
由图可知,MapReduce 框架将一个大型的计算任务拆分为多个简单的计算任务,交由多个 Map 并行计算,每个 Map 的计算结果经过中间结果处理阶段的处理后输入 Reduce 阶段,Reduce 阶段将输入的数据进行合并处理,输出最终的计算结果 。
同时,用户无须关心 MapReduce 底层各个节点之间的通信机制与通信过程,只需简单地编写 map() 函数和 reduce() 函数即可开发 Hadoop MapReduce 程度。
MapReduce 框架由一个主节点(ResourceManager)、多个子节点(NodeManager)和每个执行任务的 MR AppMaster 共同组成 。通常会将 MapReduce 的计算节点和存储节点部署在同一台服务器上,如图所示:
这种部署结构可以使 MapReduce 框架在已经存储好数据的节点上快速、高效地调度任务,尽可能地不用通过 RPC 从其他服务器上获取数据来执行任务,使整个集群的网络带宽被高效利用,极大地提升了处理任务的效率。
MapReduce 编程模型简化了分布式系统中并行计算的复杂度,开发人员能够不必关心 MapReduce 程序的底层实现细节,只专注于解决业务需求。
在 MapReduce 框架内部,整个运行流程可以分为如下四个阶段,其中每个阶段中的数据传输格式也不一样。
简单运行流程如下所示:
大致流程:
(1)原始数据经过 Hadoop 框架的处理,将 “(k,原始数据行)”格式的数据输入 Map 阶段,即 Map 阶段接收到的数据都是 “(k,元素数据行)”的。
(2)数据经过 Map 阶段处理之后,输出 “{(k1,v1),(k2,v2)}”格式的中间结果
(3)Map阶段输出的中间结果经由 Hadoop 的中间结果处理阶段(如聚合、排序等)之后,会形成 “ {(k1,[v1,v2]) …} ”格式的数据
(4)中间结果处理阶段形成的 “{(k1,[v1,v2]) …}”格式的数据会输入 Reduce 阶段进行处理。此时,key相同的数据会被输入进同一个 Reduce 函数进行处理(也可以由用户自定义数据分发规则)
(5)数据经过 Reduce 阶段处理之后,最终会形成“{(k1,v3)}” 格式的数据存入 HDFS 中
另外,如果觉得不够清晰,也可以参考下下面这个版本的 MapReduce 运行流程。
(1)原始数据被切分为多个小的数据分片输入 map() 函数,这些小的数据分片往往是原始数据的数据行,它们以 “(k,line)” 的格式输入 map() 函数,其中 k 表示数据的偏移量,line 表示整行数据。
(2)map() 函数并行处理输入的数据分片,根据具体的业务规则对输入的数据进行相应的处理,输出中间处理结果,这些中间处理结果往往以“{(k1,v1),(k2,v2)}” 的格式存在。
(3)中间处理阶段将 map() 函数输出的中间结果根据 key 进行聚合处理,输出聚合结果,这些聚合结果的格式为:“{(k1,[v1,v2])}”。
(4)中间处理阶段将输出的聚合结果输入 reduce () 函数进行处理( key相同的数据会被输入同一个 reduce()函数中,用户也可以自定义数据分发规则 ),reduce()函数对这些数据进行进一步聚合和计算等。
(5)reduce 函数将最终的结果以 “ (k,v) ”的格式输出到 HDFS 中。
MapReduce 容错包括 Task(任务)容错,AppMaster 容错、NodeManager 容错和 ResourceManager 容错。
默认重试次数为4,即任务失败后,MapReduce 框架会重试4次,如果任务依然失败,MapReduce才会认为任务彻底失败了。
尝试次数默认值为2,即当 AppMaster 失败2次之后,运行的任务将会失败。
等待时间默认值为 10 min,即 NodeManager 发生故障之后,ResourceManager 节点接收不到 NodeManager 发生过来的心跳信息,过 10 min 之后才会将 NodeManager 移除 。
此默认值为3,即当一个 NodeManager 上有超过3个任务失败,AppMaster 就会将该节点上的任务调度到其他节点上 。
新版本的 Hadoop 中提供了 ResourceManager 节点的 HA 机制,如果主 ResourceManager 失败,备 ResouceManager 会迅速接管工作。
Hadoop 中对 ResourceManager节点提供了检查点机制,当所有的 ResourceManager 节点失败后,重启 ResouceManager 节点,可以从上一个失败的 ResourceManager 节点保存的检查点进行状态恢复。
当然,默认是保存到文件中。
技术面试中,关于 MapReduce 优化的考察频率可能不如 Spark,Flink,但是作为 Hadoop 知识的热门考点,我们对于它的优化还是要有一个清晰的认识 。 这里,我们从以下几个小点逐一展开。
MapReduce程序效率的瓶颈在于两点:
CPU、内存、磁盘健康、网络
关于 MapReduce 优化方法主要从以下6个方面进行考虑,分别是:数据倾斜、Map阶段、Reduce阶段、IO传输、数据倾斜问题和常用的调优参数。
(1)合并小文件:在执行 MR 任务之前将小文件进行合并,大量的小文件会产生大量的 MR 任务,增大 Map 任务装载次数,而任务的装载比较耗时,从而导致 MR 运行较慢。
(2)采用 CombineText InputFormat 来作为输入,解决输入端大量小文件场景。
(3)在 Map 之后,不影响业务逻辑前提下,先进行 Combine 处理,减少 I/O 。
(1)合理设置 Map 和 Reduce 数:两个都不能设置的太少,也不能设置的太多。太少,会导致 Task 等待,延长处理时间;太多,会导致 Map,Reduce 任务间竞争资源,造成处理超时等错误 。
(3)规避使用 Reduce:因为 Reduce 在用于连接数据集的时候将会产生大量的网络消耗。
(2)使用 SequenceFile 二进制文件。
可以通过对原始数据进行抽样得到的结果集来预设分区边界值。
基于输出键的背景知识进行自定义分区。例如,如果 Map 输出键的单词来源于一本书。且其中某几个专业词汇较多,那么就可以自定义分区将这些专业词汇发送给固定的一部分 Reduce 实例。而其他的都发送给剩余的 Reduce 实例。
使用 Combine 可以大量的减少数据倾斜。在可能的情况下,Combine 的目的就是聚合并精简数据。
这个我们上面说过了,Reduce 在用于连接数据集的时候将会产生大量的网络消耗,所以我们采用 MapJoin,尽量避免 Reduce Join 。
配置参数
参数说明
一个MapTask可使用的资源上限(单位:MB),默认为1024。如果MapTask实际使用的资源量超过该值,则会被强制杀死。
一个ReduceTask可使用的资源上限(单位:MB),默认为1024。如果ReduceTask实际使用的资源量超过该值,则会被强制杀死。
每个MapTask可使用的最多cpu core数目,默认值: 1
每个ReduceTask可使用的最多cpu core数目,默认值: 1
每个Reduce去Map中取数据的并行数。默认值是5
Buffer中的数据达到多少比例开始写入磁盘。默认值0.66
Buffer大小占Reduce可用内存的比例。默认值0.7
指定多少比例的内存用来存放Buffer中的数据,默认值是0.0
配置参数
参数说明
给应用程序Container分配的最小内存,默认值:1024
给应用程序Container分配的最大内存,默认值:8192
每个Container申请的最小CPU核数,默认值:1
每个Container申请的最大CPU核数,默认值:32
给Containers分配的最大物理内存,默认值:8192
配置参数
参数说明
Shuffle的环形缓冲区大小,默认100m
环形缓冲区溢出的阈值,默认80%
配置参数
参数说明
每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。
Task超时时间,经常需要设置的一个参数,该参数表达的意思为:如果一个Task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该Task处于Block状态,可能是卡住了,也许永远会卡住,为了防止因为用户程序永远Block住不退出,则强制设置了一个该超时时间(单位毫秒),默认是600000。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”
1、《海量数据处理与大数据技术实战》2、《Hadoop权威指南》
实际上,关于 MapReduce的内容还有很多,本期文章只是将比较重要核心的部分介绍了一下。其中,MapReduce的原理,运行流程,优化是面试中比较经常考察的点,而部署结构,容错机制我们仅做学习了解即可。我还想强调一点,一定要学会自发的去学习新的知识和总结学过的内容。否则就容易出现,新学的记不住,学过的忘记了的情况。