Hadoop教程

在集群上运行

目前,程序已经可以在少量测试数据上正确运行,下面可以准备在Hadoop集群的 完整数据集上运行了。第9章将介绍如何建立完全分布的集群,然而该章中的方法 也可以用在伪分布集群上。

打包

单机上运行的程序不需要任何修改就可以直接在集群上运行,但是需要把程序打包 为JAR文件发给集群。使用人扮可以简化这个过程,使用如下的任务(完整的build文件可以在示例代码中找到):

<jar destfile="job.jar" basedir="${classes.dir}"/>

如果每个JAR文件都有一个作业,可以在JAR文件的manifest中指定要运行的main类。如果main类不在manifest中,则必须在命令行指定(见下文)。任何非独 立的JAR文件应该打包到JAR文件的lib子目录中。这与Java Web application archive或WAR文件类似,只不过JAR文件是放在WEB-INF/lib子目录下WAR文 件中的。

启动作业

为了启动作业,我们需要运行驱动程序,使用-conf选项来指定想要运行作业的集 群(同样,也可以使用-fs和-jt选项):

% hadoop jar job.jar vB.MaxTemperatureDriver -conf conf/hadoop-cluster.xml \
          	input/ncdc/all max-temp

JobClient的runJob()方法启动作业并检査进程,有任何变化,就输出一行map 和reduce进度总结。

输出包含很多有用的信息。在作业开始之前,打印作业ID:如果需要在日志文件 中或通过hadoop job命令查询某个作业,必须要有ID信息。作业完成后,统计 信息(例如计数器)被打印出来。这对于确认作业是否完成是很有用的。例如,对于 这个作业,大约分析2MGB输入数据(“Map input bytes”),读取了 HDFS大约 34 GB压缩文件(“HDFS_BYTES_READ”)。输入数据被分成101个大小合适的 gzipped文件,因此即使不能划分数据也没有问题。

作业、任务和task attempt ID

作业ID的格式包含两部分:jobtracker(不是作业)开始的时间和唯一标识此作业 的由jobtracker维护的增量计数器。例如:ID为job_200904110811_0002的 作业是第二个作业(0002,作业ID从1开始),jobtracker在2009年4月11 日08:11开始运行这个作业。计数器的数字前面由0开始,以便于作业ID在目 录列表中进行排序。然而,计数器达到10000时,不能重新设置,导致作业ID更长(这些ID不能很好地排序)。

任务属干作业,任务ID通过替换作业ID的作业前缀为任务前缀,然后加上一 个后缀表示哪个作业里的任务。例如:task_200904110811_0002_m_000003 表示lD为job_200904110811_0002的作业的第4个map任务(000003,任务 ID从0开始计数)。作业的任务ID在初始化时产生,因此,任务ID的顺序不 必是任务执行的顺序。

如果在jobtracker重启并恢复运行作业后,作业被重启,那么taskattempt ID中 最后的计数值将从1000递增。



MapReduce 的 Web 界面

Hadoop的Web界面用来浏览作业信息,对于跟踪作业运行进度、查找作业完成后 的统计信息和日志非常有用。

jobtracker 页面

图5-1给出了主页的截屏。第一部分是Hadoop的安装细节,包括版本号、编译时间和 jobtracker的当前状态(在本例中,状态是running)和启动时间。

接下来是关于集群的概要信息,包括集群的负载情况和使用情况。这表明当前正在 集群上运行的map和reduce的数量,作业提交的数量,可用的tasktracker节点数 和集群的负载能力,集群中可用map和reduce的任务槽数(“Map Task Capacity"和“Reduce Task Capacity"),毎个节点平均可用的任务槽数。

概要信息的下面是正在运行的作业调度器的相关信息(此处是“默认值”)。可以单 击査看作业队列。

随后,显示的是正在运行、(成功地)完成和失败的作业。每部分都有一个作业表, 其中每行显示作业的ID、所属者、作业名(使用JobConf的setJobName()方法设 置的mapred.job.name属性)和进度信息。

