Hadoop教程

输出格式

针对前一节介绍的输入格式,Hadoop都有相应的输出格式。OutputFormat类的 层次结构如图7-4所示。

图7-4. OutputFromat类的层次结构

文本输出

默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可 以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符 串。每个键/值由制表符进行分隔,当然也可以mapred.textoutputformat.separator 属性改变默认的分隔符。与TextOutputFormat对应的输入格式是 KeyVaIueTextlnputFormat,它通过可配置的分隔符将为键/值对文本行分隔。

可以使用NullWritable来省略输出的键或值(或两者都省略,相当于 NullOutputFormat输出格式,后者什么也不输出)。这也会导致无分隔符输出, 以使输出适合用TextInPutFormat读取。


二进制输出

SequenceFileOutputFormat

正如名字所示,SequenceFileOutputFormat将它的输出写为一个顺序文件。如 果输出需要作为后续MapReduce任务的输入,这便是一种好的输出格式,因为它 的格式紧凑,很容易被压缩。

SequenceFileAsBinaryOuputFormat

SequenceFileAsBinaryOutputFormat 与 SequenceFileAsBinaryInputForrmat 相对应,它把键/值对作为二进制格式写到一个SequenceFile容器中。

MapFileOutputFormat

MapFileOutputFormat把MapFile作为输出。MapFile中的键必须顺序添加, 所以必须确保reducer输出的键已经排好序。


多个输出

FileOutputFormat及其子类产生的文件放在输出目录下。每个reducer一个文件 并且文件由分区号命名:part-00000, part-00001,等等。有时可能需要对输出的 文件名进行控制,或让每个reducer输出多个文件MapReduce为此提供了两个 库:MultipleOutputFormat 和 MultipleOutput 类。

实例:分区数据

考虑这样一个需求:按气象站来区分气象数据。这就需要运行一个作业,作业的输 出是每个气象站一个文件,此文件包含该气象站的所有数据记录。

一种方法是每个气象站对应一个reducer。为此,我们必须做两件事。第一,写一 个partitioner,把同一个气象站的数据放到同一个分区。第二,把作业的reducer 数设为气象站的个数partitioner如下:

public class StationPartitioner implements Partitioner<LongWritable, Text> 
{
    private NcdcRecordParser parser = new NcdcRecordParser();
    @Override
    public int getPantition(LongWritable key, Text value, int numPartitions) 
    {
        parser.parse(value);
        return getPantition(parsen.getStationId());
    }
    private int getPartition(String stationId)
    {
    ...
    }
    @Override
    public void configure(JobConf conf) 
    {
    }
}

这里没有给出getPartition方法的实现,它将气象站ID转换成分区索 引号。为此,它的输入是一个列出所有气象站ID的列表,然后返回列表中气象站ID的索引。

这样做有两个缺点。第一,需要在作业运行之前知道分区数和气象站的个数。虽然 NCDC数据集提供了气象站的元数据,但无法保证数据中的气象站ID与元数据匹 配。如果元数据中有某个气象站但数据中却没有该气象站的数据,就会浪费一个 reducer任务槽。更糟糕的是,数据中有但元数据中却没有的气象站,也不会有对 应的reducer任务槽,只好将这个气象站扔掉。解决这个问题的方法是写一个作业 来抽取唯一的气象站,但很遗憾,这需要额外的作业来实现。

第二个缺点更微妙。一般来说,让应用程序来严格限定分区数并不好,因为可能导 致分区数少或分区不均。让很多reducer做少量工作不是一个高效的作业组织方 法,比较好的办法是使用更少reducer做更多的事情,因为运行任务的额外开销减 少了。分区不均的情况也是很难避免的。不同气象站的数据量差异很大:有些气象 站是一年前刚投入使用的,而另一些气象站可能已经工作近一个世纪了。如果其中 一些reduce任务运行时间远远超过另一些,作业执行时间将由它们决定,从而导 致作业的运行时间超过预期。

最好能让集群来为作业决定分区数:集群的reducer任务槽越多,作业就会完成越 快。这就是默认的HashPartitioner表现如此出色的原因,因为它处理的分区数 不限,并且确保每个分区都有一个好的键组合使分区更均匀。

如果使用HashPartitioner,那么每个分区将包含多个气象站,因此,要实现每 个气象站输出一个文件,必须安排每个reducer写多个文件,MultipleOutputFormat因此而来。

MultipleOutputFormat 类

