Hadoop教程

作业调优

作业运行后,许多开发人员可能会问:“能够让它运行得更快一些吗?”

有一些Hadoop相关的“疑点”值得检査一下,看看它们是不是引发性能问题的“元凶”。在开始任务级别的分析或优化之前,须仔细地研究表5-3所示的检査内容。

表5-3.作业调优检査表

范围最佳实践
mapper的数量mapper需要运行多长时间?如果平均只运行几秒钟,则可以看是否能用更少mapper运行更长的时间,通常是一分钟左右。时间长度取决于使用的输入格式
reducer的数量为了达到最髙性能,集群中reducer数量该略少于reducer的任务槽数。这将使reducer能够在同一个周期(in one wave)完成任务,并在reducer阶段充分使用集群
combiner作业能否充分利用combiner来减少通过shuffle传输的数据量?
中间值的压缩对map输出进行压缩几乎总能使作业执行得更快
自定义序列如果正在使用自己定义的Writable对象或自定义的comparator,则必须确保已实现 RawComparator
调整shuffleMapReduce的shuffle过程可以对一些内存管理的参数进行调整,以弥补性能不足

分析任务

正如调试一样,对MapReduce这类分布式系统上运行的作业进行分析也有诸多挑 战。Hadoop允许分析作业中的一部分任务,并且在毎个任务完成时,把分析信息 放到用户的机器上,以便日后使用标准分析工具进行分析。

当然,对本地作业运行器中运行的作业进行分析可能稍微简单些。如果你有足够的 数据运行map和reduce任务,那么对于提高mapper和reducer的性能有很大的帮 助。但必须注意一些问题。本地作业运行器是一个与集群完全不同的环境,并且数 据流模式也截然不同。如果MapReduce作业是I/O密集型的(很多作业都属于此 类),那么优化代码的CPU性能是没有意义的。为了保证所有调整都是有效的,应 该在实际集群上对比新老执行时间。这说起来容易做起来难,因为作业执行时间会随着与其他作业的资源争夺和调度器决定的任务顺序不同而发生改变。为了在这类情况下得到较短的作业执行时间,必须不断运行(改变代码或不改变代码),并检查是否有明显的改进。

有些问题(如内存溢出)只能在集群上重现,在这些情况下,必须能够在发生问题的 地方进行分析。

HPROF分析工具

许多配置属性可以控制分析过程,这些属性也可以通过JobConf的简便方法获取。下面对MaxTemperatureDrive(版本6)的修改将启用远程HPROF分析。HPROF是JDK自带的分析工具,虽然只有基本功能,但是同样能提供程序的CPU和堆使用情况等有用信息。

conf.setPnofileEnabled(tnue);
conf.setPnofilGPanams("-agentlib:hpnof=cpu=sampleSjheap=sites,depth=6/'+
                      "fonce=n,thnead=y,venbose=n,file=%s");
conf.setPnofileTaskRange(tnue, "0-2");

第一行启用了分析工具(默认是关闭状态),这相当于把mapred.task.profile配置属性设置为true。

接下来设置分析参数,即传到任务JVM的额外的命令行参数。一旦启用分析,即使启用JVM重用,也会给每个任务分配一个新的JVM。默认参数定义了 HPROF分析器,示例中设置一个额外的 HPROF选项depth = 6,以便能达到更深的栈跟踪深度(相比HPROF默认值)。 JobConf 的setProfileeParams()方法相当于设置 mapred.task.profile.params。

最后,指定希望分析的任务。一般只需要少数几个任务的分析信息,所以使用setProfileeParams()方法来指定想要分析的任务ID的范围。我们将其设置为0- 2(默认情况下),这意味着ID为0、1、2的任务将被分析。第一个传进setProfilee TaskRange()方法的参数指明这是map任务的范围还是reduce任务的范围:true代表 map任务,false代表reduce任务。允许范围集合㈡set of ranges)的表示方法,使 用一个标注允许开放范围(open range)。例如’ 0-1、4、6-将指定除了山为2、3、5之外 的所有任务。要分析的map任务,还可以使用mapred.task.profile.map属性来控 制,reduce任务则由mapred.task和profile.reduce控制。

使用修改过的驱动程序来运行作业时,分析结果将输出到在启动作业的文件夹中该 作业的末尾。因为我们只分析少数几个任务,所以可以在数据集的子集上运行该作业。

下面取自一个mapper分析文件,它显示了 CPU的抽样信息:

CPU SAMPLES BEGIN (total =1002)Sat Apr 11 11:17:52 2009
Rank   self    accum   count   trace    method
1      3.49%   3.49%    35     307969   java.lang.Object.2      3.39%   6.89%    34     307954   java.lang.Object.3      3.19%  10.08%    32     307945   java.util.regex.Matcher.4      3.19%  13.27%    32     307963   java.lang.Object.5      3.19%  16.47%    32     307973   java.lang.Object.

交叉引用跟踪号307973显示了同一文件的栈跟踪轨迹:

TRACE 307973: (thread=200001)
java.lang.Object.<init>(Object.java:20)
org.apache.hadoop.io.IntWritable.<init>(IntWritable.java: 29)
v5.MaxTemperatureMapper.map(MaxTemperatureMapper.3ava:30)
v5.MaxTemperatureMapper.map(MaxTemperatureMapper.java:14)
org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:356)

因此可以看出,mapper花了 3%的时间来构建IntUritable对象。这表明重用Writable实例作为输出(版本7,见例5-12)是有价值的。

例5-12.重用Text和IntWritable输出对象

public class MaxTemperatureMapper extends MapReduceBase
 implements Mapper<LongWritable, Text, Text, IntWritable> 
{
    enum Temperature {
        MALFORMED
    }
    private NcdcRecordParser parser = new NcdcRecordParser();
    private Text year = new Text();
    private IntWritable temp = new IntWritable();

    public void map(LongWritable key, Text value,OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException 
    {
        parser.parse(value);
        if (parser.isValidTemperature()) 
        {
            year.set(parser.getYear());
            temp.set(parser.getAirTemperature());
            output*collect(year,temp);
        } 
        else if (parser.isMalformedTemperature()) 
        {
            System.err.println("Ignoring possibly corrupt input: " + value);
            reporter.incrCounter(Temperature.MALFORMED, 1);
        }
    }
}

然而,我们知道只有能看出整个数据集上运行作业有明显提升,才是有意义的。在 其他空闲的11个节点的集群上运行每个修改过的版本五次,统计结果显示作业执 行时间并没有明显不同。当然,这只是针对具体的代码、数据和硬件的综合结果而 言,对于这样的修改,应该以相同的基准来运行,然后检査在具体的配置下性能是 否明显提升。

其他分析工具

本书写作时,获取分析输出的机制是HPROF专有的。在此之前,可以使用 Hadoop的分析设置来触发其他分析器进行分析(详见具体分析工具的文档),但这 必须从tasktracker中手动检索分析输出。

如果在所有tasktracker的机器上没有安装分析工具,可以考虑使用Distributed Cache在需要的机器上安装该 分析工具。

关注微信获取最新动态