最后,页面的底部是一些链接信息,指向jobtracker日志和jobtracker历史信息: 记录jobtracker运行过的所有作业的信息。在作业存储到历史信息页之前,主页上 只显示 100 个作业(通过 mapred.jobtracker.completeuserjobs.maximum 属性 来配置)。注意,作业历史是永久存储的,因此,可以从以前运行的jobtracker中找到作业。

作业历史

作业历史包括已完成作业的事件和配置信息。还包括作业是否成功完成的信 息。作业历史可以用来实现jobtracker重启.后的作业恢复(参见 mapned.jobtnacken.nestant.necoven属性),为运行作业的用户提供有用 信息。

作业历史文件存放在jobtracker本地文件系统中的history子目录中。通过 hadoop.job.histony.location属性来设置历史文件存放在Hadoop文件系 统的任意位置。jobtracker的历史文件会保存30天,随后由系统删除。

作业输出目录的_logs/history子目录为用户存放第二个备份。这个存放位置可 以通过设置hadoop.job.history.usen.location进行重写。如果将其值设 置为特殊值none,则不会有用户作业历史被保存,虽然作业历史仍然是集中 存放的。用户的作业历史文件不会被系统删除。

历史日志包括作业、任务和尝试事件,所有这些信息以纯文本方式存储。特殊 作业的历史可以通过Web界面或在命令行方法下用hadoop job -history(指 定的作业输出目录中)查看。

作业页面

单击作业ID进入作业页面(如图5-2所示)。页面最上方是作业的摘要,包括一些 基本信息,例如:作业的拥有者、作业名和作业运行时间。作业文件是整理过的作 业配置文件,包括作业运行中有效的所有属性和值。如果不确定某个属性的值,可 以点击查看文件。

作业运行期间,可以在作业页面监视作业进度,页面信息会定期自动更新。摘要信 息下方的表展示map和reduce进度。Num Task显示该作业map和reduce的总 数。其他列显示的是这些任务的状态:Pending(等待运行)、Running, Complete(成功完成)和killed(失败任务————用Failed标记更准确)。最后一列显 示的是一个作业所有map和reduce任务中失败和中止的task attempt总数(task attempt可标记为killed,原因可能是:它们是推测执行的副本。task attempt运行的 tasktracker已结束,或这些task attempt已被用户中止)。

在该页面的随后部分,可以找到显示每个任务进度的完成图。reduce完成图被分为 reduce任务的三个阶段:copy(map输出传输到reduce的tasktracker时)、sort(合并reduce输入时)和reduce(reduce函数运行产生最后输出时)。

在该页的中间部分是作业计数器表。这些信息在作业运行期间动态更新,为作业进度和整体健康程度提供另一个有用的信息。


获取结果

一旦作业完成,有许多方法可以获取结果。每个reducer产生一个输出文件,因 此,在max-temp目录中会有30个部分文件(part file),命名为part-00000到part-00029。

这个作业产生的输出很少,所以很容易从HDFS中将其复制到开发机器上。 hadoop fs 命令中的-getmerge选项非常有用,可以得到源模式目录中的所有文 件,并在本地文件系统上把它们合并成一个单独的文件。-getmerge选项对 hadoop fs命令很有用,因为它得到了源模式指定目录下所有的文件,并将其合 并为本地文件系统的一个文件:

% hadoop fs -getmerge max-temp max-temp-local
% sort max-temp-local | tail
1991  607
1992  605
1993  567
1994  568
1995  567
1996  561
1997  565
1998  568
1999  568
2000  558

因为reduce的输出分区文件是无序的(使用hash partitioner的缘故),我们对 输出进行排序。对MapReduce的数据做些后期处理是很常见的,把这些数据送入 分析工具(例如R、电子数据表甚至关系数据库)进行处理。

图5-2.任务页面的屏幕截图

如果输出文件比较小,另外一种获取方式是:使用-cat选项将输出文件打印到控 制台:

% hadoop fs -cat max-temp/*

深入分析后,我们发现某些结果看起来似乎没有道理。比如,1951年(此处没有显 示)的最髙气温是590℃!如何找出产生这个结果的原因呢?这是不正确的输入数据还 是程序中的bug?


作业调试

最经典的方法通过打印语句来调试程序,这在Hadoop中同样适用。然而,需要考 虑复杂的情况:当程序运行在几十台、几百台甚至几千台节点上时,如何找到并检 测调试语句分散在这些节点中的输出呢?为了处理这种情况,我们要查找一个特殊 情况,我们用一个调试语句记录到一个标准错误中,它将发送一个信息来更新任务 的状态信息以提示我们査看错误日志。我们马上将看到,Web UI简化了这个操 作。

我们还要创建一个自定义的计数器来统计整个数据集中不合理的气温记录总数。这 就提供了很有价值的信息来处理如下情况一一如果这种情况经常发生,我们需要从 中进一步了解事件发生的条件以及如何提取气温值,而不是简单地丢掉这些记录。 事实上,调试一个作业的时候,应当总想是否能够使用计数器来获得需要找出事件 发生来源的相关信息。即使需要使用日志或状态信息,但使用计数器来衡量问题的 严重程度也是有帮助的。

如果调试期间产生的日志数据规模比较大,可以有如下选择。第一是将这些信息写 到map的输出流供reduce分析和汇总,而不是写到标准错误流。这种方法通常必 须改变程序结构,所以先选用其他技术。第二种方法,可以编写一个程序(当然是 MapReduce程序)来分析作业产生的日志。

我们把调试加入mapper(版本4),相对于reducer,因为我们希望找到导致这些异常 输出的数据源:

public class MaxTemperatureMapper extends MapReduceBase
 implements Mapper<LongWritable, Text, Text, IntWritable> {
enum Temperature {
OVER_100
}
private NcdcRecordParser parser = new NcdcRecordParser();

public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter  reporter
throws IOException {

parser.parse(value);
if (parser.isValidTemperature()) {
int airTemperature = parser.getAirTemperature();
if (airTemperature > 1000) {
System.err.println("Temperature over 100 degrees for input: " + value);
reporter.setStatus("Detected possibly corrupt record: see logs.");
reporter.incrCounter(Temperature.OVER_100, 1);
}
output.collect(new Text(parser.getYear()), new IntWritable(airTemperature));
}
}
}

如果气温超过了 100℃(表示为1000,因为气温只保留小数点后一位),我们输出一 行到标准错误流以代表有问题的行,同时使用Reporter的setStatus()方法来 更新map中的状态信息,引导我们查看日志。我们还增加了计数器,表示为Java 中enum类型的字段。在这个程序中,定义一个OVER_100字段来统计气溫超过 100℃的记录数。

完成这些修改,我们重新编译代码,重新创建JAR文件,然后重新运行作业,并 在运行时进入任务页面。

任务页面

任务页面包括一些查看作业中任务细节的链接。例如,点击map链接,进入一个 页面,所有map任务的信息都列在这一页上。还可以只查看已完成的任务。图5-3 中的截图显示了带有调试语句的作业页面中的一部分。表中的毎行代表一个任务, 提供的信息包括毎个任务的开始时间和结束时间,来自tasktracker的错误报告,一个用来査看毎个任务的计数器的链接。

Status列对调试非常有用,因为它显示了任务的最新状态信息。任务开始之前, 显示的状态为initializing,一旦开始读取记录,它便以字节偏移量和长度作为 文件名,显示它正在读取的文件的划分信息。你可以看到我们为任务 task_200904110811 003_m_000044进行调试时的状态显示,单击日志页面找到 相关的调试信息。注意,这个任务有一个附加计数器,因为这个任务的用户计数器 有一个非零的计数。

任务详细信息页面

从任务页面中,可以单击任何任务获得更多相关信息。图5-4的详细任务信息页面 显示了每个task attempt。在这个示例中,只有一个成功完成的task attempt。此图表进一步提供了十分有用的数据,如task attempt的运行节点和指向任务日志文件 和计数器的链接。

Actions列包括终止task attempt的链接。默认情况下,这项功能是禁用的,Web 用户界面是只读接口。将webinterface.private.actions设置成ture,即可启用此动作的链接。

图5-3.任务页面的屏幕截图

图5-4.任务详细信息页面的屏幕截图

将webinterface.Private.actions设置为true,意味着允许任何 人访问HDFSWeb界面来刪除文件。dfs.web.ugi属性决定以哪 个用户身份运行HDFS Web UI,从而控制可以查看或删除哪些 文件。

对于map任务,页面中还有一部分显示了输入分片分布在哪些节点。 通过跟踪成功task attempt的日志文件链接(可以看到每个日志文件的最后4KB或 8KB或整个文件),会发现存在问题输入记录。这里考虑到篇幅,已经进行了分行 和截断处理:

Temperature over 100 degrees for input:
0335999999433181957042302005+37950+139117SAO +0004R3SNV020113590031500703569999994
33201957010100005+35317+139650SAO+000899999V02002359002650076249N004000599+0067...

此记录的格式看上去与其他记录不同。可能是因为行中有空格,这是规范中没有描述的。 作业完成后,查看我们定义的计数器的值,以检查在整个数据集中有多少记录超过 100°c。通过Web界面或命令行,可以查看计数器:

% hadoop job -counter job_200904110811_0003
'v4.MaxTemperatureMapper$Temperature'\ OVER_100
3

-counter选项的输入参数包括作业ID,计数器的组名(这里一般是类名)和计数器 名称(enum名)。这里,在超过十亿条记录的整个数据集中,只有三个异常记录。 对于许多大数据问题,一般会扔掉不正确的记录,然而,我们需要谨慎处理这种情 况,因为我们寻找的是一个极限值-最高气温值,而不是一个总量。尽管如此,扔掉三个记录也许并不会改变结果。

Hadoop用户日志

针对不同用户,Hadoop在不同的地方生成日志。表5-2对此进行了总结。

在本小节,你会看到,MapReduce任务日志可以从Web界面访问,这是最便捷 的方式。也可以从正在进行task attempt(task的这个tasktracker的本地文件系 统中找到日志文件,目录以task attempt来命名。如果启用任务〕VM重用功能 (参见第184页的“任务JVM重用”小节),每个日志文件累加成为整个JVM 运行日志,所以,多个task attempt存放在一个日志文件中。Web界面隐藏了这 一点,只显示与正在查看的task attempt相关的部分日志。

对这些日志文件的写操作是很直接的„任何到标准输出或标准错误流的写操 作都直接写到相关日志文件。当然,在Streaming方式下,标准输出被用于 map或reduce的输出,所以并不会出现在标准输出日志文件中。

有一些控制用于管理任务日志的大小和记录保留时间。默认情况下,日志最短 在24小时后删除(通过mapred.userlog.retain.hours属性来设置)。也可 以用mapred.userlog.limit.kb属性在每个日志文件的最大大小上设置一个 阈值,默认值是0,表示没有上限。

表5-2.Hadoop的日志

曰志 主要受众 描述
系统守护进程日志 管理员 每个Hadoop守护进程产生一个日志文件(使用log4j)和另一个(文件合并标准输出和错误)。这些文件分别写入HADOOP_ LOG_DIR环境变量定义的目录。
HDFS审计日志 管理员 这个日志记录所有HDFS请求,默认是关闭 状态。虽然可以配置,但它一般写入namenode的日志
MapReduce作业历史日志 用户 记录作业运行期间发生的事件(如任务完成)。集中保存在jobtracker上的_logs/history 子目录中的作业输出目录中
MapReduce任务日志 用户 每个tasktrcker子进程都用log4j产生一个日 志文件(称作syslog), 一个保存发到标准输出 (stdout)数据的文件,一个保存标准错误 (stderr)的文件。这些文件写入到HADOOP LOG DIR环境变量定义的目录的userlogs的子目录中

处理不合理的数据

捕获引发问题的输入数据是很有价值的,因为我们可以在测试中用它来检查 mapper的工作是否正常:

Test
public void parsesMalformedTemperature() throws IOException {
MaxTemperatureMapper mapper = new MaxTGmpenatuneMappen();
Text value = new Text("0335999999433181957042302005+3795&4-139117SAO +0004" +
   // Yean ^^^^
"RJSN V02011359003150070356999999433201957010100005+353");
    // Temperature ^^^^^
OutputCollectonoutput = mock(OutputCollecton.class);
	Reporter reporter = mock(Reponten.class);
	mappen.map(null, value, output, reporter);
venify(output, never()).collect(any(Text.class),any(IntWnitable.class));
venify(reponter).incnCounten(MaxTempenatureMapper.Temperature. MALFORMED, 1);
}

引发问题的记录与其他行的格式是不同的。例5-11显示了修改过的程序(版本5), 它使用的解析器忽略了那些没有首符号(+或-)气温字段的行。我们还引入一个计数 器来统计,因为这个原因而被忽略的记录数。

例5-11.该阳mapper用于査找最高气温

public class MaxTemperatureMapper extends MapReduceBase
implements Mapper<LongWritable, Text, Text, IntWritable> {

enum Temperature {
MALFORMED
}

private NcdcRecordPansen parser = new NcdcRecondParsen();
public void map(LongWritable key, Text value,
OutputCollector<Text, IntWritable> output, Reporter reporter)
throws IOException {

parser.parse(value);
if (parser.isValidTemperature()) {
int airTemperature = parser.getAirTemperature();
output.collect(new Text(parsen.getYear()), new IntWritable(airTemperature));
} else if (parser.isMalformedTemperature()) {
System.err.println("Ignoring possibly corrupt input: " + value);
reporter.incrCounter(Temperature.MALFORMED, 1);
}
}
}

使用远程调试器

当一个任务失败并且没有足够多的记录信息来诊断错误时,可以选择用调试器运行 该任务。在集群上运行作业时,很难使用调试器,因为你不知道哪个节点处理哪部 分输入,所以不能在错误发生之前安装调试器。然而,可以设置运行作业的属性来 让Hadoop保留作业在运行期间产生的所有中间值。这些数据可以用来独立地在调 试器上重新运行那些出错的任务。注意,任务在原处运行,即在故障的节点上,这 会增加错误重现的几率。

首先,将配置属性keep.failed.task.files的值设置为true,以便在任务失败 时,tasktracker能保留足够的信息让任务在相同的输入数据上重新运行。然后,再 次运行作业,并使用Web UI查看故障节点和task attempt ID(该ID以字符串 attempt_开始)。

接着,需要用前面保留的文件作为输入,运行一个特殊的作业运行器,即 IsolationRunner。登录到故障节点,找到那个taskattempt的目录。它可能是在 本地MapReduce目录下的某一个目录,由mapred.local.dir属性设置,如果这个属性是一个以逗号分隔 的目录列表(为了将负载分散到机器的物理磁盘),就需要查看所有目录找到那个特 定task attempt的目录。特定task attempt的目录位于如下路径:

apred,Locai,dir/taskTracker/jobcache/job-ID/task-attempt-ID

这个目录包含多个文件和子目录,其中job.xml文件包含task attempt期间生效的 所有作业的配置属性,IsolationRunner用它来创建一个JobConf实例。对于 map任务,这个目录还包含一个含有输入划分序列化表示的文件,所以map任务 可以取得相同的输入数据。对于reduce任务,则有一个map输出备份,它作为reducer的输入,存放在output目录中。

还有一个work目录,它是task attempt的工作目录。我们改变到这个目录以运行 IsolationRunner。需要设置一些选项来连接到远程调试器:

% export HADOOP_OPTS="-agentlib:jdwp=transport=dt_socket,server=y,suspend=y, \address=8000"

suspend=y选项表示JVM在运行代码前先等待调试器连接。用以下命令启动 IsolationRunner:

% hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

下一步,设置断点,连接远程调试器(所有主流的Java IDE都支持远程调试,可查阅说明文档),随后任务会在你的控制下运行。可以这样重新运行任务任意多次。 幸运的话,可以找到并修复错误。

在这个过程中,可以使用其他标准的Java调试技术,如kill-QUIT pid或 jstack来进行线程转储。

很多情况下,有必要知道这种技术并不只适用于失败的任务,还可以保留成功完成 任务的中间结果文件,以便检查不失败的任务。这时,将属性 keep.task.files.pattern设置为一个正则表达式(与保留的任务ID匹配)。

关注微信获取最新动态