MultipleOutputFormat类可以将数据写到多个文件,这些文件的名称源于输出 的键和值。MultipleFileOutputFormat是个抽象类,它有两个实体子类: MultipleTextOutputFormat 和 MultipleSequenceFileOutputFormat 类。它 们是 TextOutputFormat 和 SequenceOutputFormat 的多文件版本。 MultipleFileOutputFormat类提供了 一些子类覆盖来控制输出文件名的Protected方法。在例7-5中,我们创建了 MultipleTextOutputFormat类的一个 子类来重载generateFilenameForKeyValue()方法,使它返回我们从记录值中 抽取的气象站ID。

例7-5.用MultipleOutputFormat类将整个数据集分区到以气象站ID命名的文件

public class PartitionByStationUsingMultipleOutputFormat extends Configured implements Tool 
{
    static class StationMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> 
    {
        private NcdcRecondPansen parser = new NcdcRecordPansen();
        
        public void map(LongWnitable key, Text value,OutputCollecton<Text, Text> output, Reporter reporter)throws IOException
        {
            pansen.panse(value);
            output.collect(new Text(pansen.getStationId()), value);
        }
    }
    static class StationReducer extends MapReduceBase implements Reducen<Text, Text,NullWritable, Text> 
    {
        @Override
        public void neduce(Text key, Itenaton<Text> values,OutputCollecton<NullWnitable, Text> output, Reporter reporter)throws IOException 
        {
            while (values.hasNext()) 
            {
                output.collect(NullWnitable.get(), values.next());
            }
         }
    }
    static class StationNameMultipleTextOutputFormat extends MultipleTextOutputFormat<NullMritable,Text> 
    {
        private NcdcRecordParser parser = new NcdcRecordParser();
        protected String generateFileNameForKeyValue(NullMritable key, Text value,String name) 
        {
            parser.parse(value);
            return parser.getStationId();
        }
    }
    @Override
    public int run(String[] args) throws IOException {
        JobConf conf = JobBuilder.parseInputAndOutput(this, getConf()args);
        if (conf == null) {
            return -1;
        }
        conf.setMapperClass(StationMapper.class);
        conf.setMapOutputKeyClass(Text•class);
        conf.setReducerClass(StationReducer.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputFormat(StationNameMultipleTextOutputFormat.class);
        
        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputFormat(), args);
        System.exit(exitCode);
    }
}

StationMapper从记录中找到气象站lD,并把它作为键。这样,来自同一个气象 站的记录会被分到同一个分区。用StationNameMultipleTextOutputFormat写 最终输出的时候,StationReducer用NullWritable来替换记录的键(这类似于 TextOutputFormat丢掉NullWritable键),输出中只包含气象记录(没有气象站ID键).

最终的结果是每个气象站的所有记录都放在以气象站lD命名的文件里。在整个数 据集的子集上运行程序后,得到以下几行输出:

-rw-r--r-- 3 root supergroup 2887145 2009-04-17 10:34 /output/010010-99999
-rw-r--r-- 3 root supergroup 1395129 2009-04-17 10:33 /output/010050-99999
-rw-r--r-- 3 root supergroup 2054455 2009-04-17 10:33 /output/010100-99999
-rw-r--r-- 3 root supergroup 1422448 2009-04-17 10:34 /output/010280-99999
-rw-r--r-- 3 root supergroup 1419378 2009-04-17 10:34 /output/010550-99999

-rw-r--r--	3 root supergroup 1384421 2009-04-17 10:33 /output/010980-99999
-rw-r--r--	3 root supergroup 1480077 2009-04-17 10:33 /output/011060-99999
-rw-r--r--	3 noot supergroup 1400448 2009-04-17 10:33 /output/012030-99999
-rw-r--r--	3 root supergroup 307141  2009-04-17 10:34 /output/012350-99999
-rw-r--r--	3 root supergroup 1433994 2009-04-17 10:33 /output/012620-99999

generateFileNameForKeyValue()方法返回的文件名实际上是一个相对于输出目 录的路径。例如,以下的修改按照气象站来对数据分区,这样,每年的数据包含在 一个以气象站ID命名的目录中:

protected String genenateFileNameFonKeyValue(NullWnitable key,Text value,String name) {
    pansen.panse(value);
    return pansen.getStationId() + 'T + pansen.getYean();
}

MultipleOutputFormat还有很多特性没有在本书中讨论例如,为只有 map(map-only)的作业进行复制输入目录结构和文件命名等能力。详情请参考Java 文档。

MultipleOutputs

在Hadoop中另一个用于产生多个输出的库由MulipleOutputs类提供。与 MultipleOutputFormat类不一样的是,MulipleOutputs可以为不同的输出产 生不同的类型。另一方面,也意味着无法控制输出的命名。例7-6演示了如何使用 MultipleOutputs类根据气象站来切分数据。

