Hadoop教程

    排序

排序是MapReduce的核心技术。尽管应用本身可能并不需要对数据排序,但仍可 能使用MapReduce的排序功能来组织数据。本节将讨论几种不同的数据集排序方 法,以及如何控制MapReduce的排序。

准备

面将按气温字段对天气数据集排序。由于气温字段是有符号整数,所以不能将该 字段视为Text对象并以字典顺序排序。反之,我们要用顺序文件存储数据,其 IntWritable键代表气温(并且正确排序),其Text值就是数据行。

例8-3中的MapReduce作业只包含map任务,它过滤输人数据并移除包含有无效 气温的记录。各个map创建并输出一个块压缩的顺序文件。相关指令如下:

%hadoop jar job.jar SortDataPreprocessor input/ncdc/all input/ncdc/all-seq

例8-3.该MapReduce程序将天气数据转成顺序文件格式

public class SortDataPreprocessor extends Configured implements Tool 
{
    static class CleanerMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> 
    {
        private NcdcRecordParser parser = new NcdcRecordParser();
        public void map(LongWritable key Text value,OutputCollecton<IntWnitable, Text> output, Reporter reporter) throws IOException 
        {
            parser.panse(value);
            if (parser.isValidTempenatune()) 
            {
                output.collect(new IntWritable(parser.getAinTempenatune()), value);
            }
        }
    }
    @Override
    public int nun(String[] args) throws IOException 
    {
        JobConf conf = JobBuilder.panseInputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }
        conf.setMapperClass(CleanerMapper.class);
        conf.setOutputKeyClass(IntWnitable.class);
        conf.setOutputValueClass(Text.class);
        conf.setNumReduceTasks(0);
        conf.setOutputFonmat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompnessOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(conf,
        CompressionType.BLOCK);

        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception 
    {
        int exitCode = ToolRunner.run(new SortDataPreprocessor(), args);
        System.exit(exitCode);
    }
}

部分排序

“默认的MapReduce作业”小节所述,在默认情况下,MapReduce 根据输入记录的键对数据集排序。例8-4则是一个变种,它利用IntWritable键 对顺序文件排序。

例8-4. MapReduce程序:调用默认的HashPartitioner根据Intwritable键对顺序 文件进行排序

public class SontByTempenatureUsingHashPartitioner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException 
    {
        JobConf conf = DobBuilder.parseInputAndOutput(this, getConf(), args);
        if (conf == null) 
        {
            return -1;
        }
        conf.setInputFopmat(SequenceFileInputFormat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class);
        SequenceFileOutputFormat_ setOutputCompressionType(conf,
        CompressionType.BLOCK);

        JobClient.runDob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception 
    {
        int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(),args);
        System.exit(exitCode);
    }
}

假设采用30个reducer来运行该程序:

% hadoop jar job.jar SortByTemperatureUsingHashPartitioner \ -D mapred.reduce.tasks=B0 input/ncdc/all-seq output-hashsort

该指令产生30个已排好序的输出文件。但是如何将这些小文件合并成一个有序的 文件却并非易事。例如,直接将纯文本文件连接起来无法保证全局有序。幸运的 是,许多应用并不强求待处理的文件全局有序。例如,对于查找操作来说,部分排 序的文件就已经足够了。

应用:基于分区的MapFile查找技术

以按键执行查找操作为例,在多文件情况下效率更髙。在例8-5中,输出格式被改 SMapFileOutputFormat,则会输出30个map文件,我们可以基于这些文件执 行查找操作。

例8-5.该MapReduce程序对一个顺序文件排序并输出MapFile

public class SortByTempenatureToMapFile extends Configured implements Tool {
    @Ovennide
    public int nun(Stning[] args) throws IOException {
        JobConf conf = DobBuilden.panseInputAndOutput(this, getConf(), angs);
        if (conf == null) {
            return -1;
        }
        conf.setInputFonmat(SequenceFileInputFonmat.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(MapFileOutputFormat.class);
        SequenceFileOutputFormat•setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompnessonClass(conf, GzipCodec.class);
        SequenceFileOutputFormat.setOutputCompnessionType(conf,
        CompressionType.BLOCK);
        DobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] angs) throws Exception 
    {
        int exitCode = ToolRunner.run(new SontByTemperatuneToMapFile(), angs);
        System.exit(exitCode);
    }
}

MapFileOutputFormat提供了两个很方便的静态方法,用于对MapReduce输出文 件执行査找操作,其用法如例8-6所示。

