输出格式
针对前一节介绍的输入格式,Hadoop都有相应的输出格式。OutputFormat类的 层次结构如图7-4所示。
图7-4. OutputFromat类的层次结构
文本输出
默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可 以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符 串。每个键/值由制表符进行分隔,当然也可以mapred.textoutputformat.separator 属性改变默认的分隔符。与TextOutputFormat对应的输入格式是 KeyVaIueTextlnputFormat,它通过可配置的分隔符将为键/值对文本行分隔。
可以使用NullWritable来省略输出的键或值(或两者都省略,相当于 NullOutputFormat输出格式,后者什么也不输出)。这也会导致无分隔符输出, 以使输出适合用TextInPutFormat读取。
二进制输出
SequenceFileOutputFormat
正如名字所示,SequenceFileOutputFormat将它的输出写为一个顺序文件。如 果输出需要作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它 的格式紧凑,很容易被压缩。
SequenceFileAsBinaryOuputFormat
SequenceFileAsBinaryOutputFormat 与 SequenceFileAsBinaryInputForrmat 相对应,它把键/值对作为二进制格式写到一个SequenceFile容器中。
MapFileOutputFormat
MapFileOutputFormat把MapFile作为输出。MapFile中的键必须顺序添加, 所以必须确保reducer输出的键已经排好序。
多个输出
FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件 并且文件由分区号命名:part-00000, part-00001,等等。有时可能需要对输出的 文件名进行控制,或让每个reducer输出多个文件MapReduce为此提供了两个 库:MultipleOutputFormat 和 MultipleOutput 类。
实例:分区数据
考虑这样一个需求:按气象站来区分气象数据。这就需要运行一个作业,作业的输 出是每个气象站一个文件,此文件包含该气象站的所有数据记录。
一种方法是每个气象站对应一个reducer。为此,我们必须做两件事。第一,写一 个partitioner,把同一个气象站的数据放到同一个分区。第二,把作业的reducer 数设为气象站的个数partitioner如下:
public class StationPartitioner implements Partitioner<LongWritable, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); @Override public int getPantition(LongWritable key, Text value, int numPartitions) { parser.parse(value); return getPantition(parsen.getStationId()); } private int getPartition(String stationId) { ... } @Override public void configure(JobConf conf) { } }
这里没有给出getPartition方法的实现,它将气象站ID转换成分区索 引号。为此,它的输入是一个列出所有气象站ID的列表,然后返回列表中气象站ID的索引。
这样做有两个缺点。第一,需要在作业运行之前知道分区数和气象站的个数。虽然 NCDC数据集提供了气象站的元数据,但无法保证数据中的气象站ID与元数据匹 配。如果元数据中有某个气象站但数据中却没有该气象站的数据,就会浪费一个 reducer任务槽。更糟糕的是,数据中有但元数据中却没有的气象站,也不会有对 应的reducer任务槽,只好将这个气象站扔掉。解决这个问题的方法是写一个作业 来抽取唯一的气象站,但很遗憾,这需要额外的作业来实现。
第二个缺点更微妙。一般来说,让应用程序来严格限定分区数并不好,因为可能导 致分区数少或分区不均。让很多reducer做少量工作不是一个高效的作业组织方 法,比较好的办法是使用更少reducer做更多的事情,因为运行任务的额外开销减 少了。分区不均的情况也是很难避免的。不同气象站的数据量差异很大:有些气象 站是一年前刚投入使用的,而另一些气象站可能已经工作近一个世纪了。如果其中 一些reduce任务运行时间远远超过另一些,作业执行时间将由它们决定,从而导 致作业的运行时间超过预期。
最好能让集群来为作业决定分区数:集群的reducer任务槽越多,作业就会完成越 快。这就是默认的HashPartitioner表现如此出色的原因,因为它处理的分区数 不限,并且确保每个分区都有一个好的键组合使分区更均匀。
如果使用HashPartitioner,那么每个分区将包含多个气象站,因此,要实现每 个气象站输出一个文件,必须安排每个reducer写多个文件,MultipleOutputFormat因此而来。
MultipleOutputFormat 类
MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出 的键和值。MultipleFileOutputFormat是个抽象类,它有两个实体子类: MultipleTextOutputFormat 和 MultipleSequenceFileOutputFormat 类。它 们是 TextOutputFormat 和 SequenceOutputFormat 的多文件版本。 MultipleFileOutputFormat类提供了 一些子类覆盖来控制输出文件名的Protected方法。在例7-5中,我们创建了 MultipleTextOutputFormat类的一个 子类来重载generateFilenameForKeyValue()方法,使它返回我们从记录值中 抽取的气象站ID。
例7-5.用MultipleOutputFormat类将整个数据集分区到以气象站ID命名的文件
public class PartitionByStationUsingMultipleOutputFormat extends Configured implements Tool { static class StationMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> { private NcdcRecondPansen parser = new NcdcRecordPansen(); public void map(LongWnitable key, Text value,OutputCollecton<Text, Text> output, Reporter reporter)throws IOException { pansen.panse(value); output.collect(new Text(pansen.getStationId()), value); } } static class StationReducer extends MapReduceBase implements Reducen<Text, Text,NullWritable, Text> { @Override public void neduce(Text key, Itenaton<Text> values,OutputCollecton<NullWnitable, Text> output, Reporter reporter)throws IOException { while (values.hasNext()) { output.collect(NullWnitable.get(), values.next()); } } } static class StationNameMultipleTextOutputFormat extends MultipleTextOutputFormat<NullMritable,Text> { private NcdcRecordParser parser = new NcdcRecordParser(); protected String generateFileNameForKeyValue(NullMritable key, Text value,String name) { parser.parse(value); return parser.getStationId(); } } @Override public int run(String[] args) throws IOException { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf()args); if (conf == null) { return -1; } conf.setMapperClass(StationMapper.class); conf.setMapOutputKeyClass(Text•class); conf.setReducerClass(StationReducer.class); conf.setOutputKeyClass(NullWritable.class); conf.setOutputFormat(StationNameMultipleTextOutputFormat.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputFormat(), args); System.exit(exitCode); } }
StationMapper从记录中找到气象站lD,并把它作为键。这样,来自同一个气象 站的记录会被分到同一个分区。用StationNameMultipleTextOutputFormat写 最终输出的时候,StationReducer用NullWritable来替换记录的键(这类似于 TextOutputFormat丢掉NullWritable键),输出中只包含气象记录(没有气象站ID键).
最终的结果是每个气象站的所有记录都放在以气象站lD命名的文件里。在整个数 据集的子集上运行程序后,得到以下几行输出:
-rw-r--r-- 3 root supergroup 2887145 2009-04-17 10:34 /output/010010-99999 -rw-r--r-- 3 root supergroup 1395129 2009-04-17 10:33 /output/010050-99999 -rw-r--r-- 3 root supergroup 2054455 2009-04-17 10:33 /output/010100-99999 -rw-r--r-- 3 root supergroup 1422448 2009-04-17 10:34 /output/010280-99999 -rw-r--r-- 3 root supergroup 1419378 2009-04-17 10:34 /output/010550-99999 -rw-r--r-- 3 root supergroup 1384421 2009-04-17 10:33 /output/010980-99999 -rw-r--r-- 3 root supergroup 1480077 2009-04-17 10:33 /output/011060-99999 -rw-r--r-- 3 noot supergroup 1400448 2009-04-17 10:33 /output/012030-99999 -rw-r--r-- 3 root supergroup 307141 2009-04-17 10:34 /output/012350-99999 -rw-r--r-- 3 root supergroup 1433994 2009-04-17 10:33 /output/012620-99999
generateFileNameForKeyValue()方法返回的文件名实际上是一个相对于输出目 录的路径。例如,以下的修改按照气象站来对数据分区,这样,每年的数据包含在 一个以气象站ID命名的目录中:
protected String genenateFileNameFonKeyValue(NullWnitable key,Text value,String name) { pansen.panse(value); return pansen.getStationId() + 'T + pansen.getYean(); }
MultipleOutputFormat还有很多特性没有在本书中讨论例如,为只有 map(map-only)的作业进行复制输入目录结构和文件命名等能力。详情请参考Java 文档。
MultipleOutputs
在Hadoop中另一个用于产生多个输出的库由MulipleOutputs类提供。与 MultipleOutputFormat类不一样的是,MulipleOutputs可以为不同的输出产 生不同的类型。另一方面,也意味着无法控制输出的命名。例7-6演示了如何使用 MultipleOutputs类根据气象站来切分数据。
例7-6.使用MultipleOutputs类将整个数据集切分为以气象站ID命名的文件
public class PantitionByStationUsingMultipleOutputs extends Configured implements Tool{ static class StationMapper extends MapReduceBase implements Mappen<LongWritable,Text, Text, Text> { private NcdcRecondPansen pansen = new NcdcRecondPansen(); public void map(LongWritable key, Text value,OutputCollecton<Text, Text> output, Reporter reporter) throws IOException { parser.panse(value); output.collect(new Text(parser.getStationId()), value); } } static class MultipleOutputsReducer extends MapReduceBase implements Reducer<Text, Text, NullWritable, Text> { private MultipleOutputs multipleOutputs; @Override public void configure(JobConf conf) { multipleOutputs = new MultipleOutputs(conf); } public void reduce(Text key, Iterator,Text, values,OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException { OutputCollector collector = multipleOutputs.getCollector("station", key,toString().replace("-", *"•), reporter); while (values.hasNext()) { collector.collect(NullMritable.get(); values.next()); } } @Override public void close() throws IOException { multipleOutputs.close(); } } @Override public int run(String[] args) throws IOException { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setMapperClass(StationMapper.class); conf.setMapOutputKeyClass(Text.class); conf.setReducerClass(MultipleOutputsReducer.class); conf.setOutputKeyClass(NullWritable.class); conf.setOutputFormat(NullOutputFormat.class);// suppress empty part file MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class, Nuliwritable.class, Text.class); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(),args); System.exit(exitCode); } }
MultipleOutputs类用于在原有输出基础上附加输出。输出是指定名称的,在生成输出的reducer中,在configure()方法中构造一个MultipleOutputs的 实例,并将它赋给一个实例变量。在reduce()方法中使用MultipleOutputs实 例为多名输出获取OutputCollector。getCollector方法接收输出名称(即 “station”)和一个标识part的字符串,该part标识符用于多命名输出。这里使 用气象站标识符,其中的“-”符号被忽略了,因为MultipleOutputs只能包含 字母字符。可以 写到一个文件(单名输出)或多个文件(多名输出)中。在这个例子中,我们需要多个 文件,每个气象站一个文件,所以我们使用多命名输出,调用MultipleOutputs 中的addMultiNamedOutput()方法来设定输出名称(这里是 “station”)、输出格 式及输出类型。另外,为了减少原有输出的影响,我们把正常的输出格式设为 NullOutputFormat。
最终的结果是生成多个输出文件,文件的命名模式是station_
运行一次后,前面几个输出文件的命名如下(目录列表中的其他列已被删除):
/output/station_01001099999-r-00027 /output/station_01005099999-r-00013 /output/station_01010099999-r-00015 /output/station_01028099999-r-00014 /output/station_01055099999-r-00000 /output/station_01098099999-r-00011 /output/station_01106099999-r-00025 /output/station_01203099999-r-00029 /output/station_01235099999-r-00018 /output/station_01262099999-r-00004
延迟输出
File OutputFormat的子类会产生输出文件(part-nnnnn),即使文件是空的。有些 应用倾向于不创建空文件,此时LazyOutputFormat就有用了。它是一个封装输 出格式,可以保证指定分区第一条记录输出时才真正创建文件。要使用它,用JobConf和相关的输出格式作为参数来调用setOutputFormatClass()方法即可。
Streaming 和 Pipes 支持-LazyOutput 选项来启用 LazyOutputFormat 功能。
数据库输出
写到关系数据库和HBase狀的输出格式,请参见“数据库输入(和输出)” 小节。