任务的执行
在"剖析MapReduc作业运行机制”小节中,我们结合整个作业的背景 知道了 MapReduce系统是如何执行任务的。在本小节,我们将知道MapReduce用 户对任务执行的更多的控制。
推测执行
MapReduce模型将作业分解成任务,然后并行地运行任务以使作业的整体执行时 间少于各个任务顺序执行的时间。这使作业执行时间对运行缓慢的任务很敏感,因 为只运行一个缓慢的任务会使整个作业所用的时间远远长于执行其他任务的时间。 当一个作业由几百或几千任务组成时,可能出现少数“拖后腿”的任务是很常 见的。
任务执行缓慢可能有多种原因,包括硬件老化或软件配置错误,但是,检测具体原 因很困难,因为任务总能够成功完成,尽管比预计执行时间长。Hadoop不会尝试 诊断或修复执行慢的任务,相反,在一个任务运行比预期慢的时候,它会尽量检 测,并启动另一个相同的任务作为备份。这就是所谓的任务的“推测执行” (speculative execution)。
必须认识到,如果同时启动两个重复的任务,它们会互相竞争,导致推测执行无法 工作。这对集群资源是一种浪费。相反,只有在一个作业的所有任务都启动之后才 启动推测执行的任务,并且只针对那些已运行一段时间(至少一分钟)且比作业中其 他任务平均进度慢的任务。一个任务成功完成后,任何正在运行的重复任务都将被 中止,因为已经不再需要它们了。因此,如果原任务在推测任务前完成,也测任务 就会被终止,同样地,如果推测任务先完成,那么原任务就会被中止。
推测执行是一种优化措施,它并不能使作业的运行更可靠。如果有一些软件缺陷会 造成任务挂起或运行速度减慢,依靠推测执行来避免这些问题显然是不明智的,并 且不能可靠地运行,因为相同的软件缺陷可能会影响推测式任务。应该修复软件缺 陷,使任务不会挂起或运行速度减慢。
默认情况下,推测执行是启用的。可以基于集群或基于每个作业,单独为map任 务和reduce任务启用或禁用该功能。相关的属性如表6-3所示。
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.map.tasks. speculative.execution |
boolean | true | 如果任务运行变慢,该 属性决定着是否要启动 map任务的另外一个实例 |
mapred.reduce.tasks. speculative.execution |
boolean | true | 如果任务运行变慢,该属性决定着是否要启动 reduce任务的另外一个 实例 |
为什么会想到关闭推测执行?推测执行的目的是减少作业执行时间,但这是以集群 效率为代价的。在一个繁忙的集群中,推测执行会减少整个吞吐量,因为冗余任务 的执行时会减少作业的执行时间。鉴于此,一些集群管理员倾向于在集群上关闭此 选项,而让用户根据个别作业需要而开启该功能。Hadoop老版本尤其如此,因为 在调度推测任务时,会过度使用推测执行方式。
任务JVM重用
Hadoop在它们自己的Java虚拟机上运行任务,以区分其他正在运行的任务。为每 个任务启动一个新的JVM将耗时大约1秒,对运行1分钟左右的作业而言,这个 额外消耗是微不足道的。但是,有大量超短任务(通常是map任务)的作业或初始化 时间长的作业,它们如果能对后续任务重用JVM,就可以体现出性能上的优势。
启用任务重用JVM后,任务不会同时运行在一个JVM上。JVM顺序运行各个任 务。然而,tasktracker可以一次性运行多个任务,但都是在独立的JVM内运行 的。
控制任务JVM重用的属性是mapred.job.reuse.jvm.num.tasks,它指定给定 作业毎个JVM运行的任务的最大数,默认值为1(见表6-4),不同作业的任务总是 在独立的JVM内运行。如果该属性设置为-l,则意味着同一作业中的任务都可以 共享同一个JVM,数量不限。JobConf中的setNumTasksToExecutePerJvm()方 法也可以用于设置这个属性。
表6-4.任务JVM重用的属性
属性名称 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.job.reuse. jvm.num.tasks |
int | 1 | 在一个tasktracker上,对于给定的作业的每个JVM上可以运行的任务最 大数。-1表示无限制,即同一个 JVM可以被该作业的所有任务使用 |
通过充分利用HotSpot JVM所用的运行时优化,计算密集型任务也可以受益于任 务JVM重用机制。在运行一段时间后,HotSpot JVM构達足够多的信息来检测代 码中的性能关键部分,并将热点部分的Java字节码动态转换成本地机器码。这对 运行时间长的过程很有效,但对于那些只运行几秒钟或几分钟的JVM,不能充分 获得HotSpot带来的好处。在这些情况下,值得启用任务JVM重用功能。
共享JVM的另一个非常有用的地方是:作业各个任务之间的状态共享。通过在静 态字段中存储相关数据,任务可以较快速访问共享数据。
跳过坏记录
大型数据集十分庞杂。它们经常有损坏的记录。它们经常有不同格式的记录。它们 经常有缺失的字段。理想情况下,用户代码可以很好地处理这些情况。但实际情况 中,忽略这些坏的记录只是权宜之计。取决于正在执行的分析,如果只有一小部分 记录受影响,那么忽略它们不会显著影响结果。然而,如果一个任务由于遇到一个 坏的记录而发生问题一通过抛出一个运行时异常——任务就会失败。失败的任务 将被重新运行(因为失败可能是由硬件故障或任务可控范围之外的一些原因造成 的),但如果一个任务失败4次,那么整个作业会被标记为失败。如果数据是导致任务抛出异常的“元凶”,那么重新运行任 务将无济于事,因为它每次都会因相同的原因而失败。
处理坏记录的最佳位置在于mapper和reducer代码。我们可以检测出坏记录并忽 略它,或通过抛出一个异常来中止作业运行。还可以使用计数器来计算作业中总的 坏记录数,看问题影响的范围有多广。
极少数情况是不能处理的,例如软件缺陷(bug)存在于第三方的库中,我们无法在mapper或reducer中修改它。在这些情况下,可以使用Hadoop的skipping mode选 项来自动跳过坏记录。
启用skipping mode后,任务将正在处理的记录报告给tasktracker。任务失败时, tasktracker重新运行该任务,跳过导致任务失败的记录。由于额外的网络流量和记 录错误以维护失败记录范围,所以只有在任务失败两次后才会启用skipping mode
因此对于一个一直在某条坏记录上失败的任务,tasktracker将运行以下taskattempt 得到相应的结果。
(1)任务失败。
(2)任务失败。
(3)开启skipping mode。任务失败,但是失败记录由tasktracker保存。
(4)仍然启用skipping mode。任务继续运行,但跳过上一次尝试中失败的坏记录。
在默认情况下,skipping mode是关闭的,我们用SkipBadRedcord类单独为map 和reduce任务启用此功能。值得注意的是,每次task attempt,skipping mode都只能检测出一个坏记录,因此这种机制仅适用于检测个别坏记录(也就是说,每个任只有少数几个坏记录)。为了给skipping mode足够多尝试次数来检测并跳过一个 输入分片中的所有坏记录,需要增加最多task attempt次数(通过 mapred.map.max.attemps 和 mapred.reduce.max.attemps 进行设置)。
Hadoop检测出来的坏记录以序列文件的形式保存在_logs/skip子目录下的作业输出 目录中。在作业完成后,可查看这些记录(例如,使用hadoop fs-text幻进行诊断。
任务执行环境
Hadoop为map任务或reduce任务提供运行环境相关信息。例如,map任务可以知 道它处理的文件的名称,map任务 或reduce任务可以得知任务的尝试次数。表6-5中的属性可以从作业的配置信息 中获得,通过为mapper或reducer提供一个configure()方法实现(其中,配置信 息作为参数进行传递),便可获得这一信息。
表6-5任务执行环境的属性
属性名称 | 类型 | 示例 |
---|---|---|
mapred.job.id | string | job_200811201130 0004 |
mapred.tip.id | string | task 200811201130_ 0004_m_000003 |
mapred.task.id | string | attempt_2008112011300004_m_000003:0 |
mapred.task.partition | int | 3 |
mapred.task.is.map | boolean | true |
Streaming环境变量
Hadoop设置作业配置参数作为Streaming程序的环境变量。但它用下划线来代替 非字母数字的符号,以确保名称的合法性。下面这个Python Streaming脚本解释了 如何用Python Streaming脚本来检索mapred.job.id属性的值。
os.environ["mapred_job_id"]
也可以应用Streaming 启动程序的-cmdenv选项,来设置MapReduce所启动的 Streaming进程的环境变量(一次设置一个变量)。比如,下面的语句设置了 MAGIC_PARAMETER 环境变量:
-cmdenv MAGIC_PARAMETER=abracadabra
任务附属文件
对于map和reduce任务的输出,常用的写方式是通过OutputCollector来收集键/值对。有一些应用需要比单个键/值对模式更灵活的方式,因此这些应用程序直 接将map或reduce任务的输岀文件写到分布式文件系统,如HDFS。
值得注意的是,要确保同一个任务的多个实例不会向同一个文件进行写操作。这需 要避免两个问题:如果任务失败并被重试,那么在第二个任务运行时原来的部分输 出依旧是存在的,所以应先删除原来的文件。第二个问题是,在启用推测执行的情 况下,同一任务的两个实例会同时写一个文件。
对于常规任务输出的这个问题,Hadoop的解决办法是,将输出写到这一次任务尝试特定的临时文件夹。这个目录是{mapred.output.dir}/_temporary/${mapred.task.id}。
一旦任务成功完成,该目录的内容就被复制到作业的输出目录 ${mapred.output.dir}。因此,如果一个任务失败并被重试,第一个任务尝试的部分 输出就会被清除。这个任务和该任务的推测实例(位于不同的工作目录,并且只有 第一个完成的任务才会把其工作目录中的内容传到输出目录,其他的都被丢弃。
Hadoop也为应用程序开发人员提供了使用这个特征的机制。一个任务可以通过从 其配置文件中检索mapred.work.output.dir属性的值,来找到它的工作目录。 另一种方法是,MapReduce程序使用Java API调用FileOutputFormat的getWorkOutputPath()静态方法以得到表示工作目录的Path对象。该系统框架 会在执行任务之前创建工作目录,所以用户不必自己动手新建。
举一个简单的例子,假设有一个程序用来转换图像文件的格式。一种实现方法是用 一个只有map任务的作业,其中每个map指定一组要转换的图像。如果 map 任务把转换后的图像写到它的工作目录,那么在任务成功完成之后,这些图像会被 传到输出目录。