Hadoop教程

shuffle和排序

Reduce确保每个reducer的输入都按键排序。系统执行排序的过程 将map输出作为输入传给reducer 称为shuffle。在此,我们将学习shuffle是如何工作的,因为它有助于我们理解工作机制(如果需要优化MapReduce程序的话)。 shuffle属于不断被优化和改进的代码库的一部分,因此下面的描述有必要隐藏一 些细节(也可能随时间而改变,目前是0.20版本)。从许多方面来看,shuffle是 MapReduce的“心脏”,是奇迹发生的地方。

map端

map函数开始产生输出时,并不是简单地将它写到磁盘。这个过程更复杂,它利 用缓冲的方式写到内存,并出于效率的考虑进行预排序。图6-4展示了这个 过程。

每个map任务都有一个环形内存缓冲区,用于存储任务的输出。默认情况下,缓 冲区的大小为100 MB,此值可以通过改变io.sort.mb属性来调整。一旦缓冲内 容达到阈值(io.sort.spill.percent,默认为0.80,或80%),一个后台线程便 开始把内容写到(spill)磁盘中。在写磁盘过程中,map输出继续被写到缓冲区,但 如果在此期间缓冲区被填满,map会阻塞直到写磁盘过程完成。

写磁盘将按轮询方式写到tnapred.local.dir属性指定的作业特定子目录中的目 录中。

图 6-4. MapReduce 的 shuffle 和排序

在写磁盘之前,线程首先根据数据最终要传送到的reducer把数据划分成相应的分区(partition)。在毎个分区中,后台线程按键进行内排序,如果有一个combiner, 它会在排序后的输出上运行。

一旦内存缓冲区达到溢出写的阈值,就会新建一个溢出写文件,因此在map任务 写完其最后一个输出记录之后,会有几个溢出写文件。在任务完成之前,溢出写文 件被合并成一个已分区且已排序的输出文件。配置属性io.sort.factor控制着 一次最多能合并多少流,默认值是10。

如果已经指定combiner,并且溢出写次数至少为3(min.num.spills.for.combine 属性的取值)时,则combiner就会在输出文件写到磁盘之前运行。前面曾讲过, combiner可以在输人上反复运行,但并不影响最终结果。运行combiner的意义在 于使map输出更紧凑,使得写到本地磁盘和传给reduce的数据更少。

写磁盘时压缩map输出往往是个很好的主意,因为这样会让写磁盘的速度更快, 节约磁盘空间,并且减少传给reducer的数据跫。默认情况下,输出是不压缩的, 但只要将mapred.compress.map.output设置为true,就可以轻松启用此功 能。使用的压缩库由mapred.map.output.compression.codec指定。

reducer通过HTTP方式得到输出文件的分区。用于文件分区的工作线程的数量由 任务的tracker.http.threads属性控制,此设置针对毎个tasktracker,而不是 针对每个map任务槽。默认值是40,在运行大型作业的大型集群上,此值可以根 据需要而增加。


reduce 端

现在转到处理过程的reduce部分。map输出文件位于运行map任务的tasktracker 的本地磁盘(注意,尽管map输出经常写到map tasktracker的本地磁盘,但reduce 输出并不这样),现在,tasktracker需要为分区文件运行reduce任务。更进一步, reduce任务需要集群上若干个map任务的map输出作为其特殊的分区文件。每个map任务的完成时间可能不同,因此只要有一个任务完成,reduce任务就开始复制 其输出。这就是reduce任务的复制阶段卜(copy phase)。reduce任务有少量复制线 程,因此能够并行取得map输出。默认值是5个线程,但这个默认值可以通过设 置 mapred. reduce. parallel. copies 属性来改变。

如果map输出相当小,则会被复制到reduce tasktracker的内存(缓冲区大小由 mapred.job.shuffle.input.buffer.percent 属性控制,指定用于此用途的堆 空间的百分比),否则,map输出被复制到磁盘。一且内存缓冲区达到阈值大小(由 mapred.iob.shuffle.merge.percent 决定)或达到 map 输出阈值(由 mapred. inmem.merge.threshold控制),则合并后溢出写到磁盘中。

随着磁盘上副本的增多,后台线程会将它们合并为更大的、排好序的文件。这会为 后面的合并节省一些时间。注意,为了合并,压缩的map输出(通过map任务)都 必须在内存中被解压缩。

复制完所有map输出被复制期间,reduce任务进入排序阶段(sort phase)更恰当的说 法是合并阶段,因为排序是在map端进行的),这个阶段将合并map输出,维持其 顺序排序。这是循环进行的。比如,如果有50个map输出,而合并因子(merge factor)是10(10为默认设置,由io.sort.factor属性设置,与map的合并类 似),合并将进行5趟。毎趟将10个文件合并成一个文件,因此最后有5个中间文件。

在最后阶段,即reduce阶段,直接把数据输入reduce函数,从而省略了一次磁盘 往返行程,并没有将这5个文件合并成一个已排序的文件作为最后一趟。最后的合 并既可来自内存和磁盘片段。

