本地运行测试数据
现在mapper和reducer已经能够在可控的输入上进行工作了,下一步是写一个作 业驱动程序(job driver),然后在开发机器上使用测试数据运行它。
在本地作业运行器上运行作业
通过使用前面介绍的Tool接口,可以轻松写一个MapReducer作业的驱动程序, 来计算按照年度查找最高气温(参见例5-7的MaxTemperatureDriver)。 例5-7.査找最高气温
public class MaxTemperatureDriver extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 2) { System.err.printf("Usage: %s [generic options] <input> <output>\n", getClass().getSimpleName()); ToolRunner.printGGnericCommandLlsagG(SystGm.Grr); return -1; } JobConf conf - new 3obConf(getConf(), getClass()); conf.setJobName("Max temperature"); FileInputFormat•addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf^ new Path(args[l])); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(MaxTemperatureMapper.class); conf.setCombinerClass(MaxTemperatureReducer.class); conf.setReducerClass(MaxTemperatureReducer.class); JobClient•runDob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args); System.exit(exitCode); } }
axTemperatureDriver实现了 Tool接口,所以,我们能够设置 GenericOptionsParser支持的选项。在开始]obConf所描述的作业前,run() 方法创建和配置一个Jobconf对象。在所有可能的作业配置参数中,可以设置输 入和输出文件路径,mapper、reducer和combiner,以及输出类型(输入类型由输入 格式决定,默认为TextInputFormat,包括Long Writable键和Text值)。为 作业设置一个名称也是很好的做法,这样可以在执行过程中或作业完成后方便地从 作业列表中查找作业。默认情况下,作业名称是JAR文件,通常情况下没有特殊 的描述。
现在我们可以在一些本地文件上运行这个应用。Hadoop有一个本地作业运行器 (Job runner),它是在MapReduce执行引擎运行单个JVM上的MapReduce作业的简 化版本。它是为测试而设计的,在lDE中使用起来非常方便,因为我们可以在调 试器中单步运行mapper和reducer代码。
本地作业运行器通过一个配置设置来激活。正常情况下,mapred.job.tracker 是一个主机:端口(host.port),用来设置jobtracker的地址,但它是一个特殊的local值时,作业就在不访问外部jobtracker的情况下运行^
可以在命令行方式下输入如下命令来运行驱动程序:
% hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml \ .input/ncdc/micro max-temp
类似地,可以使用GenericOptionParser提供的-fs和-jt选项:
% hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro max-temp
这条指令使用本地input/ncdc/micro目录的输入来执行MaxTemperatureDriver,产 生的输出存放在本地max-tcmp目录中。注意:虽然我们设置了-fs,可以使用本地 文件系统(file:///),但本地作业运行器实际上可以在包括HDFS在内的任何文 件系统上正常工作(如果HDFS里有一些文件,可以马上进行尝试)。
我们运行这个程序时,运行失败,并打印如下异常:
java.lang.NumbenFonmatException: For input string: "+0000"
修复mapper
这个异常表明map方法仍然不能解析带正号的气温。如果堆栈跟踪不能提供足够 的信息来诊断这个错误,因为程序运行在一个JVM中,所以我们可以在本地调试 器中进行测试。前面我们已经使程序能够处理缺失气温值(+9999)的特殊情况,但 不是任意非负气温的一般情况。如果mapper中有更多的逻辑,那么给出一个解析 类来封装解析逻辑是非常有意义的。参见例5-8。
例5-8.该类解析NCDC格式的气温记录
public class NcdcRecondPansen { private static final int MISSING_TEMPERATURE = 9999; private String yean; private int ainTempenatune; private String quality; public void panse(Stning record) { yean = record.substning(15, 19); String ainTempenatureString; // Remove leading plus sign as parseInt doesn't like them if (necond.chanAt(87) == '+') { airTempenatureString = record.substring(88, 92); } else { ainTemperatureString = record.substring(87, 92); } airTemperature = Integer.parseInt(airTemperature5tring); quality = record.substring(92, 93); } public void parse(Text record) { parse(record.toString()); } public boolean isValidTemperature() { return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]"); } public String getYear() { return year; } public int getAirTemperature() { return airTemperature; } }
最终的mapper相当简单(参见例5-9)。只调用解析类的parser()方法,后者解析 输人行中的相关字段,用isValidTemperature()方法检查是否是合法气温,如 果是,就用解析类的getter方法获取年份和气温数据。注意,我们也会在isValidTemperature()方法中检查质量状态字段和缺失的气温信息,以便过滤气 温读取错误。
创建解析类的另一个好处是:相似作业的mapper不需要重写代码。也提供了一个 机会直接针对解析类编写单元测试,用于更多目标测试。
例5-9.这个mapper使用utnity类来解析记录
public class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key. Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException { parser.parse(value); if (parser.isValidTempenature()) { output.collect(new Text(parser.getYear()), new IntWritable(parser.getAirTemperature())); } } }
经过这些修改以后,测试得以通过。
测试驱动程序
除了灵活的配置选项可以使应用程序实现Tool,还可以插入任意Configuration 来增加可测试性。可以利用这点来编写测试程序,它将利用本地作业运行器在已经 输入数据上运行作业,借此来检查输出是否满足预期。
要实现这个目标,有两种方法。第一种方法是使用本地作业运行器,在本地文件系 统的测试文件上运行作业。例5-10的代码给出了一种思路。
例5-10.这个MaxTemperatureDriver测试使用了一个正在运行的本地作业运行器
@Test public void test() throws Exception { JobConf conf = new JobConf(); conf.set("fs.default.name", "file:///"); conf.set("mapred.job.tracker", "local"); Path input = new Path("input/ncdc/micro"); Path output = new Path("output"); FileSystem fs = FileSystem.getLocal(conf); fs.delete(outputj true); // delete old output MaxTemperatureDriver driver = new MaxTemperatureDriver(); driver.setConf(conf); int exitCode = driver.run(new String[]{input.toString(), output.toString()}); assertThat(exitCode, is(0)); checkOutput(conf, output); }
测试代码明确设置fs.default.name和mapred.job.tracker,所以,它使用的 是本地文件系统和本地作业运行器。随后,通过其Tool接口在少数已知数据上运 行MaxTemperatureDriver。最后,checkOutput()方法被调用以逐行对比实际输 出与预期输出。
测试驱动程序的第二种方法是使用一个mini集群来运行它。Hadoop有一对测试 类,名为MiniDFSCluster和MiniMRCluster,它以程序方式创建正在运行的集 群。不同于本地作业运行器,它们不允许在整个HDFS和MapReduce机器上运行 运行测试。注意,mini集群上的tasktracker启动不同的JVM来运行任务,这会使调试更困难。
mini集群广泛应用于Hadoop自带的自动测试包中,但也可以用于测试用户代码。 Hadoop的ClusterMapReduceTestCase抽象类提供了一个编写此类测试的基础, 它在setUp()和 tearDown()方法中提供了用来处理启动和停止运行中的HDFS和MapReduce集群的细节,同时产生一个合适的被配置为一起工作的]obConf对 象。子类只需要得到HDFS中的数据(可能从本地文件中复制得到),运行 MapReduce作业,然后确认输出是否满足要求。
这样的测试是回归测试,是一个非常有用的输入边界用例和相应的期望结果的资源 库。随着测试用例的增加,简单将其加入输入文件,然后更新相应输出即可。