Hadoop教程

计数器

在许多情况下,一个用户需要了解待分析的数据,尽管这并非所要执行的分析任务 的核心内容。以统计数据集中无效记录数目的任务为例,如果发现无效记录的比例 相当高,那么就需要认真思考为何存在如此多无效记录。是所采用的检测程序存在 缺陷,还是数据集质量确实很低,包含大量无效记录?如果确定是数据集的质量问 题,则可能需要扩大数据集的规模,以增大有效记录的比例,从而进行有意义的 分析。

计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。计数器 还可辅助诊断系统故障。如果需要将日志信息传输到map或reduce任务,更好的 方法通常是尝试传输计数器值以监测某一特定事件是否发生。对于大型分布式作业 而言,使用计数器更为方便。首先,获取计数器值比输出日志更方便,其次,根据 计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。

内置计数器

Hadoop为每个作业维护若干内置计数器(表8-1),以描述该作业的各项指标。例 如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量 和已产生的输出数据量。

组别计数器名称说明
Map-Reduce框架map输入的记录作业中所有map已处理的输入记录数。每次 RecordReader读到一条记录并将其传给map的map() 函数时,这个计数器的值增加
Map-Reduce框架map跳过的记录作业中所有map跳过的输入记录数
Map-Reduce框架map输入 的字节作业中所有map已处理的未压缩输入数据的字节数。每 次RecordReader读到一条记录并将其传给map的 map()函数时,这个计数器的值增加
Map-Reduce框架map输出的记录作业中所有map产生的map输出记录数。每次某一个 map 的 OutputCollector 调用collect()方法时,这 个计数器的值增加
Map-Reduce框架map输出的记录作业中所有map已产生的未压缩输出数据的字节数。每次某一个 map 的 OutputCollector 调用collect()方法时,这 个计数器的值增加
Map-Reduce框架combine输入的记录作业中所有combiner(如果有)已处理的输入记录数。 combiner的迭代器每次读一个值,这个计数器的值增 加。注意:本计数器代表combiner已经处理的值的个 数,并非相异码分组数(后者并无实质意义,因为对于combiner而言,并不要求每个键对应一个组
Map-Reduce框架combine输出的记录作业中所有combiner(如果有)已产生的输出记录数。每 次某一个 combiner 的 OutputCollector 调用collect()方法时,这个计数器的值增加
Map-Reduce框架reduce输入的组作业中所有reduce已经处理的相异码分组的个数。每当 某一个reducer的reduce()被调用时,这个计数器的值 增加
Map-Reduce框架reduce 输入的记录作业中所有reducer已经处理的输入记录的个数。每当某个reduce的迭代器读一个值时,这个计数器的值增加。 如果所有reduce已经处理完所有输入,则这个计数器的 值与计数器“outputrecords”的值相同
Map-Reduce框架reduce输出的记录作业中所有map已经产生的reduce输出记录数。每当某 个 reducer 的 OutputCollector 调用collect()方法 时,这个计数器的值增加
Map-Reduce框架reduce跳过的组作业中所有reducer已经跳过的相异码分组的个数。
Map-Reduce框架reduce跳过的记录作业中所有reducer已经眺过的输入记录的个数
Map-Reduce框架溢出的记录作业中所有map和reduce任务溢出到磁盘的记录数
文件系统文件系统读的字节作业中所有map和reduce任务溢出到磁盘的记录数 map和reduce任务从每个文件系统读出的字节数。每个 文件系统对应一个计数器,例如Local、HDFS、S3、 KFS等
文件系统文件系统写的字节map和reduce任务写到每个文件系统的字节数
作业计数已启用的map任务已启动的map任务数,包括推测执行的任务
作业计数已启用的reduce 任务已启动的reduce任务数,包括推测执行的任务
作业计数失败的map任务失败的map任务数。参见“任务失 败”小节了解潜在的失败因素
作业计数失败的reduce任务失败的reduce任务数
作业计数数据本地的map 任务与输入数据处于同一节点的map任务数
作业计数机架本地的map 任务与输入数据处于同一机架的map任务数
作业计数其他本地的map 任务与输入数据不在同一机架的map任务数。由于机架 之间的带宽较小,Hadoop会尽量使map任务靠近 输入数据,因而这个计数器的值一般较小

计数器由其关联任务维护,并定期传到tasktracker,再由tasktracker传给 jobtracker.因此,计数器能够被全局地聚集。

一个任务的计数器值每次都是完整传输的,而非自上次传输之后再继续数未完成的 传输,以避免由于消息丢失而引发的错误。另外,如果一个任务在作业执行期间失 败,则相关计数器值会减小。仅当一个作业执行成功之后,计数器的值才是完整可 靠的。


用户定义的Java计数器

MapReduce允许用户编写程序来定义计数器,计数器的值可在mapper或reducer 中增加。多个计数器由一个Java枚举(enum)类型来定义,以便对计数器分组。一 个作业可以定义的枚举类型数量不限,各个枚举类型所包含的字段数量也不限。枚 举类型的名称即为组的名称,枚举类型的字段就是计数器名称。计数器是全局的。 换言之,MapReduce框架将跨所有map和reduce聚集这些计数器,并在作业结束 时产生一个最终结果。

在第 5 章中,我们创建了若干计数器来统计天气数据集中不规范的记录数。 例8-1中的程序对此做了进一步的扩展,能统计缺失记录和气温质量代码的分布 情况。

public class MaxTemperatuneWithCounters extends Configured implements Tool 
{
    enum Temperature {
        MISSING,
        MALFORMED
    }
    static class MaxTemperatuneMapperWithCountens extends MapReduceBase implements Mapper<LongWnitable, Text, Text, IntWnitable> 
    {
        private NcdcRecondPanser parser = new NcdcRecondParser();
        public void map(LongWnitable key, Text value,OutputCollector<Text, IntWnitable> output, Reporter reporter) throws IOException 
        {
            parsen.parse(value);
            if (parser.isValidTemperature())
           {
                int airTemperature = parser.getAirTemperature();
                output.collect(new Text(parser.getYean()),new lntWritable(airTemperatune));
            } 
            else if (panser.isMalformedTemperature()) 
           {
                System.err.println("Ignoring possibly corrupt input: " + value);
                neponter.incrCounter(Tempenatune.MALFORMED, 1);
            } 
            else if (pansen.isMissingTemperature()) 
            {
                reporter.incnCounten(Temperatune.MISSING, 1);
            }
            // dynamic counter
            reporten.incrCounter("TemperatuneQuality", pansen.getQuality() 1);

        }
    }
    @Override
    public int run(String[] angs) throws IOException 
    {
         JobConf conf = DobBuilden.parseInputAndOutput(this, getConf(), angs);
        if (conf == null) {
          return -1;
        }
        conf.setOutputKeyClass(Text.class);
        conf.setOutputValueClass(IntWnitable.class);
        
        conf.setMapperClass(MaxTempGratureMappGnWithCountens.class);
        conf.setCombinerClass(MaxTemperatureReducer.class);
        conf.setReducerClass(MaxTemperatureReducer.class);
        
        JobClient.runDob(conf);
        return 0;
    }
    public static void main(Stning[] angs) throws Exception {
        int exitCode = ToolRunnen.nun(new MaxTemperatuneWithCounters(), angs);
        System.exit(exitCode);
    }
}

理解上述程序的最佳方法是在完整的数据集上运行一遍:

% hadoop jar job.jar MaxTemperatuneWithCounters input/ncdc/all output-counters

作业一旦成功完成执行,最后会输出各计数器的值(通过调用JobClient的runJob() 方法)。以下是这些计数器的最终值。

09/04/20  06:33:36  INFO mapred.JobClient: TemperatureQuality
09/04/20  06:33:36  INFO mapred.JobClient: 2=1246032
09/04/20  06:33:36  INFO mapred.JobClient: 1=973422173
09/04/20  06:33:36  INFO mapred.JobClient: 0=1
09/04/20  06:33:36  INFO mapred.JobClient: 6=40066
09/04/20  06:33:36  INFO mapred.JobClient: 5=158291879
09/04/20  06:33:36  INFO mapred.JobClient: 4=10764500
09/04/20  06:33:36  INFO mapred.JobClient: 9=66136858
09/04/20  06:33:36  INFO mapred.JobClient: Air Temperature Records
09/04/20  06:33:36  INFO mapred.JobClient: Malformed=3
09/04/20  06:33:36  INFO mapred.JobClient: Missing=66136856

动态计数器

上述代码还使用了动态计数器,后者是一种不由Java枚举类型定义的计数器。由于 在编译阶段就已指定Java枚举类型的字段,因而无法使用枚举类型动态新建计数器。 例8-1尝试统计气温质量代码的分布,尽管格式规范定义了可以取的值,但相比之 下,预先用动态计数器来产生实际值更方便。在该例中,Reporter对象的incrCounter()方法有两个String类型的输入参数,分别代表组名称和计数器名称:

public void incnCounter(Stning gnoup, String counter, long amount)

鉴于Hadoop需先将Java枚举类型转变成String类型,再通过RPC发送计数器 值,这两种创建和访问计数器的方法——使用枚举类型和String类型——事实上 是等价的。相比之下,枚举类型易于使用,还提供类型安全,适合大多数作业使 用。如果某些特定场合需要动态创建计数器,可以使用String接口。

易读的计数器名称

计数器的默认名称是枚举类型的Java完全限定类名。由于这种名称在…化界面和 终端上可读性较差,因此Hadoop又提供了另一种方法(即使用“资源捆绑” (resource bundle))来修改计数器的显示名称。前面的例子即是如此,显示的计数器 名称是 “Air Temperature Records”,而非 “TemperatureSMISSING”。对于动态计 数器而言,组名称和计数器名称也用作显示名称,因而通常不存在这个问题。

为计数器提供易读名称也很容易。以把^枚举类型为名创建一个属性文件,用下划线 (_)分隔嵌套类型。属性文件与包含该枚举类型的顶级类放在同一目录。例如,例8-1 中的Temperature枚举类型对应的属性文件被命名为MaxTemperatureWithCounters_ Temperature.properties 。

属性文件只有一个CounterGroupName属性,其值便是整个组的显示名称。在枚举类型中定义的每个字段均与一个属性对应,属性名称是“字段名称name",属性值是该计数器的显示名称。属性文件MaxTemperatureWithCounters_Temperature.properties 的内容如下:

Counter6roupName=Air Temperature Records
MISSIN6.name=Missing
MALFORMED.name=Malformed

Hadoop使用标准的Java本地化机制将正确的属性文件载入到当前运行区域。例 如,创建一个名为 MaxTemperatureWithCounters_Temperature_zh_CN,properties 的 中文属性文件,在zh_CN区域运行时,就会使用这个属性文件。详情请参见 java,util.PropertyResourceBundle 类的相关文档。

获取计数器

除了通过Web界面和命令行(执行Hadoop job -counter指令)之外,用户还可以 使用Java API获取计数器的值。通常情况下,用户一般在作业运行完成、计数器 的值已经稳定下来时再获取计数器的值,而Java API还支持在作业运行期间就能 够获取计数器的值。例8-2展示了如何统计整个数据集中气温信息缺失记录的 比例。

例8-2.统计气温信息缺失记录所占的比例

import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.mapned.*;
import org.apache.hadoop.util.*;

public class MissingTemperatureFields extends Configured implements Tool 
{
  @Override
  public int run(String[] args) throws Exception {
       if (args.length != 1) {
             JobBuilder.printUsage(this, "<job ID>");
             return -1;
        }
        JobClient jobClient = new JobClient(new DobConf(getConf()));
        String jobID = args[0];
        RunningJob job = jobClient.get3ob(JobID.forName(jobID));
        if (job == null) {
            System.err. printf("No job with ID %s iFound.\n", jobID);
            return -1;
        }
        if (!job.isComplete()) {
            System.enn.pnintf("Job %s is not complete.\n", jobID);
            return -1;
        }
        Counters counters = job.getCounters();
        long missing = counters.getCounter(
        MaxTempenatuneWithCounters.Tempenature.MISSING);

        long total = counters.findCounter("ong.apache.hadoop.mapned. Task$Counter",
          "MAP_INPUT_RECORDS").getCounter();
        System.out.pnintf("Reconds with missing temperature fields: %.2f%%\n",
        100,0 * missing / total);
        return 0;
    }
    public static void main(Stning[] angs) throws Exception 
    {
        int exitCode = ToolRunnen.nun(new MissingTempenatuneFields(), angs);
        System.exit(exitCode);
    }
}

首先,以作业ID为输入参数调用一个JobClient实例的getJob()方法,返回一个RunningJob对象。检査是否有一个作业与指定HD相匹配。有多种因素可能导 致无法找到一个有效的RunningJob对象,例如,错误地指定了作业ID,或 jobtracker不再指向这个作业。内存中仅保留最新的100个作业,该阐值受 mapred. jobtracker. completeuserjobs.maximum 控制,当 jobtracker 重启时, 所有作业信息都被清除。

值得一提的是,Counters类的多个findCounter方法也会返回一个Counter 对象。本例调用该方法来获取内置的input records计数器的值。即通过组名称(即 枚举类型的Java完整类名)和计数器的名称(均为字符串)访问这个计数器。

最后,输出气温信息缺失记录的比例。针对整个天气数据集的运行结果如下:

% hadoop jar job.jar MissingTemperatureFields job_200904200610_0003
Records with missing temperature fields: 5.47%

用户定义的Streaming计数器

Streaming MapReduce程序通过向标准错误流发送一行特殊格式的信息来增加计数 器的值,格式如下:

reporter: counter: group,counter,amount

以下Python代码片段将Temperature组的Missing计数器的值增加

sys.stderr.write("reporter:counter:Temperature,Missing,1\n")

状态信息也可以类似方法发出,格式如下:

reporter:status:message

关注微信获取最新动态