Hadoop教程

边数据分布

“边数据”(side data)是作业所需的额外的只读数据,以辅助处理主数据集。所面 临的挑战在于如何使所有map或reduce任务(这些任务散布在集群内部)都能够方 便而高效地使用边数据。

除了本小节描述的分布机制,还可以静态字段的方式将边数据缓存在内存中,供同一tasktracker上同一个作业的后续任务共享。该方法还需关注所占用的内存大小,因为可能影响混洗 所需要的内存。

利用JobConf来配置作业

JobConf类(继承自Configuration)的各种setter方法能够方便地配置作业的任一键/值对。如果仅需向任务传递少量元数据则非常有用。如果想获取任务的值,只 需覆盖Mapper或Reducer类的Configure()方法,并调用传入JobConf对象的 getter方法即可。

一般情况下,基本类型足以应付元数据编码。但对于更复杂的对象,用户要么自己 处理序列化工作(这需要实现一个对象与字符串之间的双向转换机制),要么使用 Hadoop 提供的 Stringifier 类。DefaultStringifier 使用 Hadoop 的序列化框 架来序列化对象。

但是这种机制会加大Hadoop守护进程的内存开销压力,在几百个作业同在一个系 统中运行的情况下尤为显著,因而并不适合传输只有几千字节的数据量。作业配置 由jobtracker, tasktracker和子JVM读取。每次读取配.置时,所有项都被读人到内 存(即使暂时不用的配置项也不例外)。例如,用户属性并不在jobtracker或tasktracker上读取,因此这种做法既浪费时间,又浪费内存。


分布式缓存

与在作业配置中序列化边数据的技术相比,Hadoop的分布式缓拷贝存机制更受青 睐,它能够在任务运行过程中及时地将文件和存档复制到任务节点以供使用。为 了节约网络带宽,在每一个作业中,各个文件通常只需复制到一个节点一次。

用法

对于使用DenericOptionsParser的工具来说,用户可以使用 -file选项指定待分发的文件,文件内包含以逗号隔开的URL列表。文件可以存 放在本地文件系统、HDFS或其他Hadoop可读文件系统(例如S3)之中。如果尚未 指定文件系统,则这些文件被默认是本地的。即使默认文件系统并非本地文件系 统,这也是成立的。

用户可以使用-archieves选项向自己的任务中复制存档文件JAR文件、ZIP文 件、tar文件和gizpper文件),这些文件会被反存档到任务节点。-libjars选项 会把JAR文件添加到mapper和reducer任务的类路径中。如果作业JAR文件并非 包含很多库JAR文件,这点会很有用。

以下指令显示如何使用分布式缓存来共享元数据文件,以得到气象站名称:

% hadoop jar job.jar MaxTemperatureByStationNameUsingDistributedCacheFile \
-files input/ncdc/metadata/stations-fixed-width.txt input/ncdc/all output

该命令将本地文件stations-fixed-width.txt未指定文件系统,从而被自动解析为本地 文件)复制到任务节点,从而可以查找气象站名称。类MaxTemperatureByStation NameUsingDistributedCacheFile 的代码如例 8-16 所示。

例8-16.査找各气象站的最高气温并显示气象站名称,气象站文件是一个分布式缓存文件

