Hadoop教程

关于Cascading

Cascading是一个开源的Java库和应用程序编程接口(API),它为MapReduce提供 了一个抽象层。它允许开发者构建出能在Hadoop集群上运行的复杂的、关键任务 的数据处理应用。

map和reduce操作提供了强大的原语操作。然而,在创建复杂的、可以被不同开 发者共享的合成性髙的代码时,它们粒度级别似乎不合适。再者,许多开发者发现 当他们面对实际问题的时候,很难用MapReduce的模式来思考问题。

为了解决第一个问题,Cascading用简单字段名和一个数据元组模型值来替代 MapReduce使用的键和值,而该模型的元组是由值的列表构成的。对第二个问 题,Cascading直接从Map和Reduce操作分离出来,引入了更高层次的抽象: Function, Filter, AggregatorfpBuffer。

其他一些可选择的方案在该项目初始版本公开发布的同时基本上也出现了,但 Cascading的设计初衷是对它们进行补充和完善。主要是考虑到大部分可选的架构 都是对系统强加一些前置和后置条件或有其他方面的要求而已。

例如,在其他几种MapReduce工具里,运行应用程序之前,你必须对数据进行预 格式化处理、过滤或把数据导入HDFS(Hadoop分布式文件系统)。数据准备步骤必 须在系统的程序设计抽象之外完成。相反,Cascading提供方法实现把数据准备和 管理作为系统程序设计抽象的组成部分。

该实例研究将首先介绍Cascading的主要概念,最后概括介绍ShareThis如何在自 己的基础框架上使用Cascading。

如果希望进一步了解Cascading处理模型,请参见项目主页上的“Cascading用户 手册”。

字段、元组和管道

MapReduce模型使用键和值的形式把输入数据和Map函数,Map函数和Reduce函 数以及Reduce函数和输出数据联系起来。

但据我们所知,实际的Hadoop应用程序通常会将多个MapReduce作业链在一起。 看一下用MapReduce模型实现的一个典型的字数统计例子。如果需要根据统计出 来的数值进行降序排列,这是一个可能的要求,它将需要启动另一个MapReduce 作业来进行这项工作。

因此,理论上来说,键和值的模式不仅把Map和Reduce绑定到一起,它也把 Reduce和下一次的Map绑定了,这样一直进行下去(图16-11)。即键/值对源自输 入文件,流过Map和RedUCe操作形成的链,并且最后终止到一个输出文件。实现 足够多这样链接的MapReduce应用程序,便能看出一系列定义良好的键/值操作, 它们被一遍一遍地用来修改键/值数据流的内容。

图16-11.基于MapReduce的计数和排序

Cascading系统通过使用具有相应字段名的元组(与关系型数据库中的表名和列名类 似)来替代键/值模式的方法简化了这一处理流程。在处理过程中,由这些字段和元 组组成的流数据在它们通过用户定义的、由管道(pipe)链接在一起的操作时得以处 理(图 16-12)。

因此,MapReduce的键和值被简化成如下形式。

字段是一个String(字符串)类型的名称集合(如“first_name”)、表示位置信 息的数值(如2和-1分别是第三和最后一个位置)或是两者混合使用的集合, 与列名非常像。因此字段用来声明元组里值的名称和通过名称在元组中选出对 应的值。后者就像执行SQL的select语句。

图16-12.由字段和元组链接的管道 元组

元组就是由java.lang.Comparable类对象组成的数组。元组与数据库中的 行或记录类似。

Map和Reduce操作都被抽象隐藏到一个或多个管道实例之后(图16-13)。

Each管道一次只处理一个单独的输入元组。它可以对输入元组执行一个 Function或一个Filter操作(后文马上要介绍)。

GroupBy管道在分组字段上对元组进行分组。该操作类似于SQL的group by语句。如果元组的字段名相同,它也能把多个输入元组数据流合并成一个 元组数据流。

CoGroup管道既可以实现元组在相同的字段名上连接,也可以实现基于相同 字段的分组。所有的标准连接类型(内连接——inner join,外连接——outer join等) 以及自定义连接都可以用于两个或多个元组数据流。

图16-13.管道类型

Every管道每次只处理元组的一个单独分组的数据,分组数据可以由GroupBy 或CoGroup管道产生。Every管道可以对分组数据应用Aggregator或 Buffer 操作。

SubAssembly管道允许在一个单独的管道内部进行循环嵌套流水线处理,或 反过来,一个管道也可以被嵌入更加复杂的流水线处理中。

