Hadoop教程

MapReduce的类型

Hadoop的MapReduce中,map和reduce函数遵循如下常规格式:

map: (K1, VI) — list(K2, V2) 
reduce:(K2,list(V2)) — list(K3, V3)

一般来说,map函数输入的键/值的类型(K1和V1)类型不同于输出类型(K2和 V2)。虽然,reduce函数的输入类型必须与map函数的输岀类型相同,但reduce函 数的输出类型(K3和V3)可以不同于输入类型。例如以下Java接口代码:

public interface Mapper<K1, V1, K2, V2> extends JobConfigurable, Closeable 
{
    void map(K1 key, V1 value, 0utputCollector<K2, V2>output. Reporter reporter)
	 throws IOException;
}
public interface Reducer<K2, V2, K3, V3> extends DobConfigurable, Closeable 
{
    void reduce(K2 key, Iterator<V2> values,0utputCollector<K3, V3> output, Reporter reporter) throws IOException;
}

前面讲过,OutputCollector纯粹是为了输出键/值对(所以用类型作为参数),而Rerporter是用来更新计数和状态信息。在发布的MapReduce API 0.20.0或以后的 版本中,这两个函数被合并在一个上下文对象(context object)中。

如果使用combine函数,它与reduce函数的形式相同(它是Reducer的一个实 现),不同之处是它的输出类型是中间的键/值对类型(K2和V2),这些中间值可以 输入 reduce 函数:

map: (K1, V1)→list(K2, V2)
combine: (K2, list(V2))→list(K2, V2)
reduce: (K2, list(V2))→ list(K3, V3)

combine与reduce函数通常是一样的,在这种情况下,K3与K2类型相同,V3与V2类型相同。

partition函数将中间的键/值对(K2和V2)进行处理,并且返回一个分区索引 (partition index)。实际上,分区单独由键决定(值是被忽略的)。

partition: (K2,V2) — integer

或用Java的方式:

public interface Partitioner<K2, V2> extends DobConfigurable {
    int getPartition(K2 key, V2 value, int numPartitions);
}

这些理论对配置MapReduce作业有帮助吗?表7-1总结了配置选项,把属性分为可 以设置类型的属性和必须与类型相容的属性。

输入数据的类型由输入格式进行设置。例如,对应于TextlnputFormat的键类型 是LongWritable,值类型是Text。其他的类型通过调用JobConf上的方法来进 行显式设置。如果没有显式设置,中间的类型默认为(最终的)输出类型,也就是默 认值LongWritable和Text。因此,如果K2与K3是相同类型,就不需要调用 setMapOutputKeyClass(),因为它将调用setOutputKeyClass()来设置,同 样,如果V2与V3相同,只需要使用setOutputKeyClass()。

这些为中间和最终输出类型进行设置的方法似乎有些奇怪。为什么不能结合 mapper和reducer导出类型呢?原因是Java的泛型机制有很多限制:类型擦除 (type eraser)导致运行过程中类型信息并非不一直可见,所以Hadoop不得不明确 进行设定。这也意味着可能用不兼容的类型来配置MapReduce作业,因为这些配 置在编译时无法检査。与MapReduce类型兼容的设置列在表7-1中。类型冲突是 在作业执行过程中被检测出来的,所以一个比较明智的做法是先用少量的数据跑一次测试任务,发现并修正任何类型不兼容的问题。

默认的MapReduce作业

如果没有指定mapper或reducer就运行MapReduce,会发生什么情况?我们运行 一个最简单的MapReduce程序来看看:

public class MinimalMapReduce extends Configured implements Tool{
    @Override
    public int run(String[] args) throws Exception {
        if (args.length != 2) {
            System.err.printf("Usage: %s [generic options] <input> <output>\n",gGtClass().getSimpleName());
            ToolRunner.printGenericCommandUsage(System.err);
            return -1;
        }
        JobConf conf = new ]obConf(getConf(), getClass());
        FileInputFormat.addInputPath(conf, new Path(args[0]));
        FileOutputFormat.setOutputPath(conf, new Path(args[1]));
        3obClient.run3ob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MinimalMapReduce(), args);
        System.Gxit(exitCode);
    }
}

我们唯一设置的是输入路径和输出路径。在气象数据的子集上运行以下命令:

% hadoop MinimalMapReduce "input/ncdc/all/190{l,2}.gz" output

输出目录中得到命名为part-00000的输出文件。这个文件的前几行如下(为了适 应页面进行了截断处理):

0→0029029070999991901010106004+64333+023450FM-12+000599999V0202701N01591...
0→0035029070999991902010106004+64333+023450FM-12+000599999V0201401N01181...
135→0029029070999991901010113004+64333+023450FM-12+000599999V0202901N00821...
141→0035029070999991902010113004+64333+023450FM-12+000599999V0201401N01181...
270→0029029070999991901010120004+64333+023450FM-12+000599999V0209991C00001...
282→0035029070999991902010120004+64333+023450FM-12+000599999V0201401N01391...

