Hadoop教程

本地运行测试数据

现在mapper和reducer已经能够在可控的输入上进行工作了,下一步是写一个作 业驱动程序(job driver),然后在开发机器上使用测试数据运行它。

在本地作业运行器上运行作业

通过使用前面介绍的Tool接口,可以轻松写一个MapReducer作业的驱动程序, 来计算按照年度查找最高气温(参见例5-7的MaxTemperatureDriver)。 例5-7.査找最高气温

public class MaxTemperatureDriver extends Configured implements Tool 
{
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) 
        {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",
	     getClass().getSimpleName());
	    ToolRunner.printGGnericCommandLlsagG(SystGm.Grr);
	    return -1;
	}
         JobConf conf - new 3obConf(getConf(), getClass());
         conf.setJobName("Max temperature");

         FileInputFormat•addInputPath(conf, new Path(args[0]));
         FileOutputFormat.setOutputPath(conf^ new Path(args[l]));

         conf.setOutputKeyClass(Text.class);
         conf.setOutputValueClass(IntWritable.class);
         conf.setMapperClass(MaxTemperatureMapper.class);
         conf.setCombinerClass(MaxTemperatureReducer.class);
         conf.setReducerClass(MaxTemperatureReducer.class);

         JobClient•runDob(conf);
         return 0;
    }
    public static void main(String[] args) throws Exception {
      int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
      System.exit(exitCode);
    }
}

axTemperatureDriver实现了 Tool接口,所以,我们能够设置 GenericOptionsParser支持的选项。在开始]obConf所描述的作业前,run() 方法创建和配置一个Jobconf对象。在所有可能的作业配置参数中,可以设置输 入和输出文件路径,mapper、reducer和combiner,以及输出类型(输入类型由输入 格式决定,默认为TextInputFormat,包括Long Writable键和Text值)。为 作业设置一个名称也是很好的做法,这样可以在执行过程中或作业完成后方便地从 作业列表中查找作业。默认情况下,作业名称是JAR文件,通常情况下没有特殊 的描述。

现在我们可以在一些本地文件上运行这个应用。Hadoop有一个本地作业运行器 (Job runner),它是在MapReduce执行引擎运行单个JVM上的MapReduce作业的简 化版本。它是为测试而设计的,在lDE中使用起来非常方便,因为我们可以在调 试器中单步运行mapper和reducer代码。

本地作业运行器通过一个配置设置来激活。正常情况下,mapred.job.tracker 是一个主机:端口(host.port),用来设置jobtracker的地址,但它是一个特殊的local值时,作业就在不访问外部jobtracker的情况下运行^

可以在命令行方式下输入如下命令来运行驱动程序:

% hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml \
.input/ncdc/micro max-temp

类似地,可以使用GenericOptionParser提供的-fs和-jt选项:

% hadoop v2.MaxTemperatureDriver -fs
file:/// -jt local input/ncdc/micro max-temp