所有这些管道被开发者链接在一起形成“管道流水线处理流程”,这里每个流水线 可以有很多输入元组流(源数据,source)和很多输出元组流(目标数据,sink)(见图 16-14)。

图16-14.简单的管道流水线

从表面上看来,这可能比传统的MapReduce模型更复杂。并且,不可否认,相较 于Map, Reduce,Key和Value,这里涉及的概念更多。但实际上,我们引入了更 多的概念,它们必须都工作协助提供不同的功能。

例如,如果一个开发者想对reducer的输出值提供“辅助排序”功能,她将需要实 现Map、Reduce , 一个“合成” Key(嵌套在父Key中的两个Key),值, partitioner、一个用于“输出值分组”的comparator和一个“输出键”的 comparator,所有这些概念以各种方式结合协作使用,并且在后续的应用中几乎不 可重用。

在Cascading里,这项工作只对应一行代码:new GroupBy(<previous>,<groupingfields>, <secondary sorting fields>),其中 previous 是数据源管道。


操作

如前所述,Cascading通过引入一些替换性操作脱离了MapReduce模式,这些操作 或应用于单个元组,或应用于元组分组(图16-15)。

Function作用于单个的输入元组,对每个输入,它可能返回0或多个输出元组。 Function操作供 Each类型的管道使用。

图16-15.操作类型

Filter是一种特殊的函数,它的返回值是boolean(布尔)值,用于指示是否 把当前的元组从元组流中删除。虽然定义一个函数也能实现这一目的,但是 Filter是为实现这一目的而优化过的操作,并且很多过滤器能够通过逻辑运 算符(如And,Or,Xor和Not)分组,可以快速创建更复杂的过滤操作。

Aggregator对一组元组执行某种操作,这些分组元组是通过一组共同字段分 组得到的。比如,字段“last-name”值相同的元组。

Buffer和Aggregator操作类似,不同的是,它被优化用来充当一个“滑动 窗口”扫描一个唯一分组中所有的元组。当开发者需要有效地为一组排序的元 组插入遗漏的值时,或计算动态均值的时候,这个操作非常有用。通常,处理 元组分组数据的时候,Aggregator也是一个可选的操作,因为很多 Aggregator能够有效地链接起来工作,但有时,Buffer才是处理这种作业的 最佳工具。

管道流水线创建的时候,这些操作便绑定到各管道(图16-16)。

Each和Every类型的管道提供了一种简单的元组选择机制,它们可以选择一些或 所有的输入元组,然后把这些选择的数据传送给它的子操作。并且我们有一个简单 的机制把这些操作的结果和原来的输入元组进行合并,然后产生输出元组。这里并 不详细说明机制,它使得每个操作只关心参数指定的元组值和字段,而不是当前输 入元组的整个字段集。其次,操作在不同应用程序之间重用,这点和Jave方法重 用的方式相同。

图16-16.操作流程


Tap类、Scheme对象和Flow对象

在前面的几个图中,我们多次提到源数据(source)和目标数据(sink)。在Cascading 系统中,所有的数据都是读自或写入Tab类实例,但是它们是通过Scheme对象被 转换成或取自元组实例对象。

Tap

Tap类负责如何访问数据以及从哪个位置访问数据。例如,判断数据是存于HDFS还是存于本地?在Amazon S3中,还是跨出HTTP协议进行访问?

Scheme

Scheme类负责读取原始数据并把它们转换成元组格式/或把元组数据写入原始 数据格式文件,这里的原始数据可以是文本行、Hadoop 二进制的顺序文件或 是一些专用格式数据。

注意,Tap类对象不是管道处理流程的一部分,因此它们不是pipe类型。

但是当Tap对象在集群上变得可执行的时候,它们就和管道组件关联到一起。当一个管道处理流程与必要的几个源和目标数据Tap实例关联一起后,我们就得到一个Flow对象。Flow对象是在管道处理流程与指定数量的源及目标数据Tap关 联时创建的,而Tap对象的功能是输出或获取管道流程期望的字段名。就是说, 如果Tap对象输出一个具有字段名“line”的元组(通过读取HDFS上的文件数 据),那么这个管道流程头部必须也希望字段名是“line”。否则,连接管道处理流 程和Tap的处理程序会立刻失败并报错。

因此,管道处理流程实际上就是数据处理定义,并且它们本身不是“可执行”的。 在它们可以在集群上运行之前,必须连接到源和目标Tap对象。这种把Tap和管 道处理流程分开处理的特性使Cascading系统非常强大。