在reduce阶段,对已排序输出中的每个键都要调用reduce函数。此阶段的输出直 接写到输出文件系统,一般为HDFS。如果采用HDFS,由于tasktracker节点也运 行数据节点,所以第一个块复本作(block replica)将被写到本地磁盘。


配置的调优

现在我们已经有比较好的基础来理解如何调优shuffle过程以提高MapReduce性能 了。表6-1和表6-2总结了相关设置和默认值,这些设置以作业为单位(除非有特 别说明),默认值适用于常规作业。

总的原则是给shuffle过程尽量多提供内存空间。然而,有一个平衡问题,也就是 要确保map函数和reduce函数能得到足够的内存来运行。这就是为什么编写map函数和reduce函数时尽量少用内存的原因 它们不应无限使用内存(例如,应避免在map中堆积数据)。

运行map任务和reduce任务的JVM,其内存大小由mapred.child.java.opts属性设置。任务节点上的内存大小应该尽量大。

在map端,可以通过避免多次溢出写磁盘来获得最佳性能。如果能估算map输出 大小,就可以合理地设置ip.sort.*.属性来尽可能减少溢出写的次数。具体而言,如果可以,应该增加io.sort.mb的值。MapReduce计数器计算在作业运行整个阶段中溢出写磁 盘的记录数,这对于调优很有帮助。注意,计数器统计map和reduce两端的溢出写。

在reduce端,中间数据全部驻留在内存时,就能获得最佳性能。默认情况下,这 是不可能发生的,因为一般情况下所有内存都预留给reduce函数。但如果reduce 函数的内存需求不大,那么把tnapred.inmem.merge.threshold设置为0,把 mapred.job.reduce.input.buffer.percent 设置为 1.0(或一个更低的值,详 见表6-2)会带来性能的提升。

更常见的情况是,Hadoop使用默认为4 KB的缓冲区,这是很低的,因此应该在 集群中增加这个值。2008年4月,Hadoop在通用TB字节排序基准测试中获胜,它使用的一个优化方法就是将中 间数据保存在代如^这一端的内存中。

2008年4月,Hadoop在通用TB字节排序基准测试中获胜,它使用的一个优化方法就是将中 间数据保存在reduce这一端的内存中。

表6-1.map端的调优属性

属性名称类型默认值说明
io.sort.mbint100排序 map输出时所使 用的内存缓冲区的大 小,以兆字节为单位
io.soPt.record. percentfloat0.05用作存储map输出记 录边界的io.sort.mb 的比例。剩余的空间 用来输出map输出记 录本身
io.sopt.spill. percentfloat0.80map输出内存缓冲和 用来开始磁盘溢出写 过程的记录边界索引,这两者使用比例 的阈值
io.sort.factorint10排序文件时,一次最 多合并的流数。这个 属性也在reduce中使用。将此值增加到 100是很常见的。
min.num.spills.for .combineint3运行combiner所需的 最少溢出写文件数(如果已指定combiner)
mapred.compress.map.outputBooleanfalse压缩map输出
mapred.map.output. compression.codecClass nameorg.apache.hadoop.io.compress.DefaultCodec用于map输出的压缩 编解码器
tasktracker.http.threadsint40每个tasktracker的工作线程数,用于将map输出到reducer。这是集群范 围的设置,不能由单个作业设置

表6-2.map端的调优属性

属性名称类型默认值说明
mapred.reduce.parallel.copiesint5用于把map输出复制到reducer 的线程数
mapred.reduce.copy.backoffint300在声明失败之前,reducer获取一 个map输出所花的最大时间,以 秒为单位。如果失败(根据指数后 退),reducer可以在此时间内尝 试重传
io.sort.factorint10排序文件时一次最多合并的流的 数量。这个属性也在map端使用
mapred.job.shuffle.input.buffer.percentfloat0.70在shuffle的复制阶段,分配给 map输出的缓冲区占堆空间的百 分比
mapred.iob.shuffle.merge.percentfloat0.66map输出缓冲区(由mapred.job. shuffle.input.buffer.percent定义)的阈值使用比例,用于启动 合并输出和磁盘溢出写的过程
mapred.inmem.merge.thresholdint1000启动合并输出和磁盘溢出写过程 的map输出的阈值数。0或更小 的数意味着没有阈值限制,溢出 写行为由 mapred.job.shuffle.merge.percent 单独控制
mapred.iob.reduce.input.buffer.percentfloat0.0在reduce过程中,在内存中保存 map输出的空间占整个堆空间的 比例。reduce阶段开始时,内存 中的map输出大小不能大于这个 值。默认情况下,在reduce任务 开始之前,所有map输出都合并 到磁盘上,以便为reducer提供 尽可能多的内存。然而,如果reducer需要的内存较少,可以增 加此值来最小化访问磁盘的 次数

关注微信获取最新动态