这条指令使用本地input/ncdc/micro目录的输入来执行MaxTemperatureDriver,产 生的输出存放在本地max-tcmp目录中。注意:虽然我们设置了-fs,可以使用本地 文件系统(file:///),但本地作业运行器实际上可以在包括HDFS在内的任何文 件系统上正常工作(如果HDFS里有一些文件,可以马上进行尝试)。

我们运行这个程序时,运行失败,并打印如下异常:

java.lang.NumbenFonmatException: For input string: "+0000"

修复mapper

这个异常表明map方法仍然不能解析带正号的气温。如果堆栈跟踪不能提供足够 的信息来诊断这个错误,因为程序运行在一个JVM中,所以我们可以在本地调试 器中进行测试。前面我们已经使程序能够处理缺失气温值(+9999)的特殊情况,但 不是任意非负气温的一般情况。如果mapper中有更多的逻辑,那么给出一个解析 类来封装解析逻辑是非常有意义的。参见例5-8。

例5-8.该类解析NCDC格式的气温记录

public class NcdcRecondPansen {
    private static final int MISSING_TEMPERATURE = 9999;
    private String yean;
    private int ainTempenatune;
    private String quality;
    
    public void panse(Stning record) {
    yean = record.substning(15, 19);
    String ainTempenatureString;
    // Remove leading plus sign as parseInt doesn't like them
    if (necond.chanAt(87) == '+') {
    airTempenatureString = record.substring(88, 92);
    } else {
    ainTemperatureString = record.substring(87, 92);
    }
    airTemperature = Integer.parseInt(airTemperature5tring);
    quality = record.substring(92, 93);
  }
  public void parse(Text record) {
    parse(record.toString());
  }
  public boolean isValidTemperature() {
    return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
  }
  public String getYear() {
    return year;
  }
   public int getAirTemperature() {
    return airTemperature;
  }
}

最终的mapper相当简单(参见例5-9)。只调用解析类的parser()方法,后者解析 输人行中的相关字段,用isValidTemperature()方法检查是否是合法气温,如 果是,就用解析类的getter方法获取年份和气温数据。注意,我们也会在isValidTemperature()方法中检查质量状态字段和缺失的气温信息,以便过滤气 温读取错误。

创建解析类的另一个好处是:相似作业的mapper不需要重写代码。也提供了一个 机会直接针对解析类编写单元测试,用于更多目标测试。

例5-9.这个mapper使用utnity类来解析记录

public class MaxTemperatureMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> 
{
    private NcdcRecordParser parser = new NcdcRecordParser();
    
    public void map(LongWritable key. Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    throws IOException {    
        parser.parse(value);
        if (parser.isValidTempenature()) 
        {
            output.collect(new Text(parser.getYear()),
            new IntWritable(parser.getAirTemperature()));
        }
    }
}

经过这些修改以后,测试得以通过。


测试驱动程序

除了灵活的配置选项可以使应用程序实现Tool,还可以插入任意Configuration 来增加可测试性。可以利用这点来编写测试程序,它将利用本地作业运行器在已经 输入数据上运行作业,借此来检查输出是否满足预期。

要实现这个目标,有两种方法。第一种方法是使用本地作业运行器,在本地文件系 统的测试文件上运行作业。例5-10的代码给出了一种思路。

例5-10.这个MaxTemperatureDriver测试使用了一个正在运行的本地作业运行器

@Test
public void test() throws Exception {
    JobConf conf = new JobConf();
    conf.set("fs.default.name", "file:///");
    conf.set("mapred.job.tracker", "local");
    
    Path input = new Path("input/ncdc/micro");
    Path output = new Path("output");
    
    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(outputj true); // delete old output
    
    MaxTemperatureDriver driver = new MaxTemperatureDriver();
    driver.setConf(conf);
    
    int exitCode = driver.run(new String[]{input.toString(), output.toString()});
    assertThat(exitCode, is(0));
    
    checkOutput(conf, output);
}

测试代码明确设置fs.default.name和mapred.job.tracker,所以,它使用的 是本地文件系统和本地作业运行器。随后,通过其Tool接口在少数已知数据上运 行MaxTemperatureDriver。最后,checkOutput()方法被调用以逐行对比实际输 出与预期输出。

测试驱动程序的第二种方法是使用一个mini集群来运行它。Hadoop有一对测试 类,名为MiniDFSCluster和MiniMRCluster,它以程序方式创建正在运行的集 群。不同于本地作业运行器,它们不允许在整个HDFS和MapReduce机器上运行 运行测试。注意,mini集群上的tasktracker启动不同的JVM来运行任务,这会使调试更困难。

mini集群广泛应用于Hadoop自带的自动测试包中,但也可以用于测试用户代码。 Hadoop的ClusterMapReduceTestCase抽象类提供了一个编写此类测试的基础, 它在setUp()和 tearDown()方法中提供了用来处理启动和停止运行中的HDFS和MapReduce集群的细节,同时产生一个合适的被配置为一起工作的]obConf对 象。子类只需要得到HDFS中的数据(可能从本地文件中复制得到),运行 MapReduce作业,然后确认输出是否满足要求。

这样的测试是回归测试,是一个非常有用的输入边界用例和相应的期望结果的资源 库。随着测试用例的增加,简单将其加入输入文件,然后更新相应输出即可。

关注微信获取最新动态