如果认为管道处理流程和Java类相似,那么Flow就像Java对象实例(图16-17)。 也就是说,在同一个应用程序里面,同样的管道处理流程可以被实例化很多次从而 形成新的Flow,不用担心它们之间会有任何干扰。如此一来,管道处理流程就可 以像标准Java库一样创建和共享。

图16-17.流水线处理过程


Cascading 实战

现在我们知道Cascading是什么,清楚地了解它是如何工作的,伹是用Cascading 写的应用程序是什么样子呢?我们来看看例16-2。

例16-2.字数统计和排序

Scheme sourceScheme =new TextLine(new Fields("line"));  Tap source =new Hfs(sounceSchemG, inputPath); Scheme sinkScheme = new TextLine(); Tap sink =new Hfs(sinkScheme, outputPath) SinkMode.REPLACE);  Pipe assembly = new Pipe("wondcount"); String negexStning = "(?<!\\pL)(?=\\pL)[^ ]*(?<=\\pL)(?!\\pL)";Function negex = new RegexGenerator(new Fields("wond"), negexStning); assembly =new Each(assembly, new Fields("line"), negex);  assembly =new GnoupBy(assembly, new Fields("wond")); Aggregator count = new Count(new Fields("count")); assembly = new Eveny(assembly, count); assembly =new GroupBy(assembly) new Fields("count"), new Fields("wond")); FlowConnector flowConnector = new FlowConnector();Flow flow =flowConnector.connect("word-count",source, sink, assembly);  flow.complete();

在这个例子里,我们统计输入文件中的不同单词的数量,并根据它们的自然序(升 序)进行排序。假如有些词的统计值相同,这些词就根据它们的自然顺序(字母序) 排序。

这个例子有一个明显的问题,即有些词可能会有大写字母;例如,“the”和 “The”,当它出现在句首的时候就是“The”。因此我们可以插入一个新的操作 来强制所有单词都转换为小写形式,但是我们意识到那些需要从文档中解析词语的 所有将来的应用都必须做同样的操作,因此我们决定创建一个可重用的管道 SubAssembly,如同我们在传统应用程序中创建一个子程序一样(参见例16-3)。

例 16-3.创建一个 SubAssembly