例7-6.使用MultipleOutputs类将整个数据集切分为以气象站ID命名的文件

public class PantitionByStationUsingMultipleOutputs extends Configured implements Tool{
    static class StationMapper extends MapReduceBase implements Mappen<LongWritable,Text, Text, Text> 
    {
        private NcdcRecondPansen pansen = new NcdcRecondPansen();
        
        public void map(LongWritable key, Text value,OutputCollecton<Text, Text> output, Reporter reporter) throws IOException 
        {
            parser.panse(value);
            output.collect(new Text(parser.getStationId()), value);
        }
    }
    static class MultipleOutputsReducer extends MapReduceBase
    implements Reducer<Text, Text, NullWritable, Text> 
    {
        private MultipleOutputs multipleOutputs;

        @Override
        public void configure(JobConf conf) 
        {	
            multipleOutputs = new MultipleOutputs(conf);
        }
        public void reduce(Text key, Iterator,Text, values,OutputCollector<NullWritable, Text> output, Reporter reporter) throws IOException 
        {
            OutputCollector collector = multipleOutputs.getCollector("station",
            key,toString().replace("-", *"•), reporter);
            while (values.hasNext()) 
            {
                collector.collect(NullMritable.get(); values.next());
            }
        }
        @Override
        public void close() throws IOException 
        {
            multipleOutputs.close();
        }
    }

    @Override
    public int run(String[] args) throws IOException 
    {
    
        JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }
        conf.setMapperClass(StationMapper.class);
        conf.setMapOutputKeyClass(Text.class);
        conf.setReducerClass(MultipleOutputsReducer.class);
        conf.setOutputKeyClass(NullWritable.class);
        conf.setOutputFormat(NullOutputFormat.class);// suppress empty part file
        MultipleOutputs.addMultiNamedOutput(conf, "station", TextOutputFormat.class,
        Nuliwritable.class, Text.class);
    
        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception 
    {
        int exitCode = ToolRunner.run(new PartitionByStationUsingMultipleOutputs(),args);
        System.exit(exitCode);
    }
}

MultipleOutputs类用于在原有输出基础上附加输出。输出是指定名称的,在生成输出的reducer中,在configure()方法中构造一个MultipleOutputs的 实例,并将它赋给一个实例变量。在reduce()方法中使用MultipleOutputs实 例为多名输出获取OutputCollector。getCollector方法接收输出名称(即 “station”)和一个标识part的字符串,该part标识符用于多命名输出。这里使 用气象站标识符,其中的“-”符号被忽略了,因为MultipleOutputs只能包含 字母字符。可以 写到一个文件(单名输出)或多个文件(多名输出)中。在这个例子中,我们需要多个 文件,每个气象站一个文件,所以我们使用多命名输出,调用MultipleOutputs 中的addMultiNamedOutput()方法来设定输出名称(这里是 “station”)、输出格 式及输出类型。另外,为了减少原有输出的影响,我们把正常的输出格式设为 NullOutputFormat。

最终的结果是生成多个输出文件,文件的命名模式是station_-r-。文件名中出现的r是因为这个文件是reducer产生的,紧跟 的part number确保写同一个气象站的输出时,来自不同分区扣£1以^)的结果不会 有冲突。由于按气象站进行分区,所以这里不会有冲突(但在一般情况会冲突)。

运行一次后,前面几个输出文件的命名如下(目录列表中的其他列已被删除):

/output/station_01001099999-r-00027
/output/station_01005099999-r-00013
/output/station_01010099999-r-00015
/output/station_01028099999-r-00014
/output/station_01055099999-r-00000
/output/station_01098099999-r-00011
/output/station_01106099999-r-00025
/output/station_01203099999-r-00029
/output/station_01235099999-r-00018
/output/station_01262099999-r-00004


延迟输出

File OutputFormat的子类会产生输出文件(part-nnnnn),即使文件是空的。有些 应用倾向于不创建空文件,此时LazyOutputFormat就有用了。它是一个封装输 出格式,可以保证指定分区第一条记录输出时才真正创建文件。要使用它,用JobConf和相关的输出格式作为参数来调用setOutputFormatClass()方法即可。

Streaming 和 Pipes 支持-LazyOutput 选项来启用 LazyOutputFormat 功能。


数据库输出

写到关系数据库和HBase狀的输出格式,请参见“数据库输入(和输出)” 小节。

关注微信获取最新动态