Hadoop教程

剖析MapReduce作业运行机制

可以只用一行代码来运行一个MapReduce作业:JobClient.runJob(conf)。这 个简短的代码,幕后隐藏着大量的处理细节。本小节将揭示Hadoop运行作业时所 采取的措施。

整个过程如图6-1所示。包含如下4个独立的实体。

  • 客户端:提交MapReduce作业。

  • jobtracker:协调作业的运行。jobtracker是一个Java应用程序,它的主类是 JobTracker。

  • tasktracker:运行作业划分后的任务。tasktracker是Java应用程序,它的主类 是 TaskTracker。

  • 分布式文件系统(一般为HDFS,参见第3章),用来在其他实体间共享作业 文件。

  • 作业的提交

    Jobclient的runJob()方法是用于新建JobClient实例并调用其submitJob()方法的便捷方式(见图6-1的步骤1)。提交作业后,runJob()每秒轮询作业的进 度,如果发现自上次报告后有改变,便把进度报告到控制台。作业完成后,如果成 功,就显示作业计数器。如果失败,导致作业失败的错误被记录到控制台。


  • 图6-1.

    JobClient的submitJob()方法所实现的作业提交过程如下。

  • 向Jobtracker请求一个新的作业ID(通过调用JobTracker的getNewJobId()方法获取)。参见步骤2。

  • 检查作业的输出说明。例如,如果没有指定输出目录或输出目录已经存在,作 业就不提交,错误抛回给MapReduce程序。

  • 计算作业的输入分片。如果分片无法计算,比如因为输入路径不存在,作业就 不提交,错误返回给MapReduce程序。

  • 将运行作业所需要的资源(包括作业JAR文件、配置文件和计算所得的输入分 片)复制到一个以作业ID命名的目录下jobtracker的文件系统中。作业JAR的 副本较多(由mapred.submit.replication属性控制,默认值为10),因此在 运行作业的任务时,集群中有很多个副本可供tasktracker访问。参见步骤3。

  • 告知jobtracker作业准备执行(通过调用JobTracker的submitJob()方法实现)。参见步骤4。


  • 作业的初始化

    当JobTracker接收到对其submitJob()方法的调用后,会把此调用放入一个内部队列中,交由作业调度器(job scheduler)进行调度,并对其进行初始化。初始化 包括创建一个表示正在运行作业的对象——封装任务和记录信息,以便跟踪任务的状 态和进程(步骤5)。

    为了创建任务运行列表,作业调度器首先从共享文件系统中获取JobClient已计 算好的输入分片信息(步骤6)。然后为每个分片创建一个map任务。创建的reduce 任务的数量由]obConf的mapred.reduce.task属性决定,它是用 setNumReduceTasks()方法来设置的,然后调度器创建相应数量的要运行的reduce任务。任务在此时被指定ID。


    任务的分配

    tasktracker运行一个简单的循环来定期发送“心跳”(heartbeat)给jobtracker。“心 跳”告知jobtracker, tasktracker是否还存活,同时也充当两秦之间的消息通道。 作为“心跳”的一部分,tasktracker会指明它是否已经准备好运行新的任务,如果 是,jobtracker会为它分配一个任务,并使用“心跳”的返回值与tasktracker进行 通信(步骤7)。

    在Jobtracker为tasktracker选择任务之前,Jobtracker必须先选定任务所在的作业。 本章后面将介绍各种调度算法,但是默认的 方法是简单维护一个作业优先级列表。一旦选择好作业,jobtracker就可以为该作 业选定一个任务。

    对于map任务和reduce任务,tasktracker有固定数量的任务槽。例如,一个 tasktracker可能可以同时运行两个map任务和两个reduce任务。准确数量由 tasktracker核的数量和内存大小来决定。默认调 度器在处理reduce任务槽之前,会填满空闲的map任务槽,因此,如果 tasktracker至少有一个空闲的map任务槽,jobtracker会为它选择一个 map任务, 否则选择一个reduce任务。

    为了选择一个reduce任务,jobtracker简单地从待运行的reduce任务列表中选取下 一个来执行,用不着考虑数据的本地化。然而,对于一个map任务,jobtracker会 考虑tasktracker的网络位置,并选取一个距离其输入分片文件最近的tasktracker。 在最理想的情况下,任务是数据本地化的(data-local),也就是任务运行在输入分片 所在的节点上。同样,任务也可能是机架本地化的(rack-local):任务和输入分片在 同一个机架,但不在同一节点上。一些任务既不是数据本地化的,也不是机架本地 化的,而是从与它们自身运行的不同机架上检索数据。可以通过查看作业的计数器得知每类任务的比例。


    任务的执行

    现在,tasktracker已经被分配了一个任务,下一步是运行该任务。第一步,通过从 共享文件系统把作业的JAR文件复制到tasktracker所在的文件系统,从而实现作 业的JAR文件本地化。同时,tasktracker将应用程序所需要的全部文件从分布式缓存复制到本地磁盘(步骤8)。第二步,tasktracker为任务新建一个本地工作目录,并把JAR文件中的内容解压到这个文件 夹下。第三步,tasktracker新建一个TaskRunner实例来运行该任务。

    TaskRunner启动一个新的JVM(步骤9)来运行毎个任务(步骤10),以便用户定义 的map和reduce函数的任何软件问题都不会影响到tasktracker(例如导致崩溃或挂 起等)。但在不同的任务之间重用JVM还是可能的。子进程通过umbilical接口与父进程进行通信。任务的子进程毎隔几秒便告知父进 程它的进度,直到任务完成。

    Streaming 和 Pipes

    Streaming和Pipes都运行特殊的map和reduce任务,目的是运行用户提供的可执 行程序,并与之通信(参见图6-2)。

    在Streaming中,任务使用标准输入和输出Streaming与进程(可以用任何语言编写) 进行通信。另一方面,Pipes任务则监听套接字(socket),发送其环境中的一个端口号给C++进程,如此一来,在开始时,C++进程即可建立一个与其父Java Pipes任务的持久化套接字连接(persistent socket connection)。

    在这两种情况下,在任务执行过程中,Java进程都会把输入键/值对传给外部的进 程,后者通过用户定义的map或reduce函数来执行它并把输出的键/值对传回Java 进程。从tasktracker的角度看,就像tasktracker的子进程自己在处理map或 reduce代码一样。


    进度和状态的更新

    MapReduce作业是长时间运行的批量作业,运行时间范围从数秒到数小时。这是 一个很长的时间段,所以对于用户而言,能够得知作业进展是很重要的。一个作业 和它的毎个任务都有一个状态(status),包括:作业或任务的状态(比如,运行状态,成功完成,失败状态)、map和reduce的进度、作业计数器的值、状态消息或 描述(可以由用户代码来设置)。这些状态信息在作业期间不断改变,它们是如何与 客户端通信的呢?

    任务在运行时,对其进度(progress,即任务完成百分比)保持追踪。对map任务, 任务进度是已处理输入所占的比例。对reduce任务,情况稍微有点复杂,但系统 仍然会估计已处理reduce输入的比例。整个过程分成三部分,与shuffle的三个阶 段相对应。比如,如果任务已经执行 reducer 一半的输入,那么任务的进度便是5/6。因为已经完成复制和排序阶段(每 个占1/3),并且已经完成比如^阶段的一半(1/6)。


  • 图6-2.执行的Streaming和Pipes与tasktracker及其子进程的关系

    MapReduce中进度的组成

    进度并不总是可测量的,但是无论如何,它能告诉Hadoop有个任务正在运 行。比如,写输出记录的任务也可以表示成进度,尽管它不能用总的需要写的 百分比这样的数字来表示,因为即使通过任务来产生输出,也无法知道后面的 情况。

    进度报告很重要,因为这意味着Hadoop不会让正在执行的任务失败。构成进 度的所有操作如下:

  • 读入一条输入记录(在mapper或reducer中)

  • 写入一条输出记录(在mapper或reducer中)

  • 在一个Reporter中设置状态描述(使用Reporter的setStatus()方法)

  • 增加计数器(使用Reporter的incrCounter()方法)

  • 调用Reporter的progress()任务

  • 任务也有一组计数器,负责对任务运行过程中各个事件进行计数,这些计数器要么内置于框架中,例如已写入的map输出记录 数,要么由用户自己定义。

    如果任务报告了进度,便会设置一个标志以表明状态变化将被发送到tasktracker。 有一个独立的线程毎隔三秒检查一次此标志,如果已设置,则告知tasktracker当前 任务状态。同时,tasktracker每隔五秒发送“心跳”到jobtracker(5 秒这个间隔是最小值,因为“心跳”间隔是实际上由集群的大小来决定的:对于一个更大的集 群,间隔会更长一些),并且由tasktracker运行的所有任务的状态都会在调用中被 发送至jobtreacker。计数器的发送间隔通常少于5秒,因为计数器占用的带宽相对 较高。

    jobtracker将这些更新合并起来,产生一个表明所有运行作业及其所含任务状态的 全局视图。最后,正如前面提到的,JobClient通过毎秒査询jobtracker来接收最 新状态。客户端也可以使用]obClient的86«0匕()方法来得到一个RunningJob 的实例,后者包含作业的所有状态信息。

    图6-3对方法的调用进行了图解。


    作业的完成

    当jobtracker收到作业最后一个任务已完成的通知后,便把作业的状态设置为“成 功”。然后,在JobClient的查询状态时,便知道任务已成功完成,于是JobClient打印一条消息告知用户,然后从runJob()方法返回。


  • 图6-3.状态更新在MapReduce系统中的传递流程

    如果Jobtracker有相应的设置,也会发送一个HTTP作业通知。希望收到回调指令 的客户端可以通过job.end.notification.url属性来进行这项设置。

    最后,jobtracker清空作业的工作状态,指示tasktracker也清空作业的工作状态(如 删除中间输出)。

关注微信获取最新动态