计数器
在许多情况下,一个用户需要了解待分析的数据,尽管这并非所要执行的分析任务 的核心内容。以统计数据集中无效记录数目的任务为例,如果发现无效记录的比例 相当高,那么就需要认真思考为何存在如此多无效记录。是所采用的检测程序存在 缺陷,还是数据集质量确实很低,包含大量无效记录?如果确定是数据集的质量问 题,则可能需要扩大数据集的规模,以增大有效记录的比例,从而进行有意义的 分析。
计数器是一种收集作业统计信息的有效手段,用于质量控制或应用级统计。计数器 还可辅助诊断系统故障。如果需要将日志信息传输到map或reduce任务,更好的 方法通常是尝试传输计数器值以监测某一特定事件是否发生。对于大型分布式作业 而言,使用计数器更为方便。首先,获取计数器值比输出日志更方便,其次,根据 计数器值统计特定事件的发生次数要比分析一堆日志文件容易得多。
内置计数器
Hadoop为每个作业维护若干内置计数器(表8-1),以描述该作业的各项指标。例 如,某些计数器记录已处理的字节数和记录数,使用户可监控已处理的输入数据量 和已产生的输出数据量。
组别 | 计数器名称 | 说明 |
---|---|---|
Map-Reduce框架 | map输入的记录 | 作业中所有map已处理的输入记录数。每次 RecordReader读到一条记录并将其传给map的map() 函数时,这个计数器的值增加 |
Map-Reduce框架 | map跳过的记录 | 作业中所有map跳过的输入记录数 |
Map-Reduce框架 | map输入 的字节 | 作业中所有map已处理的未压缩输入数据的字节数。每 次RecordReader读到一条记录并将其传给map的 map()函数时,这个计数器的值增加 |
Map-Reduce框架 | map输出的记录 | 作业中所有map产生的map输出记录数。每次某一个 map 的 OutputCollector 调用collect()方法时,这 个计数器的值增加 |
Map-Reduce框架 | map输出的记录 | 作业中所有map已产生的未压缩输出数据的字节数。每次某一个 map 的 OutputCollector 调用collect()方法时,这 个计数器的值增加 |
Map-Reduce框架 | combine输入的记录 | 作业中所有combiner(如果有)已处理的输入记录数。 combiner的迭代器每次读一个值,这个计数器的值增 加。注意:本计数器代表combiner已经处理的值的个 数,并非相异码分组数(后者并无实质意义,因为对于combiner而言,并不要求每个键对应一个组 |
Map-Reduce框架 | combine输出的记录 | 作业中所有combiner(如果有)已产生的输出记录数。每 次某一个 combiner 的 OutputCollector 调用collect()方法时,这个计数器的值增加 |
Map-Reduce框架 | reduce输入的组 | 作业中所有reduce已经处理的相异码分组的个数。每当 某一个reducer的reduce()被调用时,这个计数器的值 增加 |
Map-Reduce框架 | reduce 输入的记录 | 作业中所有reducer已经处理的输入记录的个数。每当某个reduce的迭代器读一个值时,这个计数器的值增加。 如果所有reduce已经处理完所有输入,则这个计数器的 值与计数器“outputrecords”的值相同 |
Map-Reduce框架 | reduce输出的记录 | 作业中所有map已经产生的reduce输出记录数。每当某 个 reducer 的 OutputCollector 调用collect()方法 时,这个计数器的值增加 |
Map-Reduce框架 | reduce跳过的组 | 作业中所有reducer已经跳过的相异码分组的个数。 |
Map-Reduce框架 | reduce跳过的记录 | 作业中所有reducer已经眺过的输入记录的个数 |
Map-Reduce框架 | 溢出的记录 | 作业中所有map和reduce任务溢出到磁盘的记录数 |
文件系统 | 文件系统读的字节 | 作业中所有map和reduce任务溢出到磁盘的记录数 map和reduce任务从每个文件系统读出的字节数。每个 文件系统对应一个计数器,例如Local、HDFS、S3、 KFS等 |
文件系统 | 文件系统写的字节 | map和reduce任务写到每个文件系统的字节数 |
作业计数 | 已启用的map任务 | 已启动的map任务数,包括推测执行的任务 |
作业计数 | 已启用的reduce 任务 | 已启动的reduce任务数,包括推测执行的任务 |
作业计数 | 失败的map任务 | 失败的map任务数。参见“任务失 败”小节了解潜在的失败因素 |
作业计数 | 失败的reduce任务 | 失败的reduce任务数 |
作业计数 | 数据本地的map 任务 | 与输入数据处于同一节点的map任务数 |
作业计数 | 机架本地的map 任务 | 与输入数据处于同一机架的map任务数 |
作业计数 | 其他本地的map 任务 | 与输入数据不在同一机架的map任务数。由于机架 之间的带宽较小,Hadoop会尽量使map任务靠近 输入数据,因而这个计数器的值一般较小 |
计数器由其关联任务维护,并定期传到tasktracker,再由tasktracker传给 jobtracker.因此,计数器能够被全局地聚集。
一个任务的计数器值每次都是完整传输的,而非自上次传输之后再继续数未完成的 传输,以避免由于消息丢失而引发的错误。另外,如果一个任务在作业执行期间失 败,则相关计数器值会减小。仅当一个作业执行成功之后,计数器的值才是完整可 靠的。
用户定义的Java计数器
MapReduce允许用户编写程序来定义计数器,计数器的值可在mapper或reducer 中增加。多个计数器由一个Java枚举(enum)类型来定义,以便对计数器分组。一 个作业可以定义的枚举类型数量不限,各个枚举类型所包含的字段数量也不限。枚 举类型的名称即为组的名称,枚举类型的字段就是计数器名称。计数器是全局的。 换言之,MapReduce框架将跨所有map和reduce聚集这些计数器,并在作业结束 时产生一个最终结果。
在第 5 章中,我们创建了若干计数器来统计天气数据集中不规范的记录数。 例8-1中的程序对此做了进一步的扩展,能统计缺失记录和气温质量代码的分布 情况。
public class MaxTemperatuneWithCounters extends Configured implements Tool { enum Temperature { MISSING, MALFORMED } static class MaxTemperatuneMapperWithCountens extends MapReduceBase implements Mapper<LongWnitable, Text, Text, IntWnitable> { private NcdcRecondPanser parser = new NcdcRecondParser(); public void map(LongWnitable key, Text value,OutputCollector<Text, IntWnitable> output, Reporter reporter) throws IOException { parsen.parse(value); if (parser.isValidTemperature()) { int airTemperature = parser.getAirTemperature(); output.collect(new Text(parser.getYean()),new lntWritable(airTemperatune)); } else if (panser.isMalformedTemperature()) { System.err.println("Ignoring possibly corrupt input: " + value); neponter.incrCounter(Tempenatune.MALFORMED, 1); } else if (pansen.isMissingTemperature()) { reporter.incnCounten(Temperatune.MISSING, 1); } // dynamic counter reporten.incrCounter("TemperatuneQuality", pansen.getQuality() 1); } } @Override public int run(String[] angs) throws IOException { JobConf conf = DobBuilden.parseInputAndOutput(this, getConf(), angs); if (conf == null) { return -1; } conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWnitable.class); conf.setMapperClass(MaxTempGratureMappGnWithCountens.class); conf.setCombinerClass(MaxTemperatureReducer.class); conf.setReducerClass(MaxTemperatureReducer.class); JobClient.runDob(conf); return 0; } public static void main(Stning[] angs) throws Exception { int exitCode = ToolRunnen.nun(new MaxTemperatuneWithCounters(), angs); System.exit(exitCode); } }
理解上述程序的最佳方法是在完整的数据集上运行一遍:
% hadoop jar job.jar MaxTemperatuneWithCounters input/ncdc/all output-counters
作业一旦成功完成执行,最后会输出各计数器的值(通过调用JobClient的runJob() 方法)。以下是这些计数器的最终值。
09/04/20 06:33:36 INFO mapred.JobClient: TemperatureQuality 09/04/20 06:33:36 INFO mapred.JobClient: 2=1246032 09/04/20 06:33:36 INFO mapred.JobClient: 1=973422173 09/04/20 06:33:36 INFO mapred.JobClient: 0=1 09/04/20 06:33:36 INFO mapred.JobClient: 6=40066 09/04/20 06:33:36 INFO mapred.JobClient: 5=158291879 09/04/20 06:33:36 INFO mapred.JobClient: 4=10764500 09/04/20 06:33:36 INFO mapred.JobClient: 9=66136858 09/04/20 06:33:36 INFO mapred.JobClient: Air Temperature Records 09/04/20 06:33:36 INFO mapred.JobClient: Malformed=3 09/04/20 06:33:36 INFO mapred.JobClient: Missing=66136856
动态计数器
上述代码还使用了动态计数器,后者是一种不由Java枚举类型定义的计数器。由于 在编译阶段就已指定Java枚举类型的字段,因而无法使用枚举类型动态新建计数器。 例8-1尝试统计气温质量代码的分布,尽管格式规范定义了可以取的值,但相比之 下,预先用动态计数器来产生实际值更方便。在该例中,Reporter对象的incrCounter()方法有两个String类型的输入参数,分别代表组名称和计数器名称:
public void incnCounter(Stning gnoup, String counter, long amount)
鉴于Hadoop需先将Java枚举类型转变成String类型,再通过RPC发送计数器 值,这两种创建和访问计数器的方法——使用枚举类型和String类型——事实上 是等价的。相比之下,枚举类型易于使用,还提供类型安全,适合大多数作业使 用。如果某些特定场合需要动态创建计数器,可以使用String接口。
易读的计数器名称
计数器的默认名称是枚举类型的Java完全限定类名。由于这种名称在…化界面和 终端上可读性较差,因此Hadoop又提供了另一种方法(即使用“资源捆绑” (resource bundle))来修改计数器的显示名称。前面的例子即是如此,显示的计数器 名称是 “Air Temperature Records”,而非 “TemperatureSMISSING”。对于动态计 数器而言,组名称和计数器名称也用作显示名称,因而通常不存在这个问题。
为计数器提供易读名称也很容易。以把^枚举类型为名创建一个属性文件,用下划线 (_)分隔嵌套类型。属性文件与包含该枚举类型的顶级类放在同一目录。例如,例8-1 中的Temperature枚举类型对应的属性文件被命名为MaxTemperatureWithCounters_ Temperature.properties 。
属性文件只有一个CounterGroupName属性,其值便是整个组的显示名称。在枚举类型中定义的每个字段均与一个属性对应,属性名称是“字段名称name",属性值是该计数器的显示名称。属性文件MaxTemperatureWithCounters_Temperature.properties 的内容如下:
Counter6roupName=Air Temperature Records MISSIN6.name=Missing MALFORMED.name=Malformed
Hadoop使用标准的Java本地化机制将正确的属性文件载入到当前运行区域。例 如,创建一个名为 MaxTemperatureWithCounters_Temperature_zh_CN,properties 的 中文属性文件,在zh_CN区域运行时,就会使用这个属性文件。详情请参见 java,util.PropertyResourceBundle 类的相关文档。
获取计数器
除了通过Web界面和命令行(执行Hadoop job -counter指令)之外,用户还可以 使用Java API获取计数器的值。通常情况下,用户一般在作业运行完成、计数器 的值已经稳定下来时再获取计数器的值,而Java API还支持在作业运行期间就能 够获取计数器的值。例8-2展示了如何统计整个数据集中气温信息缺失记录的 比例。
例8-2.统计气温信息缺失记录所占的比例
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.mapned.*; import org.apache.hadoop.util.*; public class MissingTemperatureFields extends Configured implements Tool { @Override public int run(String[] args) throws Exception { if (args.length != 1) { JobBuilder.printUsage(this, "<job ID>"); return -1; } JobClient jobClient = new JobClient(new DobConf(getConf())); String jobID = args[0]; RunningJob job = jobClient.get3ob(JobID.forName(jobID)); if (job == null) { System.err. printf("No job with ID %s iFound.\n", jobID); return -1; } if (!job.isComplete()) { System.enn.pnintf("Job %s is not complete.\n", jobID); return -1; } Counters counters = job.getCounters(); long missing = counters.getCounter( MaxTempenatuneWithCounters.Tempenature.MISSING); long total = counters.findCounter("ong.apache.hadoop.mapned. Task$Counter", "MAP_INPUT_RECORDS").getCounter(); System.out.pnintf("Reconds with missing temperature fields: %.2f%%\n", 100,0 * missing / total); return 0; } public static void main(Stning[] angs) throws Exception { int exitCode = ToolRunnen.nun(new MissingTempenatuneFields(), angs); System.exit(exitCode); } }
首先,以作业ID为输入参数调用一个JobClient实例的getJob()方法,返回一个RunningJob对象。检査是否有一个作业与指定HD相匹配。有多种因素可能导 致无法找到一个有效的RunningJob对象,例如,错误地指定了作业ID,或 jobtracker不再指向这个作业。内存中仅保留最新的100个作业,该阐值受 mapred. jobtracker. completeuserjobs.maximum 控制,当 jobtracker 重启时, 所有作业信息都被清除。
值得一提的是,Counters类的多个findCounter方法也会返回一个Counter 对象。本例调用该方法来获取内置的input records计数器的值。即通过组名称(即 枚举类型的Java完整类名)和计数器的名称(均为字符串)访问这个计数器。
最后,输出气温信息缺失记录的比例。针对整个天气数据集的运行结果如下:
% hadoop jar job.jar MissingTemperatureFields job_200904200610_0003 Records with missing temperature fields: 5.47%
用户定义的Streaming计数器
Streaming MapReduce程序通过向标准错误流发送一行特殊格式的信息来增加计数 器的值,格式如下:
reporter: counter: group,counter,amount
以下Python代码片段将Temperature组的Missing计数器的值增加
sys.stderr.write("reporter:counter:Temperature,Missing,1\n")
状态信息也可以类似方法发出,格式如下:
reporter:status:message