每一行首先是一个整数,其次是Tab(制表符),然后是一段原始气象数据记录。虽 無这并不是一个有用的程序,但理解它如何产生输出,确实能够洞悉.运行 MapReduce作业时Hadoop是如何使用默认设置的。例7-1的示例与前面 MinimalMapReduce完成的事情一模一样,但是它显式地把作业环境设置为默 认值。

例7-1.最小的MapReduce驱动程序,默认值显式设置

public class MinimalMapReduceWithDefaults extends Configured implements Tool {
    @Override
    public int run(String[] args) throws IOException {
        JobConf conf = DobBuilder.parseInputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }

        conf.setInputFormat(TextInputFormat.class);

        conf.setNumMapTasks(l);
        conf.setHapperClass(IdentityHapper.class);
        conf.setHapRunnerClass(HapRunner.class);

        conf.setMapOutputKeyClass(LongWritable.class);
        conf.setMapOutputValueCla$s(Text.class);

        conf.setPartitionerClass(HashPartitioner.class);

        conf.setNumReduceTasks(l);
        conf.setReducerClas5(IdentityReducer.class);

        conf.setOutputKeyClass(LongWritable.clas5); 
        conf.setOutputValueClass(Text.class);
        conf.setOutputFormat(TextOutputFormat.class);

        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception 
    {
        int exitCode = ToolRunner.run(new MinimalMapReduceWithDefaults(), args);
        System.exit(exitCode);
    }
}

通过把打印使用说明和把输入/输出路径逻辑抽取到一个帮助方法中,我们对run() 方法的开始几行进行了简化。几乎所有MapReduce驱动程序都有两个参数(输入与 输出),所以此处进行这样的代码约简是可行的。以下是JobBuilder类中的相关 方法,供大家参考:

public static ]obConf parseInputAndOutput(Tool tool, Configuration conf,String[] args){
    if (args.length != 2) 
    {
        printUsage(tool, "<input> <output>");
        return null;
    }
    JobConf jobConf = new JobConf(conf, tool.getClass());
    FileInputFormat.addInputPath(jobConf, new Path(args[0]));
    FileOutputFormat.setOutputPath(jobConf, new Path(args[1]));
}
public static void printUsage(Tool tool, String extraArgsUsage) 
{
    system.err.printf("Usage: %s [genericOptions] %s\n\n",
    tool.getClass().getSimpleName(), extraArgsUsage);
    GenericOptionsParser.printGenericCommandUsage(system.err);
}

回到例7-1中的MinimalMapReducewithDefaults,虽然有很多其他的默认作业 设置,但加粗显示的部分是执行一个作业最关键的。接下来我们逐一讨论。

默认的输入格式是TextlnputFormat,它产生的键类型是LongWritable(文件中 每行中开始的偏移量值),值类型是Text(文本行)。这也解释了最后输出的整数的 含义:它们是行偏移量。

虽然看上去很像,但事实上,setNumMapTasks函数调用并不必将map任务的数 量设定成1。它只是一个提示,真正的map任务的数量将取决于输入文件的大小以 及文件块的大小(如果此文件在HDFS中)。

默认的mapper是IdentityMapper,它将输入的键和值原封不动地写到输出中:

public class IdentityMapper<K,V>
   extends MapReduceBase implements Mapper<K, V, K, V> 
{
   public void map(K key, V val,
       OutputCollector<K,V> output, Reporter reporter)
    throws IOException {
        output.collect(key, val);
   }
}

IdentityMapper是一个泛型类型(generic type),它可以接受任何键或值的类型, 只要map输入和输出键的类型相同,输入和输出值的类型相同就可以。在这个例子中,map的输出键是Longwritable类型,map的输出值是Text类型。

map任务是由MapRunner负责运行的,MapRunner是MapRunnable的默认实 现,它顺序地为每一条记录调用一次Mapper的map方法。

默认的partitioner是HashPartitioner,它对每条记录的键进行哈希操作以决定 该记录应该属于哪个分区。每个分区对应一个reducer任务,所以分区数等于作业 的reducer的个数:

public class HashPartitioner<K2, V2> implements Partitioner<K2, V2> {
	public void configure(JobConf job) {}
  public int getPartition(K2 key, V2 value,
           int numPartitions) {
      return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}

键的哈希码被转换为一个非负整数,它由哈希值与最大的整型值做一次按位与操作 而获得,然后用分区数进行取模操作,来决定该记录属于哪个分区索引。

默认情况下,只有一个reducer,因此,也就只有一个分区,在这种情况下, partitioner操作将由于所有数据都已放入同一个分区而无关紧要了。然而,如果有 很多reducer, 了解HashParttioner的作用就非常重要。假设键的散列函数足够 好,那么记录将被均匀分到若干个reduce任务中,这样,具有相同键的记录将由 同一个reduce任务进行处理。

选择reducer的个数

单个reducer的默认配置对Hadoop新手而言很容易上手。真实的应用中,作业 都把它设置成一个较大的数字,否则由于所有的中间数据都会放到一个reducer 任务中,从而导致作业效率极低。注意,在本地作业运行器上运行时,只支持 0个或 1 个 reducer。

reducer最优个数与集群中可用的reducer任务槽数相关。总槽数由集_中节点 数与每个节点的任务槽数相乘得到。该值由mapred.tasktracker.reduce.task.maximum属性的值决定。

一个常用的方法是:设置比总槽数稍微少一些的reducer数,这会给reducer任 务留有余地(容忍一些错误发生而不需要延长作业运行时间)。如果reduce任务 很大,比较明智的做法是使用更多的reducer,使得任务粒度更小,这样一来, 任务的失败才不至于显著影响作业执行时间。

默认的reducer是IdentityReducer,它也是一个泛型类型,它简单地将所有的输 入写到输出中:

public class IdentityReducer<K, V> extends MapReduceBase implements Reducer<K, V, K, V> 
{
    public void reduce(K key, Iterator<V> values,OutputCollector<K, V> output, Reporter reporter) throws IOException 
    {
        while (values.hasNext()) 
        {
            output.collect(key, values.next());
        }
    }
}

对于这个任务来说,输出的键是LongWritable类型,而值是Text类型;事实 上,对于MapReduce程序来说,所有键都是LongWritable类型,所着值都是 Text类型,因为它们是输入键/值的类型,并且map函数和reduce函数都是恒等 叫咖办)函数。然而,大多数MapReduce程序不会一直用相同的键或值类型,所 以就像上一节中描述的那样,必须配置作业来声明使用的类型。

记录在发送给reducer之前,会被MapReduce系统进行排序。在这个例子中,键是 按照数值的大小进行排序的,因此来自输入文件中的行会被交叉放入一个合并后的 输出文件。

默认的输出格式是TextOutputFormat,它将键和值转换成字符串并用Tab进行分 隔,然后一条记录一行地进行输出。这就是为什么输出文件是用制表符(Tab)分隔 的:这是TextOutputformat的一个特点。

Streaming中的键和值

Streaming应用可以决定分隔符,该分隔符用于通过标准输入把键/值对转换为一串 比特值发送到map或reduce函数。默认情况下是Tab(制表符),但是如果键或值中 本身含有Tab分隔符,这个功能就很有用了,它能将分隔符修改成其他符号。

类似地,当map和reduce输出结果键/值对时,也需要一个可配置的分隔符来进行 分隔。更进一步,来自输出的键可以由多个字段进行组合:它可以由一条记录的前 n 个字段组成(由 stream.num.map.output.key.fields 或 stream, num.reduce output.key.Fields进行定义),剩下的字段就是值^例如,一个Streaming处理 的输出是“a,b, c”(分隔符是逗号),n设为2,则键解释为“a、b”,而值是c。

mapper和reducer的分隔符是相互独立进行配置的。这些属性可参见表7-2,数据 流图如图7-1所示。

这些属性与输入和输出的格式无关。例如,如果stream.reduce.output.field.separator 被设置成冒号,reduce Streaming的把a: b行写入标准输出,那么Streaming的reducer就会知道a作为键,b作为值。如果使用标准的TextOutPutFormat那么 这条记录会使用Tab将键和值分隔写到输出文件。可以通过设置 mapred,textoutputformat. Separator 来修改 TextOutputFormat 的分隔符。

表7-2. Streaming的分隔符属性

属性名称类型默认值描述
stream.map.input.field.
separator
String\t此分隔符用于将输入键/值字符串作
为字节流传递到流map
stream.map.output.field.
separator
String\t此分隔符用于把流map处理的输出
分割成map输出需要的键/值字符串
tream.num.map.output.
key.fields
int1
stream.map.output.field.separator
分隔的字段数,这些字段作为map 输
出键
stream.reduce.input.field.
separator
String\t此分隔符用于将输入键/值字符串作
为字节流传递到流reduce
stream.neduce.
output.field.separator
String\t此分隔符用于将来自流reduce处理  
的输出分割成reduce最终输出需要 的键
的键/值字符串
stneam.num. reduce.
output.key.fields
int1由stream.reduce.output.field.separator
分隔的字段数量,这些字段作为
reduce输出键

图7-1.在Streaming MapReduce作业中使用分隔符的位置

关注微信获取最新动态