例8-6.从MapFiles集合中获取符合指定键的第一项记录

public class LookupRecordByTemperature extends Configured implements Tool {
    @Override
    public int nun(String[] angs) throws Exception {
        if (args.length != 2) 
        {
            JobBuilden.pnintUsage(this, "<path> <key>");
            return -1;
        }
        Path path = new Path(args[0]);
        IntWritable key = new IntWnitable(Integen.panseInt(angs[l]));
        FileSystem fs = path.getFileSystem(getConf());
        Readen[] readers = MapFileOutputR>rmat.getReaders(fs, path, getConf());
        Pantitionen<IntWnitable, Text> pantitionen = new HashPantitionen<IntWnitable, Text>();
        Text val = new Text();
        Writable entry =
        MapFileOutputFonmat.getEntny(neaders, pantitionen, key, val); 
        if (entry == null) 
        {
            System.enn.pnintln("Key not found: " + key);
            return -1;
        }
        NcdcRecordParser pansen = new NcdcRecondPansen(); pansen.panse(val.toStning());
        System.out.pnintf("%s\t%s\n", pansen.getStationId(), pansen.getYean());
         return 0;
     }
    public static void main(Stning[] angs) throws Exception 
    {
        int exitCode = ToolRunner.run(new LookupRecondByTempenatune(),angs);
        System.exit(exitCode);
    }
}

getReaders()方法为MapReduce作业创建的每个输出文件分别打开一个 MapFile.Reader实例,用一个reader数组(即reader)表示。之后,getEntry()方法使用partitioner找到包含指定键的Reader实例,再通过该Reader实例的get()方法得到这个键对应的值(val)。如果getEntry()返回null,则表明没有找 到匹配的键。否则,返回一个描述对应气象站山和年份的值。

举例来说,若要查找首条气温为-101:的记录(记住,气温字段是整数类型,其值是 真实气温的10倍。例如,_101被记为-100),则有:

% hadoop jar job.jar LookupRecordByTemperature output-hashmapsort -100
357460-99999 1956

我们还可以直接用readers数组来获得包含指定键的所有记录。readers数组按 分区排序,因而针对一个指定键的reader,均使用MapReduce作业中的同一个

 partitioner:
Reader reader = readers[partitioner.getPartition(key, val, readers.length)];

找到reader之后,可通过MapFile的get()方法获取第一条包含指定键的记录。 接着,循环调用next()获取下一条记录,直到键改变为止。相关程序如例8-7所示。

例8-7.从一个MapFiles集合中获取包含指定键的所有记录

public class LookupRecondsByTempenatune extends Configured implements Tool {
    @Override
    public int nun(Stning[] angs) throws Exception 
    { 
        if (args.length != 2) {
            JobBuilden.pnintUsage(this, "<path> <key>");
            return -1;
        }
        Path path = new Path(args[0]);
        IntWritable key = new IntWritable(Integer.parseInt(args[l]));
        FileSystem fs = path.getFileSystem(getConf());

        Reader[] readers = MapFileOutputFormat.getReaders(fs, path, getConf());
        Partitioner<IntWnitable, Text> partitioner = new HashPartitionen<IntWritable, Text>();
        Text val = new Text();

        Reader reader = readers[partitioner.getPartition(key, val, readers.length)];
        Writable entry = reader.get(keyj val);
        if (entry == null) {
            System.err.println("Key not found: " + key);
            return -1;
        }
        NcdcRecordPanser parser = new NcdcRecondParsen();
        IntWritable nextKey = new IntWritable();
        do {
            panser.parse(val.toString());
            System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear());
        } while(reader.next(nextKey, val) && key.equals(nextKey));
        return 0;
    }

    public static void main(Stning[] args) throws Exception 
    {
        int exitCode = ToolRunner.run(new LookupRecordsByTemperatune(), angs);
        System.exit(exitCode);
    }
}

下例描述如何获取所有_10℃的读数,并计数:

% hadoop jar job.jar LookupRecordsByTemperature output-hashniapsort -100 \
2> /dev/null | wc -1
1489272

全排序

如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区(a single partition)。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输 出文件,从而完全丧失了 MapReduCe所提供的并行架构的优势。

事实上仍有替代方案:首先,创建一系列排好序的文件,其次,串联这些文件,最 后,生成一个全局排序的文件。主要的思路是使用一个partitinoner来描述全局排序 的输出。例如,可以为上述文件创建4个分区,在第一分区中,各记录的气温小 于10℃,第二分区的气温介于-10℃和0℃之间,第三个分区的气温在0℃和10℃之间,最后一个分区的气温大于10℃。

