Hadoop教程

MapReduce的工作流

至此,你已经知道MapReduce应用开发的机制了。我们目前还未考虑如何将数据 处理问题转化成MapReduce模型。

本书前面的数据处理都用来解决十分简单的问题(如在指定年份找到最高气温值的 记录)。如果处理过程更复杂,这种复杂度一般是因为有更多的MapReduce作业, 而不是更复杂的map和reduce函数。换而言之,通常是增加更多的作业,而不是 增加作业的复杂度。

对于更复杂的问题,可考虑使用比MapReduce更高级的语言,如 Pig、hive或 Cascading。一个直接的好处是:有了它之后,就用不着处理到MapReduce作业的 转换,而是集中精力分析正在执行的任务。

最后,Jimmy Lin 和 Chris Dyer 合著的 "Data-Intensive Text Processing with MapReduce"一书是学习MapReduce算法设计的优秀资源,强烈推荐。该书由 Morgan & Claypool 出版社于 2010 出版,网址为 http:〃mapreduce.me/。

将问题分解成MapReduce作业

让我们看一个更复杂的问题,我们想把它转换成MapReduce工作流。

假设我们想找到每个气象台每年每天的最高气温记录的均值。例如,要计算 029070-99999气象台的1月1日的每日最髙气温的均值,我们将从这个气象台的 1901年1月1日,1902年1月1日,直到2000年的1月1日的气温中找出每日最 高气温的均值。

我们如何使用MapReduce来计算它呢?计算自然分解为下面两个阶段。

(1)计算每对station-date的每日最高气温。
本例中的MapReduce程序是最髙气温程序的一个变种,不同之处在于本例中的键是一个综合的station-date,而不只是年份。

(2)计算每个station-day-month键的每日最高气温的均值。
mapper从上一个作业得到输出记录(station-date,最高气温值),丢掉年份部分,将其值投影到记录(station-day-month,最髙气温值)。然后reducer为每个 station-day-month键计算最高气温值的均值。

第一阶段的输出看上去就是我们想要的气象台的信息。(示例的 mean_max_daily_temp.sh 脚本提供了 Hadoop Streaming 中的一个实现)

029070-99999 19010101 0029070-99999 19020101 -94...

前两个字段形成键,最后一列是指定气象台和日期所有记录中的最高气温。第二阶 段计算这些年份中每日最高气温的平均值:

029070-99999 0101 -68

以上是气象台029070-99999在整个世纪中1月1日的日均最髙气温-6.81。

只用一个MapReduce过程就能完成这个计算,但它可能会让部分程序员花更多 精力。®设计更多(简单的)頁叩仏如£^阶段将导致更多可分解的、可维护的mapper和 reducer。第16章的案例学习包括使用MapReduce来解决的大量实际问题,在每个 例子中,数据处理任务都是使用两个或更多MapReduce作业来实现的。对于理解 如何将问题分解成MapReduce工作流,第16章所提供的详细介绍非常有价值。

相对于我们已经做的,mapper和reducer完全可以进一步分解。mapper 一般执行 输入格式解析、投影(选择相关的字段)和过滤(去掉无关记录)。在前面的mapper 中,我们在一个mapper中实现了所有这些函数。然而,还可以将这些函数分割到 不同的mapper,然后使用Hadoop自带的ChainMapper类库将它们连接成一个 mapper结合使用ChainReducer,你可以在一个MapReduce作业中运行一系列 的mapper,再运行一个reducer和另一个mapper链。


运行独立的作业

当MapReduce工作流中的作业不止一个时,问题随之而来:如何管理这些作业按 顺序执行?有几种方法,其中主要考虑的是:是否有一个线性的作业链或一个更复 杂的作业有向无环图(directed acyclic graph, DAG)。

对于线性链表,最简单的方法是一个接一个地运行作业,等前一个作业运行结束后 再运行下一个:

DobClient.runJob(conf1);DobClient.runJob(conf2);

如果一个作业失败,runJob()方法就抛出一个IOException,这样一来,管道中 后面的作业就无法执行。根据具体的应用程序,你可能想捕获异常,并清除前一个 作业输出的中间数据。

对于比线性链表更复杂的结构,有相关的类库可以帮助你合理安排工作流。它们也 适用于线性链表或一次性作业。最简单的是org.apache.hadoop.mapred. jobcontrol包中的]obControl类。]obControl的实例表示一个作业的运行 图,你可以加入作业配置,然后告知]0%0#「01实例作业之间的依赖关系。在一 个线程中运行]0%0#「01时,它将按照依赖顺序来执行这些作业。也可以査看进 程,在作业结束后,可以查询作业的所有状态和每个失败相关的错误信息。如果一 个作业失败,JobControl将不执行与之有依赖关系的后续作业。

Oozie

不同于在客户端运行并提交作业的JobControl, Ooize{http://yahoo.github.com/oozie)作为服务器运行,客户端提交一个工作流到服务器。在Ooize中,工作流是 一个由动作化出0幻节点和控制流节点组成的DAG(有向无环图)。动作节点运行MapReduce作业或Pig作业来执行工作流任务,就像HDFS的移动文件操作。控制 流节点通过构建条件逻辑(不同执行分支的执行依赖于前一个动作节点的输出结果) 或并行执行来管理活动之间的工作流执行情况。当工作流结束时,Oozie通过发送一个HTTP的回调向客户端通知工作流的状态。还可以在毎次进入^作流或退出一个动作节点时接收到回调。

Oozie允许失败的工作流从任意点重新运行。这对于处理工作流中由于前一个耗时 活动而出现瞬态错误的情况非常有用。

关注微信获取最新动态