public class MaxTemperatureByStationNameUsingDistributedCacheFile
	extends Configured implements Tool 
{
static class StationTemperatureMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> 
{
    private NcdcRecordParser parser = new NcdcRecordParser();

    public void map(LongWritable key, Text value,
    OutputCollector<Text, IntWritable> output, Reporter reporter)
    	throws IOException 
        {
            parser.parse(value); 
            if (parser.isValidTemperature()) 
            {
                output.collect(new Text(parser.getStationId()),
                new IntWritable(parser.getAirTemperature()));
            }
        }
    }
    static class MaxTemperatureReducerWithStationLookup extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> 
    {

        private NcdcStationMetadata metadata;
    
        @Override
        public void configure(JobConf conf) {
            metadata = new NcdcStationMetadata();
            try {
                metadata.initialize(new File("stations-fixed-width.txt"));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        public void reduce(Text key, Iterator<IntWritable> values,
        OutputCollector<Text, IntWritable> output, Reporter reporter)
        throws IOException 
        {
            String stationName = metadata.getStationName(key.toString());
    
            int maxValue = Integer.MIN.VALUE;
            while (values.hasNext())
            {
                maxValue = Math.max(maxValue, values.next().get());
            }
            output.collect(new Text(stationName), new IntWritable(maxValue));
        }
    }

    @Override
    public int run(String[] args) throws IOException 
    {
        JobConf conf = JobBuilder.parseInputAndOutput(this, getConf(), args);
        if (conf == null) {
            return -1;
        }

        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWritable.class);
        
        conf.setMapperClass(StationTemperatureMapper.class);
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducerWithStationLookup.class);

        JobClient.runJob(conf);
        return 0;
    }
    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureByStationNameUsingDistributedCacheFile(), args);
        System.exit(exitCode);
    }
}

这个程序通过气象站查找最高气温,因此mapper(StationTemperatureMapperHX 仅输出(气象站ID,气温)对。对于combiner,该程序重用MaxTemperatureReducer(参见第2章和第5章)来为map端的map输出分组获得最髙气温。Reducer则有所不同,因为它不仅要查找 最髙气温,还需要根据缓存文件查找气象站名称。

该程序在reducer的configure()方法中用文件的原始名称来获取缓存文件,该文 件的路径与任务的工作目录相同。

以下是输出的小片段,显示部分气象站的最髙气温值。

PEATS RIDGE WARATAH   372
STRATHALBYN RACECOU   410
SHEOAKS AWS   399
WANGARATTA AERO   409
MOOGARA   334
MACKAY AERO   331

工作机制

当用户启动一个作业,Hadoop 将由-files 和-archieves选项所指定的文件复制到jobtracker的文件系统(一般是 HDFS)之中。接着,在任务运行之前,tasktracker 将文件从jobtradcer的文件系统中复制到本地磁盘——缓存——使任务能够访问文 件。从任务的角度来看,这些文件就已经在那儿了(它并不关心这些文件是否来自 HDFS) tasktracker为缓存中的文件各维护一个计数器来统计这些文件的被使用情况。当任 务即将运行时,针对该文件所使用的所有文件的计数器值增1,当任务执行完毕之 后,这些计数器值均减1。当相关计数器值为0时,表明该文件没有被任何任务使 用,可以从缓存中移除。缓存的容量是有限的——默认10GB,因此需要经常删除 无用的文件以腾出空间来装载新文件。缓存大小可以通过配置属性local.cache.size 进行配置,以字节为单位。

尽管这个机制并不确保在同一tasktracker上运行的作业的后续任务能否在缓存中找 到文件,但是成功的概率相当大。原因在于作业的多个任务在调度之后几乎同时开 始运行,在此期间基本不可能有足够多的其他任务也在运行,乃至于将该任务所需 文件从缓存中移除出去。

文件存放在tasktracker的${mapred.local.dir}/taskTracker/archive目录下。但是用户 无需细究这一点,因为这些文件同时以符号链接的方式指向任务的工作目录。

DistributedCache API

由于可以通过GenericOptionsParser间接使用分布式缓存,大多数应用不需要 使用 DistributedCache API。事实上,利用 GenericOptionsParser 访问分布 式缓存更方便。例如,可以将本地文件复制到HDFS中去,接着JobClient会通 过addCacheFile()和addCacheArchive()方法告诉 DistributedCache 在 HDFS中的位置。当文件存放到本地时,JobClient同样获得DistributedCache来创建符号链接,其形式为文件的URI加fragment标识。例如,以URI hdfs://namenode/foo/bar myfile指定的文件通过符号链接以myfile文件名存放在任 务的工作目录下。例8-8显示的例子便使用了这个API。

在任务节点上,最方便的方法是直接访问本地化的文件。然而,有时候用户需要获 得缓存中所有有效文件的列表。JobConf有两个方法可以做到这一点: getLocalCacheFiles()和getLocalCacheArchives()函数均返回一个指向本地 文件路径对象数组。

关注微信获取最新动态