该方法的关键点在于如何划分各个分区。理想情况下,各分区所含记录数应该大致 相等,使作业的总体执行时间不会受制于个别reducer。在前面提到的分区方案 中,各分区的相对大小如下所示。

气温范围<-10℃[-10℃,0℃][0℃,10℃]≥10℃
记录所占的比例11%13%17%59%

显然,记录没有被均匀分开。因此,需要深入了解整个数据集的气温分布才能建立 更均勻的分区。写一个MapReduce作业计算落入各个气温桶的记录数比较容易。 例如,图8-1显示了桶大小为1℃时各桶的分布情况,各点分别对应一个桶。

图8-1.天气数据集合的气温分布

获得气温分布信息意味着可以建立一系列分布更均匀的分区。但由于这个操作需要 遍历整个数据集,因此并不实用。通过对键空间进行采样,就可较为均匀地划分数 据集。采样的核心思想是只查看一小部分键,获得键的近似分布,并由此构建分 区。幸运的是,Hadoop已经内置了若干采样器,不需要用户自己编写。

InputSampler类实现了Sampler接口,该接口的唯一成员方法(即getSampler) 有两个输入参数(一个InputFormat对象和一个JobConf对象),返回一系列样本键:

public interface Sampler<K,V> {
K[] getSample(InputFormat<K,V> inf,JobConf job) throws IOException;
}

这个接口通常不直接由客户端调用,而是由InputSampler类的静态方法 writePartitionFile()调用,目的是创建一个顺序文件来存储定义分区的键。

public static <K,V> void writePartitionFile(JobConf job,
Sampler<K,V> sampler) throws IOException;

顺序文件供TotalOrderPartitioner使用,为排序作业创建分区。例8-8整合了 上述内容。

例8-8.该MapReduce程序利用TotalOrderPartitioner根据IntWritable键对顺序文件进 行全局排序

public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception 
    {
        JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (conf == null) {
        return -1:
        }
        conf.setInputFormat(SequenceFileInputFormat.class);
        conf_setOutputKeyClass(IntWritable.class);
        conf.setOutputFormat(SequenceFileOutputFormat.class);
        SequenceFileOutputFormat.setCompressOutput(conf, true);
        SequenceFileOutputFormat.setOutputCompressorClass(conf,
        SequenceFileOutputFormat.setOutputCompressionType(conf,
        CompressionType.BLOCK);
        GzipCodec.class);

        conf.setPartitionerClass(TotalOrderPartitioner.class);
        InputSampler.Sampler<IntWritable, Text> sampler =
        new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10);
        Path input = FileInputFormat.getInputPaths(conf)[0];
        input = input.makeQualified(input.getFileSystem(conf));
        Path partitionFile = new Path(input, "_partitions");

        TotalOndenPantitionen.setPantitionFile(conf, partitionFile);
        InputSamplen.wnitePantitionFiIe(conf, sampler);
        // Add to DistnibutedCache
        URI pantitionUni=new URI(pantitionFile.toString()+"#_partitions");
        DistnibutedCache.addCacheFile(pantitionUni, conf);
        DistnibutedCache.cneateSymlink(conf);
        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] angs) throws Exception 
    {
        int exitCode = ToolRunner.run(
        new SortByTempenatuneLlsingTotalOrderPartitioner(),args);
        System.exit(exitCode);
    }
}

该程序使用了一个RandomSampler,以指定的采样率均匀地从一个数据集中选择 样本。在本例中,采样率被设为0.1。 RamdomSanpler的输入参数还包括最大样本 数和最大分区(本例中这两个参数分别是10 000和10,这也是InputSampler作为 应用程序运行时的默认设置),只要满足其中任意一个限制条件,即停止采样。采 样器在客户端运行,因此,限制分片的下载数量以加速采样器的运行就显得尤为重 要。在实践中,采样器的运行时间仅占作业总运行时间的一小部分。

InputSampler写的分区文件被命名为_partitions,放在输入目录中(由于该文件以“_” 打头,所以不会被选为输入文件)。为了和集群上运行的其他任务共享分区文件, 还需将该文件就载入到分布式缓存。

以下显示了分别以_5.6℃、13.9℃和22.0℃为分区边界得到的4个分区及其所占比 例。易知,新方案比旧方案更为均匀。

气温范围<5.6℃[-5.6℃,13.9℃][13.9℃,22℃]≥22℃
记录所占的比例29%24%23%24%