public class ParseWordsAssembly extends SubAssembly   {public ParseWordsAssembly(Pipe previous)  {String regexString = "(?<!\\pL)(? = \\pL)[^ ]*(?< = \\pL)(?!\\pL)"; Function regex = new RegexGenerator(new Fields("word"), regexString); previous = new Each(previous, new Fields("line"), regex);String exprString = "word.toLowerCase()";Function expression =new ExpressionFunction(new Fields("word"), exprString,String.class); previous = new Each(previouS, new Fields("word"),'expression); setTails(previous); 

首先,我们新建一个SubAssembly类,它管理我们的“解析词”管道组件。因为 这是一个Java类,所以可用于其他任何应用程序,当然这要求它们处理的数据中 有word字段(例16-4)。注意,也有办法可以使这个函数更加通用,这些方法在 “Cascading用户手册”中都有介绍。

例16-4.用一个SubAssembly扩展单词计数和排序

Scheme sourceScheme = new TextLine(new Fields("line"));Tap source = new Hfs(sourceScheme, inputPath);Scheme sinkScheme = new TextLine(new Fields("word", "count"));Tap sink = new Hfs(sinkScheme, outputPath, SinkMode.REPLACE);Pipe assembly = new Pipe("wordcount");assembly =new' ParseWordsAssembly(assembly); assembly = new GroupBy(assembly, new Fields("word"));Aggregator count = new Count(new Fields("count")); assembly = new Every(assembly, count);assembly = new GroupBy(assembly, new Fields("count"), new Fields("word"));FlowConnector flowConnector = new FlowConnector();Flow flow = flowConnector.connect(*'word-count", source, sink, assembly);flow.complete();

灵活性

后退一步,让我们来看看这个新的模型给我们带来了什么好处,或更妙的是,消除 了哪些不足。

可以看出,我们不必再用MapReduce作业模式来考虑问题,或考虑Mapper和Reducer接口的实现问题,后续的MapReduce作业和前面的MapReduce作业如何 绑定或链接。在运行的时候,Cascading “规划器”(planner)会算出最优的方法把管 道处理流程切分成MapReduCe作业,并管理作业之间的链接(图16-18)。

图16-18.怎么把Flow翻译成链式MapReduce作业

因此,开发者可以以任何粒度来构造自己的应用程序。它们可以一开始就只是一个 很小的做日志文件过滤处理的应用程序,但是后来可以根据需要不断增添新的 功能。

Cascading是一个API而不是类似SQL的字符串句法,因此它更灵活。首先,开发 者能用他们熟悉的语言创建特定领域语言(domain-specific language, DSL),像 Groovy,JRuby,Jython , ScaIa等(示例参见项目网站)。其次,开发者能对 Cascading不同的部分进行扩展,像允许自定义Thrift或JSON对象使其能读写, 并且允许它们以元组数据流的形式传送。


Hadoop 和 Cascading 在 ShareThis 的应用

ShareThis是一个方便用户共享在线内容的共享网络。通过单击网页上或浏览器插 件上的一个按钮,ShareThis允许用户无缝地访问他们的任何在线联系人及在线网 络,并且允许他们通过电子邮件,IM, Facebook, Digg,手机SMS等方式共享它 们的内容,而这一过程的执行甚至不要求他们离开当前的访问网页。发布者能配置 他们的ShareThis按钮来标记服务的全球共享能力,如此推动网络流量,刺激传播 活动,追踪在线内容的共享。通过减少网页不需要的内容及提供通过社会网络、隶 属组和社区实时的内容发布功能,ShareThis还简化了社区媒体服务。

ShareThis用户通过在线窗口共享网页和信息时,一个连续的事件数据流就进入 ShareThis网络。这些事件首先要过滤和处理,然后传送给各种后台系统,包括 AsterData, Hypertable 和 Katta。

这里着重介绍“日志处理管道”(图16-19)。日志处理管道只是简单地从S3文件夹(bucket)里读取数据,进行处理(稍后介绍),然后把结果存入另一个文件夹。简单 消息队列服务(Simple Queue Service,SQS)用于协调各种事件的处理,用它来标记 数据处理执行程序的开始和完成状态。下行数据流是一些其他的处理程序,它们用 于拖动数据装载AsterData数据仓库,如从Hypertable系统获取URL列表作为网 络爬取工具的下载源,或把下载的网页推入Katta系统来创建Lucene索引。注 意,Hadoop系统是ShareThis整个架构的中心组件。它用于协调架构组件之间的数 据处理和数据移动工作。

有了 Hadoop系统作为前端处理系统,在所有事件日志文件被加载到AsterData集 群或被其他组件使用之前,它会基于一系列规则基子一系列规则对数据进行解析、 过滤、清理和组织。AsterData是一个集群化数据众库系统,它能支持大数据存 储,并允许使用标准的SQL语法发出复杂的即时查询请求。ShareThis选择 ^^如叩集群来进行数据清理和准备工作,然后它把数据加载到八31打0“3集群实现 即时分析和报告处理。尽管使用AsterData也有可能达到我们的目的,但是在处理 流程的第一阶段使用Hadoop系统来抵消主数据仓库的负载具有重要意义。

为了简化开发过程,制定不同架构组件间的数据协调规则以及为这些组件提供面向 开发者的接口,Cascading被选为主要的数据处理API。这显示出它和“传统的” Hadoop用例的差别,它们主要是用“Hadoop”来实现对存储数据的査询处理。

图16-19. ShareThis日志处理管道

相反的,Cascading和HadMp的结合使用为端到端的完整解决方案提供了一个更 好、更简单的结构,因此对用户来说更有价值。

对于开发者来说,Cascading的学习过程很简单,它从一个简单的文本解析单元测 试(通过创建Cascading.ClusterTestCase类的子类)开始,然后把这个单元程序 放入有更多规则要求的处理层,并且在整个过程中,与系统维护相关的应用逻辑组 织不变。Cascading用以下几种方法帮助保持这种逻辑组织的不变性。首先,独立 的操作(Function, Filter等)都可以进行独立写和测试。其次,应用程序被分成 不同的处理阶段:一个阶段是解析,一个阶段是根据规则要求进行处理,最后一个 阶段是封装/整理数据,所有这些处理都是通过前述的SubAssembly基础类实现的。

ShareThis的日志文件数据看起来非常像Apache日志文件,它们有日期/时间戳、 共享URL、引用页URL和一些元数据。为了让分折下行数据流使用这些数据,这 些URL必须先解压(解析查询字符串数据和域名等)《因此需要创建一个高层的 SubAssembly对象来封装解析工作,并且,如果字段解析很复杂,SubAssembly 子对象就可被嵌入来解析一些特定字段。

我们使用同样的方式来应用处理规则。当毎个Tuple对象通过SubAssembly对象 实例的时候,如果有任何规则被触发,该对象就会被标记上标签“坏”(bad)。具 有“坏”字标签的Tuple对象,会被附上被标记的原因用于后来的审查工作。

最后,创建一个切分SubAssembly对象来做两件事。第一,用于对元组数据流进 行分流处理,一个数据流针对标记“好”(good)的数据,另一个针对标记“坏”的 数据。第二件是,切分器把数据切分成片,如以小时为单位。为了实现这一动作, 只需要两个操作:第一个是根据已有数据流的timestamp(时间戳)创建区间段,第 二个是使用interval(区间)和good/bad元数据来创建目录路径(例如,“05/good/” 中“05”是早上5点,“good”是经过所有规则验证的数据)。这个路径然后被Cascading TemplateTap使用,这是一个特殊的Tap类型,它可以根据Tuple对 象值把元组数据流动态输出到不同的路径位置。

本例中,“path”值被Template Tap用来创建最终输出路径。 开发者也创建了第四个SubAssembly类型对象 它用于在单元测试时应用Cascading Assertion断言)类。这些断言用来复查规则组件和解析SubAssembly做的工作。 在例16-5的单元测试中,我们看到partitioner没有被检测,但是它被放入另外一 个这里没有展示的集成测试中了。

例16-5.Flow单元测试

public void testLogParsing() throws IOException {Hfs source = new Hfs(new TextLine(new Fields("line")), sampleData);Hfs sink =new Hfs(new TextLine(), outputPath + "/parser", SinkMode.REPLACE);Pipe pipe = new Pipe("parser");// split "line" on tabspipe = new Each(pipe, new Fields("line"), new RegexSplitter("\t"));pipe = new LogParser(pipe);pipe = new LogRules(pipe);// testing only assertionspipe = new ParserAssertions(pipe);Flow flow = new FlowConnector().connect(source, sink, pipe); flow.complete(); // run the test flow// verify there are 98 tuples, 2 fields, and matches the regex pattern // for TextLine schemes the tuples are { "offset", "line } validateLength(flow, 98, 2, Pattern.compile("^[0-9] + (\\t[^\\t]*){19}$"));}

针对集成和部署,许多Cascading内置属性都可以使该系统和外部系统更容易集 成,并进行更大规模的处理工作。

在生产环境中运行时,所有的SubAssembly对象都连接起来并规划到一个Flow 对象里,但是除了有源和目标Tap对象之外,我们也设计了trap(捕捉)Tap类型 (图16-20)。通常,当远程的Mapper或Reducer任务的操作抛出一个异常的时候, Flow对象就会失败并杀死它管理的所有MapReduce作业。当一个Flow有trap 的时候,所有的异常都会被捕捉并且造成异常的数据信息会被保存到当前这个捕捉 程序对应的了3卩对象里。然后可以在不终止当前Flow的情况下,继续处理下一个 Tuple对象。有时你想让程序在出现错误的时候就停止,但在这里,ShareThis开 发者知道在生产系统运行的时候,他们能同时回览并査看“失败”的数据,然后更新其单元测试。丢失几个小时的处理时间比丢失几个坏记录数据更糟糕。

使用Cascading的事件监听器,Amazon SQS可被集成进来。当一个Flow结束的时 候,系统就发送一条消息来通知其他系统它们已经可以从Amazon S3上获取准备 好的数据了。当Flow处理失败的时候,会有不同的消息发送,向其他的进程 报警。其余的位于不同的独立集群的下行数据流进程将在中断的日志处理管道位置处开始 处理。现在日志处理管道一天运行一次,因此没有必要让100个节点的集群闲着运 转23个小时。因此我们是毎24小时执行一次终止和启用操作。

将来,在小型的集群上根据业务需求,增加运行间歇期到毎6个小时一次或1小时 一次都是非常简单的。其他的集群系统可以独立地根据各自负责的业务需要以不同 的间隔期启用或关闭。例如,网络数据爬取组件(使用Bixo,它是EMI和 ShareThis开发的基于Cascading的网络数据爬取工具)可以在一个小型集群上与 Hypertable集群协作连续运转。这种随需应变的模型在Hadoop上运行良好,毎个 集群都能把工作负载调节到它期望处理的数量级。

图 16-20. ShareThis 日志处理 Flow


总结

对于处理和协调跨不同架构组件的数据的移动这个问题,Hadoop是一个非常强大 的平台。它唯一的缺点是它的主要计算模型是MapReduce。

Cascading的目标是(不用MapRedue模式来考虑设计方案的情况下)帮助开发者通过 使用一个逻辑定义良好的API来快速而简单地建立强大的应用程序,而同时又把 提髙数据分布、复制、分布式处理管理的性能和程序活性的工作都留给了Hadoop。

关注微信获取最新动态