排序
排序是MapReduce的核心技术。尽管应用本身可能并不需要对数据排序,但仍可 能使用MapReduce的排序功能来组织数据。本节将讨论几种不同的数据集排序方 法,以及如何控制MapReduce的排序。
准备
面将按气温字段对天气数据集排序。由于气温字段是有符号整数,所以不能将该 字段视为Text对象并以字典顺序排序。反之,我们要用顺序文件存储数据,其 IntWritable键代表气温(并且正确排序),其Text值就是数据行。
例8-3中的MapReduce作业只包含map任务,它过滤输人数据并移除包含有无效 气温的记录。各个map创建并输出一个块压缩的顺序文件。相关指令如下:
%hadoop jar job.jar SortDataPreprocessor input/ncdc/all input/ncdc/all-seq
例8-3.该MapReduce程序将天气数据转成顺序文件格式
public class SortDataPreprocessor extends Configured implements Tool { static class CleanerMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key Text value,OutputCollecton<IntWnitable, Text> output, Reporter reporter) throws IOException { parser.panse(value); if (parser.isValidTempenatune()) { output.collect(new IntWritable(parser.getAinTempenatune()), value); } } } @Override public int nun(String[] args) throws IOException { JobConf conf = JobBuilder.panseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setMapperClass(CleanerMapper.class); conf.setOutputKeyClass(IntWnitable.class); conf.setOutputValueClass(Text.class); conf.setNumReduceTasks(0); conf.setOutputFonmat(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompnessOutput(conf, true); SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class); SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); JobClient.runJob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortDataPreprocessor(), args); System.exit(exitCode); } }
部分排序
“默认的MapReduce作业”小节所述,在默认情况下,MapReduce 根据输入记录的键对数据集排序。例8-4则是一个变种,它利用IntWritable键 对顺序文件排序。
例8-4. MapReduce程序:调用默认的HashPartitioner根据Intwritable键对顺序 文件进行排序
public class SontByTempenatureUsingHashPartitioner extends Configured implements Tool { @Override public int run(String[] args) throws IOException { JobConf conf = DobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1; } conf.setInputFopmat(SequenceFileInputFormat.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputFormat(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(conf, true); SequenceFileOutputFormat.setOutputCompressorClass(conf, GzipCodec.class); SequenceFileOutputFormat_ setOutputCompressionType(conf, CompressionType.BLOCK); JobClient.runDob(conf); return 0; } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new SortByTemperatureUsingHashPartitioner(),args); System.exit(exitCode); } }
假设采用30个reducer来运行该程序:
% hadoop jar job.jar SortByTemperatureUsingHashPartitioner \ -D mapred.reduce.tasks=B0 input/ncdc/all-seq output-hashsort
该指令产生30个已排好序的输出文件。但是如何将这些小文件合并成一个有序的 文件却并非易事。例如,直接将纯文本文件连接起来无法保证全局有序。幸运的 是,许多应用并不强求待处理的文件全局有序。例如,对于查找操作来说,部分排 序的文件就已经足够了。
应用:基于分区的MapFile查找技术
以按键执行查找操作为例,在多文件情况下效率更髙。在例8-5中,输出格式被改 SMapFileOutputFormat,则会输出30个map文件,我们可以基于这些文件执 行查找操作。
例8-5.该MapReduce程序对一个顺序文件排序并输出MapFile
public class SortByTempenatureToMapFile extends Configured implements Tool { @Ovennide public int nun(Stning[] args) throws IOException { JobConf conf = DobBuilden.panseInputAndOutput(this, getConf(), angs); if (conf == null) { return -1; } conf.setInputFonmat(SequenceFileInputFonmat.class); conf.setOutputKeyClass(IntWritable.class); conf.setOutputFormat(MapFileOutputFormat.class); SequenceFileOutputFormat•setCompressOutput(conf, true); SequenceFileOutputFormat.setOutputCompnessonClass(conf, GzipCodec.class); SequenceFileOutputFormat.setOutputCompnessionType(conf, CompressionType.BLOCK); DobClient.runJob(conf); return 0; } public static void main(String[] angs) throws Exception { int exitCode = ToolRunner.run(new SontByTemperatuneToMapFile(), angs); System.exit(exitCode); } }
MapFileOutputFormat提供了两个很方便的静态方法,用于对MapReduce输出文 件执行査找操作,其用法如例8-6所示。
例8-6.从MapFiles集合中获取符合指定键的第一项记录
public class LookupRecordByTemperature extends Configured implements Tool { @Override public int nun(String[] angs) throws Exception { if (args.length != 2) { JobBuilden.pnintUsage(this, "<path> <key>"); return -1; } Path path = new Path(args[0]); IntWritable key = new IntWnitable(Integen.panseInt(angs[l])); FileSystem fs = path.getFileSystem(getConf()); Readen[] readers = MapFileOutputR>rmat.getReaders(fs, path, getConf()); Pantitionen<IntWnitable, Text> pantitionen = new HashPantitionen<IntWnitable, Text>(); Text val = new Text(); Writable entry = MapFileOutputFonmat.getEntny(neaders, pantitionen, key, val); if (entry == null) { System.enn.pnintln("Key not found: " + key); return -1; } NcdcRecordParser pansen = new NcdcRecondPansen(); pansen.panse(val.toStning()); System.out.pnintf("%s\t%s\n", pansen.getStationId(), pansen.getYean()); return 0; } public static void main(Stning[] angs) throws Exception { int exitCode = ToolRunner.run(new LookupRecondByTempenatune(),angs); System.exit(exitCode); } }
getReaders()方法为MapReduce作业创建的每个输出文件分别打开一个 MapFile.Reader实例,用一个reader数组(即reader)表示。之后,getEntry()方法使用partitioner找到包含指定键的Reader实例,再通过该Reader实例的get()方法得到这个键对应的值(val)。如果getEntry()返回null,则表明没有找 到匹配的键。否则,返回一个描述对应气象站山和年份的值。
举例来说,若要查找首条气温为-101:的记录(记住,气温字段是整数类型,其值是 真实气温的10倍。例如,_101被记为-100),则有:
% hadoop jar job.jar LookupRecordByTemperature output-hashmapsort -100 357460-99999 1956
我们还可以直接用readers数组来获得包含指定键的所有记录。readers数组按 分区排序,因而针对一个指定键的reader,均使用MapReduce作业中的同一个
partitioner: Reader reader = readers[partitioner.getPartition(key, val, readers.length)];
找到reader之后,可通过MapFile的get()方法获取第一条包含指定键的记录。 接着,循环调用next()获取下一条记录,直到键改变为止。相关程序如例8-7所示。
例8-7.从一个MapFiles集合中获取包含指定键的所有记录
public class LookupRecondsByTempenatune extends Configured implements Tool { @Override public int nun(Stning[] angs) throws Exception { if (args.length != 2) { JobBuilden.pnintUsage(this, "<path> <key>"); return -1; } Path path = new Path(args[0]); IntWritable key = new IntWritable(Integer.parseInt(args[l])); FileSystem fs = path.getFileSystem(getConf()); Reader[] readers = MapFileOutputFormat.getReaders(fs, path, getConf()); Partitioner<IntWnitable, Text> partitioner = new HashPartitionen<IntWritable, Text>(); Text val = new Text(); Reader reader = readers[partitioner.getPartition(key, val, readers.length)]; Writable entry = reader.get(keyj val); if (entry == null) { System.err.println("Key not found: " + key); return -1; } NcdcRecordPanser parser = new NcdcRecondParsen(); IntWritable nextKey = new IntWritable(); do { panser.parse(val.toString()); System.out.printf("%s\t%s\n", parser.getStationId(), parser.getYear()); } while(reader.next(nextKey, val) && key.equals(nextKey)); return 0; } public static void main(Stning[] args) throws Exception { int exitCode = ToolRunner.run(new LookupRecordsByTemperatune(), angs); System.exit(exitCode); } }
下例描述如何获取所有_10℃的读数,并计数:
% hadoop jar job.jar LookupRecordsByTemperature output-hashniapsort -100 \ 2> /dev/null | wc -1 1489272
全排序
如何用Hadoop产生一个全局排序的文件?最简单的方法是使用一个分区(a single partition)。但该方法在处理大型文件时效率极低,因为一台机器必须处理所有输 出文件,从而完全丧失了 MapReduCe所提供的并行架构的优势。
事实上仍有替代方案:首先,创建一系列排好序的文件,其次,串联这些文件,最 后,生成一个全局排序的文件。主要的思路是使用一个partitinoner来描述全局排序 的输出。例如,可以为上述文件创建4个分区,在第一分区中,各记录的气温小 于10℃,第二分区的气温介于-10℃和0℃之间,第三个分区的气温在0℃和10℃之间,最后一个分区的气温大于10℃。
该方法的关键点在于如何划分各个分区。理想情况下,各分区所含记录数应该大致 相等,使作业的总体执行时间不会受制于个别reducer。在前面提到的分区方案 中,各分区的相对大小如下所示。
气温范围 | <-10℃ | [-10℃,0℃] | [0℃,10℃] | ≥10℃ |
---|---|---|---|---|
记录所占的比例 | 11% | 13% | 17% | 59% |
显然,记录没有被均匀分开。因此,需要深入了解整个数据集的气温分布才能建立 更均勻的分区。写一个MapReduce作业计算落入各个气温桶的记录数比较容易。 例如,图8-1显示了桶大小为1℃时各桶的分布情况,各点分别对应一个桶。
图8-1.天气数据集合的气温分布
获得气温分布信息意味着可以建立一系列分布更均匀的分区。但由于这个操作需要 遍历整个数据集,因此并不实用。通过对键空间进行采样,就可较为均匀地划分数 据集。采样的核心思想是只查看一小部分键,获得键的近似分布,并由此构建分 区。幸运的是,Hadoop已经内置了若干采样器,不需要用户自己编写。
InputSampler类实现了Sampler接口,该接口的唯一成员方法(即getSampler) 有两个输入参数(一个InputFormat对象和一个JobConf对象),返回一系列样本键:
public interface Sampler<K,V> { K[] getSample(InputFormat<K,V> inf,JobConf job) throws IOException; }
这个接口通常不直接由客户端调用,而是由InputSampler类的静态方法 writePartitionFile()调用,目的是创建一个顺序文件来存储定义分区的键。
public static <K,V> void writePartitionFile(JobConf job, Sampler<K,V> sampler) throws IOException;
顺序文件供TotalOrderPartitioner使用,为排序作业创建分区。例8-8整合了 上述内容。
例8-8.该MapReduce程序利用TotalOrderPartitioner根据IntWritable键对顺序文件进 行全局排序
public class SortByTemperatureUsingTotalOrderPartitioner extends Configured implements Tool { @Override public int run(String[] args) throws Exception { JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1: } conf.setInputFormat(SequenceFileInputFormat.class); conf_setOutputKeyClass(IntWritable.class); conf.setOutputFormat(SequenceFileOutputFormat.class); SequenceFileOutputFormat.setCompressOutput(conf, true); SequenceFileOutputFormat.setOutputCompressorClass(conf, SequenceFileOutputFormat.setOutputCompressionType(conf, CompressionType.BLOCK); GzipCodec.class); conf.setPartitionerClass(TotalOrderPartitioner.class); InputSampler.Sampler<IntWritable, Text> sampler = new InputSampler.RandomSampler<IntWritable, Text>(0.1, 10000, 10); Path input = FileInputFormat.getInputPaths(conf)[0]; input = input.makeQualified(input.getFileSystem(conf)); Path partitionFile = new Path(input, "_partitions"); TotalOndenPantitionen.setPantitionFile(conf, partitionFile); InputSamplen.wnitePantitionFiIe(conf, sampler); // Add to DistnibutedCache URI pantitionUni=new URI(pantitionFile.toString()+"#_partitions"); DistnibutedCache.addCacheFile(pantitionUni, conf); DistnibutedCache.cneateSymlink(conf); JobClient.runJob(conf); return 0; } public static void main(String[] angs) throws Exception { int exitCode = ToolRunner.run( new SortByTempenatuneLlsingTotalOrderPartitioner(),args); System.exit(exitCode); } }
该程序使用了一个RandomSampler,以指定的采样率均匀地从一个数据集中选择 样本。在本例中,采样率被设为0.1。 RamdomSanpler的输入参数还包括最大样本 数和最大分区(本例中这两个参数分别是10 000和10,这也是InputSampler作为 应用程序运行时的默认设置),只要满足其中任意一个限制条件,即停止采样。采 样器在客户端运行,因此,限制分片的下载数量以加速采样器的运行就显得尤为重 要。在实践中,采样器的运行时间仅占作业总运行时间的一小部分。
InputSampler写的分区文件被命名为_partitions,放在输入目录中(由于该文件以“_” 打头,所以不会被选为输入文件)。为了和集群上运行的其他任务共享分区文件, 还需将该文件就载入到分布式缓存。
以下显示了分别以_5.6℃、13.9℃和22.0℃为分区边界得到的4个分区及其所占比 例。易知,新方案比旧方案更为均匀。
气温范围 | <5.6℃ | [-5.6℃,13.9℃] | [13.9℃,22℃] | ≥22℃ |
---|---|---|---|---|
记录所占的比例 | 29% | 24% | 23% | 24% |
输入数据的特性决定着如何挑选最合适的采样器。以SplitSampler为例,它只 采样一个分片中前n条记录。由于并未从所有分片中广泛采样,因而该采样器并不适合已经排好序的数据。
Hadoop内置的采样器还有IntervalSample,它以一定的间隔定期从划分中选择 键,因此对于已排好序的数据来说是一个更好的选择。RandomSampler是优秀的 通用采样器。如果疫有采样器可以满足应用需求(记住,采样目的是创建大小均匀 的一系列分区),则只能写程序来实现Sampler接口。
InputSampler类和TotalOrderPartitioner类的一个好特性是用户可以自由定 义分区数。该值通常取决于集群上reducer任务槽的数量(该值需稍小于reducer任 务槽的总数,以应付可能出现的故障)。由于TotalOrderPartitioner只用于分区边 界均不相同的时候,因此,当键空间较小时,设置太大的分区数可能会导致数据 冲突。
以下是运行方式:
% hadoop jar job.jar SortByTemperatureUsingTotalOrderPartitioner \ -D mapred.reduce.tasks=30 input/ncdc/all-seq output-totalsort
该程序输出30个内部已经排好序的分区。此外,分区i中的所有键都小于分区i+1 中的键。
辅助排序
MapReduce框架在记录到达reducer之前按键对记录排序,但键所对应的值并没有 被排序。由于值来自不同的map任务,所以在多次运行程序时,值的出现顺序并 不固定,导致每次运行作业的时间会各有不同。一般来说,大多数MapReduce程 序无需考虑值在reduce函数中的出现顺序。但是,有时也需要通过对键进行排序 和分组等以实现对值的排序。
例如,考虑如何设计一个MapReduce程序以计算每年最高气温。如果全部记录均 按照气温降序排列,则无需遍历整个数据集即可获得查询结果——获取各年份的首 条记录并忽略剩余记录。该方法并不是最佳方案,但演示了辅助排序的工作机理。
为此,首先构建一个同时包含年份和气温信息的组合键,期望所有记录先按年份升 序排序,再按气温降序排列:
1900 35℃ 1900 34℃ 1900 34℃ ... 1901 36℃ 1901 35℃
下面,对记录值的排序方法做一个总结。
定义包括自然键和自然值的组合键。
键的comparator根据组合键对记录进行排序,即同时利用自然键和自然值进 行排序。
针对组合键的partitioner和分组comparator在进行分区和分组时均只考虑 自然键。
Java代码
综合起来便得到例8-9中的源代码,该程序再一次使用了纯文本输入。
例8-9.该应用程序通过对键中的气温进行排序来找出最高气温
public class MaxTemperatureUsingSecondarySort extends Configured implements Tool { static class MaxTemperatureMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntPair, NullWritable> { private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWnitable key, Text value; OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { parser.parse(value); if (pansen.isValidTemperatune()) { output.collect(new IntPair(parser.getYearInt(), + parser.getAirTemperature()), NullWritable.get()); } } } static class MaxTemperatureReducer extends MapReduceBase implements Reducen<IntPair, NullWnitable, IntPair, NullWritable> { public void neduce(IntPair key, Iterator<NullWritable> values, OutputCollector<IntPair, NullWritable> output, Reporter reporter) throws IOException { output.collect(key, NullMritable.get()); public static class FirstPartitioner implements Partitioner<IntPair, NullWritable> { @Override public void configure(DobConf job) {} @Override public int getPartition(IntPair key, NullWritable value) int numPartitions) { return Math.abs(key.getFirst() *127) % numPartitions; } } public static class KeyComparator extends WritableComparator protected KeyComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable wl, WritableComparable w2) { IntPair ipl = (IntPair) w1; IntPair ip2 = (IntPair) w2; int cmp = IntPair.compare(ipl.getFirst(), ip2.getFirst()); if (cmp != 0) { return cmp; } return -IntPair.compare(ipl.getSecond(), ip2.getSecond()); //reverse public static class GroupComparator extends WritableComparator( protected GroupComparator() { super(IntPair.class, true); } @Override public int compare(WritableComparable w1, WritableComparable w2) { IntPair ip1 = (IntPair) w1; IntPair ip2 = (IntPair) w2; return IntPair.compare(ipl.getFirst(), ip2.getFirst()); } } @Override public int run(String[] args) throws IOException { JobConf conf = ]obBuilder.parseInputAndOutput(this, getConf(), args); if (conf == null) { return -1: } conf.setMapperClass(MaxTemperatureMapper.class); conf.setPartitionerClass(FirstPartitioner.class); conf.setOutputKeyComparatorClass(KeyComparator.class); conf.setOutputValueGroupingComparator(GroupComparator.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(IntPair.class); conf.setOutputValueClass(NullWritable.class); JobClient.runJob(conf); return 0: } public static void main(String[] args) throws Exception { int exitCode = ToolRunner.run(new MaxTemperatureUsingSecondarySort(), args); Stystem.exit(exitCode); }
在mapper中利用IntPair类定义了一个代表年份和气温的组合键,该类实现了 Writable接口。IntPair与TextPair类相似。由于可以根据各reducer的组合键获得最髙气温,因此无需 附加其他信息,使用NullWritable即可。根据辅助排序,^(^^[输出的第一个 键就是包含年份和最高气温信息的IntPair对象。IntPair的toString()方法 返回一个以制表符分隔的字符串,因而该程序输出一组由制表符分隔的年份/ 气温对。
Streaming
可以借助Hadoop所提供的一组库来实现Streaming的辅助排序,下面的驱动可以 用来进行辅助排序:
hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -D stream.num.map.output.key.fields=2 \ -D mapred.text.key.partitioner.options=-k1,1 \ -D mapned.output.key.comparator.class=\ org.apache.hadoop.mapred.lib.KeyFieldBasedComparator \ -D mapned.text.key.comparator.options="-k1n -k2n" \ -input input/ncdc/all \ -output output_secondarysort_stneaming \ -mapper ch08/src/main/python/secondary_sort_map.py \ -partitioner org.apache.hadoop.mapred.lib.KeyFieldBasedPartitioner \ -reducer ch08/src/main/python/secondary_sort_reduce.py \ -file ch08/src/main/python/secondary_sort_map.py \ -file ch08/snc/main/python/secondary_sort_reduce.py
例8-10中的map函数输出年份和气温两个字段。为了将这两个字段看成一个组合 键,需要将stream.num.map.output.key.fields的值设为2。这意味着值可以 是空的,就像Java程序(例8-9)一样。
例8-10.针对辅助排序的map函数(Python版本)
#!/usr/bin/env python import re import sys for line in sys.stdin: val = line.strip() (year, temp, q) = (val[15:19], int(val[87:92]), val[92:9B]) if temp == 9999: sys.stderr.write("reporter:counter:Temperature,Missing,l\n") elif re.match("[01459]", q): print "%s\t%s" % (year, temp)
鉴于我们并不期望根据整个组合键来划分数据集,因此可以利用 KeyFieldBasedPatitioner类以组合键的一部分进行划分0可以使用 mapred.text.key.partitioner,options 配置这个 partitioner。在上例中,值- kl,l表示该partitioner只使用组合键的第一个字段。map.output.key. field.separator属性所定义的字符串能分隔各个字段(默认情况下,是制表符)。
接下来,我们还需要一个比较器以对年份字段升序排列、对气温字段降序排列,使 reduce函数能够方便地返回各组中的第一个记录。 Hadoop提供的 KeyFieldBasedComparator 类能有效解决这个问题。该类通过 mapred.text.key.comparator.options属性来设置排列次序,其格式规范与 GNU sort类似。本例中的-k1n-k2nr选项表示“首字段按数值顺序排序,字段按数 值顺序反向排序”。与 KeyFieldBasedPartitioner 类似,KeyFieldBasedComparator 也采用在map.output.key.field.separator中定义的分隔符将一个组合键划 分成多个字段。
Java版本的程序需要定义分组comparator。但是在Streaming中,组并未以任何方 式划分,因此必须在reduce函数中不断地查看年份是否改变来检测组的边界(例8-11)。
例8-11.针对辅助排序的reducer函数(Python版本)
#!/usr/bin/env python import sys last_group = None for line in sys.stdin: val = line.strip() (yeaTj temp) = val.split("\t") group = year if last_group != group: print val last_group = group
运行此程序之后,得到的结果与Java版本一样。
最后谨记,KeyFieldBasedPartitioner 和 KeyFieldBasedComparator 不仅在 Streaming程序中使用,也能够在Java MapReduce程序中使用。