输入数据的特性决定着如何挑选最合适的采样器。以SplitSampler为例,它只 采样一个分片中前n条记录。由于并未从所有分片中广泛采样,因而该采样器并不适合已经排好序的数据。

Hadoop内置的采样器还有IntervalSample,它以一定的间隔定期从划分中选择 键,因此对于已排好序的数据来说是一个更好的选择。RandomSampler是优秀的 通用采样器。如果疫有采样器可以满足应用需求(记住,采样目的是创建大小均匀 的一系列分区),则只能写程序来实现Sampler接口。

InputSampler类和TotalOrderPartitioner类的一个好特性是用户可以自由定 义分区数。该值通常取决于集群上reducer任务槽的数量(该值需稍小于reducer任 务槽的总数,以应付可能出现的故障)。由于TotalOrderPartitioner只用于分区边 界均不相同的时候,因此,当键空间较小时,设置太大的分区数可能会导致数据 冲突。

以下是运行方式:

% hadoop jar job.jar SortByTemperatureUsingTotalOrderPartitioner \
-D mapred.reduce.tasks=30 input/ncdc/all-seq output-totalsort

该程序输出30个内部已经排好序的分区。此外,分区i中的所有键都小于分区i+1 中的键。


辅助排序

MapReduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有 被排序。由于值来自不同的map任务,所以在多次运行程序时,值的出现顺序并 不固定,导致每次运行作业的时间会各有不同。一般来说,大多数MapReduce程 序无需考虑值在reduce函数中的出现顺序。但是,有时也需要通过对键进行排序 和分组等以实现对值的排序。

例如,考虑如何设计一个MapReduce程序以计算每年最高气温。如果全部记录均 按照气温降序排列,则无需遍历整个数据集即可获得查询结果——获取各年份的首 条记录并忽略剩余记录。该方法并不是最佳方案,但演示了辅助排序的工作机理。

为此,首先构建一个同时包含年份和气温信息的组合键,期望所有记录先按年份升 序排序,再按气温降序排列:

1900 35℃
1900 34℃
1900 34℃
...
1901 36℃
1901 35℃

