连接
MapReduce能够执行大型数据集间的“连接”(join)操作,但是,自己从头编写相 关代码来执行连接的确非常棘手。除了写MapReduce程序,还可以考虑采用一个 更高级的框架,如Pig、Hive或Cascading等,它们都将连接操作视为整个实 现的核心部分。 先简要地描述待解决的问题。假设有两个数据集:气象站数据库和天气记录数据 集,并考虑如何合二为一。一个典型的查询是:输出各气象站的历史信息,同时各 行记录也包含气象站的元数据信息,如图8-2所示。
连接操作的具体实现技术取决于数据集的规模及分区方式。如果一个数据集很大 (例如天气记录)而另外一个集合很小,以至于可以分发到集群中的每一个节点之中 (例如气象站元数据),则可以执行一个MapRedUCe作业,将各个气象站的天气记录 放到一块(例如,根据气象站山执行部分排序),从而实现连接。mapper或reducer根据各气象站ID从较小的数据集合中找到气象站元数据,使元数据能够被写到各 条记录之中。该方法将在“边数据分布”小节中详述,侧重于将数据分 发到tasktracker的机制。 连接操作如果由mapper执行,则称为“map端连接”,如果由reducer执行,则称 为“reduce端连接”。
如果两个数据集的规模均很大,以至于没有哪个数据集可以被完全复制到集群的每 个节点,我们仍然可以使用MapReduce来进行连接,至于到底采用map端连接还 是reduce端连接,则取决于数据的组织方式。最常见的一个例子便是用户数据库 和用户活动日志(例如访问日志)。对于一个热门服务来说,将用户数据库(或日志) 数据库分发到所有MapReduce节点中是行不通的。
map端连接
在两个大规模输入数据集之间的map端连接会在数据到达map函数之前就执行连 接操作。为达到该目的,各map的输入数据必须先分区并且以特定方式排序。各 个输入数据集被划分成相同数量的分区,并且均按相同的键排序(连接键)。同一键 的所有记录均会放在同一分区之中。听起来似乎要求非常严格(的确如此),但这的 确合乎MapReduce作业的输出。
图8-2.两个数据集的内连接
map端连接操作可以连接多个作业的输出,只要这些作业的reducer数量相同,键 相同、并且输出文件是不可切分的(例如,小于一个HDFS块,或gzip压缩)。在天 气例子中,如果气象站文件以气象站ID排序,记录文件也以气象站ID排序,而 且reducer的数量相同,则它们就满足了执行map端连接的前提条件。
利用 org.apache.hadoop.mapred. join 包中的 CompositeInputFormat 类来运 行一个map端连接。CompositeInputFormat类的输入源和连接类型(内连接或外 连接)可以通过一个连接表达式进行配置,连接表达式的语法较为简单。详情与示 例可参见包文档。
org.apache.hadoop.examples.Doin是一个通用的执行map端连接的命令行程 序。该例运行一个基于多个输入数据集的mapper和reducer的MapReduce作业, 以执行给定的操作。
reduce端连接
由于reduce端连接并不要求输入数据集符合特定结构,因而reduce端连接比map 端连接更为常用。但是,由于两个数据集均需经过MapReduce的shuffle过程,所 以reduce端连接的效率往往要低一些。基本思路是mapper为各个记录标记源,并 且使用连接键作为map输出键,使键相同的记录放在同一reducer中。以下技术能 帮助实现reduce端连接。
多输入
数据集的输入源往往有多种格式,因此可以使用MultipleInputs类来方便地解析和标注各个源。
辅助排序
如前所述,reducer将从两个源中选出相同的记录且并不介意这些记录是否已 排好序。此外,为了更好地执行连接操作,先将某一个源的数据传输到 reducer会非常重要。以天气数据连接为例,当天气记录发送到reducer的时 候,与记录有相同键的气象站信息最好也已经放在reducer,使reducer能够将 气象站名称填到天气记录之中再输出。虽然也可以不指定数据传输次序,并将 待处理的记录缓存在内存之中,但应该尽量避免这种情况,因为其中一组的记 录数量可能非常庞大,远远超出reducer的可用内存容量。
我们使用第4章的TextPair类构建组合键,包括气象站ID和“标记”。在这 里,“标记”是一个虚拟的字段,其唯一目的是对记录排序,使气象站记录比天气 记录先到达。一种简单的做法就是:对于气象站记录,“标记”值为0;对于天气 记录,“标记”值为1。例8-12和例8-13分别描述了执行该任务的两个mapper#。
例8-12.这个mapper用于reduce端连接中标记气象站记录
public class JoinStationMapper extends MapReduceBase implements Mapper<LongWritable, Text, TextPair, Text> { private NcdcStationMetadataParser parser = new NcdcStationMetadataParser(); public void map(LongWnitable key, Text value, OutputCollector<TextPair, Text> output. Reporter ) throws IOException { if (parser.parse(value)) { output.collect(new TextPair(parser.getStationId(), "0"), new Text(parser.getStationName())); } } }
例8-13.这个mapper类用于reduce端连接标记天气记录
public class JoinRecordMapper extends MapReduceBase implements Mapper<LongWritable, Text, TextPair, Text> { private NcdcRecordParser parser = new NcdcRecordParser(); public void map(LongWritable key, Text value, OutputCollector<TextPair, Text> output, Reporter reporter) throws IOException { parser.parse(value); output.collect(new TextPair(parser.getStationId(), "1"), value); } }
reducer知道自己会先接收气象站记录。因此从中抽取出值,并将其作为后续每条 输出记录的一部分写到输出文件。如例8-14所示。
例8-14.这个reducer用于连接已标记的气象站记录和天气记录
public class JoinReducer extends MapReduceBase implements Reducer { public void reduce(TextPair key, Iteratorvalues, OutputCollectoroutput. Reporter reporter) throws IOException { Text stationName = new Text(values.next()); while (values.hasNext()) { Text record = values.next(); Text outValue = new Text(stationName.toString() + "\t" + record.toString()); output.collect(key.getFirst(), outValue); } } }
上述代码假设天气记录的每个气象站lD字段与气象站数据集中的一条记录准确匹 配。如果这个假设不成立,则需要泛化代码,使用另一个Textpair。reduce()方法在处理天气记录之前,要能够区分哪些记录是气象站名称,检测(和处理)缺失或 重复的记录。
在reducer的迭代部分中,对象被重复使用(为了提高效率)。因此,从 第一个Text对象获得站点名称(即StationName)的值就非常关键。Text stationName = new Text(values.next()); 如果不执行该语句,则stationName会指向上一条记录的值,这显然 是错误的。
例8-15显示了该作业的驱动类。在该类中,关键在于根据组合键的第一个字段(即 气象站10)进行分区和分组,即使用一个自定义的partitioner_ KeyPartitioner) 和一个自定义的 comparator(FirstComparatorMtS TextPair 的嵌套类。
例8-15.对天气记录和气象站名称执行连接操作
public class JoinRecordWithStationName extends Configured implements Tool { public static class KeyPartitioner implements Partitioner<TextPair, Text> { @Override public void configure(JobConf job) {} @Override public int getPartition(TextPair key, Text value, int numPartitions) { return (key.getFirst().hashCode() & Integer.MAX_VALUE) % numPartitions; } @Override public int run(String[] args) throws Exception { if (args.length != 3) { JobBuilder.printUsage(this, "<ncdc input> <station input> <output>"); return -1; } JobConf conf = new JobConf(getConf(), getClass()); conf.setJobName("Join record with station name"); Path ncdcInputPath = new Path(args[0]); Path stationInputPath = new Path(args[1]); Path outputPath = new Path(args[2]); MultipleInputs.addInputPath(conf, ncdcInputPath, TextInputFormat.class, JoinRecordMapper.class); MultipleInputs.addInputPath(conf, stationInputPath, TextInputFormat.class, JoinStationMapper.class); FileOutputFormat.setOutputPath(conf, outputPath); conf.setPartitionerClass(KeyPartitioner*class); conf.setOutputValueGroupingComparator(TextPair.FirstComparator.class); conf.setMapOutputKeyClass(TextPair.class); conf.setReducerClass(JoinReducer.class); JobClient.runDob(conf); return 0: } public static void main(String[] angs) throws Exception { int exitCode = ToolRunnen.nun(new DoinRecondWithStationName(), angs); System.exit(exitCode); } }
在样本数据上运行该程序,获得以下输出
011990-99999 SIHCCAJAVRI 0067011990999991950051507004+68750.. 011990-99999 SIHCCAJAVRI 004B011990999991950051512004+68750.. 011990-99999 SIHCCAJAVRI 0043011990999991950051518004+68750.. 012650-99999 TYNSET-HANSMOEN 00430126509999919490B2412004+62B00.. 012650-99999 TYNSET-HANSMOEN 00430126509999919490B2418004+62300..