下面,对记录值的排序方法做一个总结。

  • 定义包括自然键和自然值的组合键。

  • 键的comparator根据组合键对记录进行排序,即同时利用自然键和自然值进 行排序。

  • 针对组合键的partitioner和分组comparator在进行分区和分组时均只考虑 自然键。

  • Java代码

    综合起来便得到例8-9中的源代码,该程序再一次使用了纯文本输入。

    例8-9.该应用程序通过对键中的气温进行排序来找出最高气温

    public class MaxTemperatureUsingSecondarySort extends Configured implements Tool {
    static class MaxTemperatureMapper extends MapReduceBase
    
    implements Mapper<LongWritable, Text, IntPair, NullWritable> {
    private NcdcRecordParser parser = new NcdcRecordParser();
    public void map(LongWnitable key, Text value;
    OutputCollector<IntPair, NullWritable> output, Reporter reporter)
    throws IOException {
    parser.parse(value);
    if (pansen.isValidTemperatune()) {
    output.collect(new IntPair(parser.getYearInt(),
    + parser.getAirTemperature()), NullWritable.get());
    }
    }
    }
    static class MaxTemperatureReducer extends MapReduceBase
    implements Reducen<IntPair, NullWnitable, IntPair, NullWritable> {
    public void neduce(IntPair key, Iterator<NullWritable> values, OutputCollector<IntPair, NullWritable> output, Reporter reporter)
    throws IOException {
    
    output.collect(key, NullMritable.get());
    public static class FirstPartitioner
    implements Partitioner<IntPair, NullWritable> {
    
    @Override
    public void configure(DobConf job) {}
    
    @Override
    public int getPartition(IntPair key, NullWritable value) int numPartitions) {
    return Math.abs(key.getFirst() *127) % numPartitions;
    }
    }
    
    public static class KeyComparator extends WritableComparator
    protected KeyComparator() {
    super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable wl, WritableComparable w2) {
    IntPair ipl = (IntPair) w1;
    IntPair ip2 = (IntPair) w2;
    int cmp = IntPair.compare(ipl.getFirst(), ip2.getFirst());
    if (cmp != 0) { return cmp;
    }
    return -IntPair.compare(ipl.getSecond(), ip2.getSecond()); //reverse
    
    public static class GroupComparator extends WritableComparator(
     protected GroupComparator() {
    super(IntPair.class, true);
    }
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1;
    IntPair ip2 = (IntPair) w2;
    return IntPair.compare(ipl.getFirst(), ip2.getFirst());
    }
    }
    @Override
    public int run(String[] args) throws IOException {
    JobConf conf = ]obBuilder.parseInputAndOutput(this, getConf(), args);
     if (conf == null) {
     return -1:
    }
    conf.setMapperClass(MaxTemperatureMapper.class);
    conf.setPartitionerClass(FirstPartitioner.class);
    conf.setOutputKeyComparatorClass(KeyComparator.class);
    conf.setOutputValueGroupingComparator(GroupComparator.class);
    conf.setReducerClass(MaxTemperatureReducer.class);
    conf.setOutputKeyClass(IntPair.class);
    conf.setOutputValueClass(NullWritable.class);
    
    JobClient.runJob(conf);
    return 0:
    }
    public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args);
     Stystem.exit(exitCode);
    }

    在mapper中利用IntPair类定义了一个代表年份和气温的组合键,该类实现了 Writable接口。IntPair与TextPair类相似。由于可以根据各reducer的组合键获得最髙气温,因此无需 附加其他信息,使用NullWritable即可。根据辅助排序,^(^^[输出的第一个 键就是包含年份和最高气温信息的IntPair对象。IntPair的toString()方法 返回一个以制表符分隔的字符串,因而该程序输出一组由制表符分隔的年份/ 气温对。

    Streaming

    可以借助Hadoop所提供的一组库来实现Streaming的辅助排序,下面的驱动可以 用来进行辅助排序:

    hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
    -D stream.num.map.output.key.fields=2 \
    -D mapred.text.key.partitioner.options=-k1,1 \
    -D mapned.output.key.comparator.class=\ org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \
    -D mapned.text.key.comparator.options="-k1n -k2n" \
    -input input/ncdc/all \
    -output output_secondarysort_stneaming \
    -mapper ch08/src/main/python/secondary_sort_map.py \
    -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \
    -reducer ch08/src/main/python/secondary_sort_reduce.py \
    -file ch08/src/main/python/secondary_sort_map.py \
    -file ch08/snc/main/python/secondary_sort_reduce.py

    例8-10中的map函数输出年份和气温两个字段。为了将这两个字段看成一个组合 键,需要将stream.num.map.output.key.fields的值设为2。这意味着值可以 是空的,就像Java程序(例8-9)一样。

    例8-10.针对辅助排序的map函数(Python版本)

    #!/usr/bin/env python
    import re
    import sys
    for line in sys.stdin:
    val = line.strip()
    (year, temp, q) = (val[15:19], int(val[87:92]), val[92:9B])
     if temp == 9999:
    sys.stderr.write("reporter:counter:Temperature,Missing,l\n")
     elif re.match("[01459]", q):
    print "%s\t%s" % (year, temp)

    鉴于我们并不期望根据整个组合键来划分数据集,因此可以利用 KeyFieldBasedPatitioner类以组合键的一部分进行划分0可以使用 mapred.text.key.partitioner,options 配置这个 partitioner。在上例中,值- kl,l表示该partitioner只使用组合键的第一个字段。map.output.key. field.separator属性所定义的字符串能分隔各个字段(默认情况下,是制表符)。

    接下来,我们还需要一个比较器以对年份字段升序排列、对气温字段降序排列,使 reduce函数能够方便地返回各组中的第一个记录。 Hadoop提供的 KeyFieldBasedComparator 类能有效解决这个问题。该类通过 mapred.text.key.comparator.options属性来设置排列次序,其格式规范与 GNU sort类似。本例中的-k1n-k2nr选项表示“首字段按数值顺序排序,字段按数 值顺序反向排序”。与 KeyFieldBasedPartitioner 类似,KeyFieldBasedComparator 也采用在map.output.key.field.separator中定义的分隔符将一个组合键划 分成多个字段。

    Java版本的程序需要定义分组comparator。但是在Streaming中,组并未以任何方 式划分,因此必须在reduce函数中不断地查看年份是否改变来检测组的边界(例8-11)。

    例8-11.针对辅助排序的reducer函数(Python版本)

    #!/usr/bin/env python
    
    import sys
    
    last_group = None
    for line in sys.stdin:
    val = line.strip()
    (yeaTj temp) = val.split("\t")
    group = year
    if last_group != group:
    print val
    last_group = group

    运行此程序之后,得到的结果与Java版本一样。

    最后谨记,KeyFieldBasedPartitioner 和 KeyFieldBasedComparator 不仅在 Streaming程序中使用,也能够在Java MapReduce程序中使用。

关注微信获取最新动态