Hadoop教程

Rackspace的日志处理

Rackspace Hosting一直为企业提供管理系统,以同样的方式,Mailtrust在 2007 秋 变成Rackspace的邮件分部。Rackspace目前在几百台服务器上为100多万用户和 几千家公司提供邮件服务。

要求/问题

通过系统传输Rackspace用户的邮件产生了相当大的“文件”路径信息,它们以各 种格式的日志文件的形式存放,每天大约有150GB。聚集这些数据对系统发展规 划以及了解用户如何使用我们的系统是非常有帮助的,并且,这些记录对系统故障 排查也有好处。

假如一封邮件发送失败或用户无法登陆系统,这时非常重要的事是让我们的客服能 找到足够的问题相关信息开始调试。为了能够快速发现这些信息,我们不能把日志 文件放在产生它们的机器上或以其原始格式存放。相反,我们使用Hadoop来做大 量的日志处理工作,而其结果被Lucene索引之后用来支持客服的查询需求。

日志

数量级最大的两种日志格式是由Postfix邮件发送代理和Microsoft Exchange Server 产生的。所有通过我们系统的邮件都要在某个地方使用Postfix邮件代理服务器, 并且大部分消息都要穿越多个Postfix服务器。Exchange是必须独立的系统,但是 其中有一类profix服务器充当一个附加保护层,它们使用SMTP协议在各个环境 下的托管邮箱之间传递消息。

消息要穿越很多机器,但是每个服务器只知道邮件的目的地,然后发送邮件到下一 个负责的服务器。因此,为了给消息建立完整的历史信息,我们的日志处理系统需要拥有系统的全局视图。Hadoop给予我们的最大帮助是:随着我们的系统发展壮 大,系统日志量也随之增长。为了使我们的日志处理逻辑仍然可行,我们必须确保 它能扩展。MapReduce就是一个可以处理这种数据增长的完美系统架构。


简史

我们日志处理系统的前几个版本都基于MySQL的,但随着我们拥有越来越多的日 志机器,我们达到了一个MySQL服务器能够处理的极限。虽然该数据库模式已经 进行了适度的非规范化处理,使其能够较轻松地进行数据切片,但目前MySQL对 数据分区的支持仍然很脆弱。我们没有在MySQL上去实现自己的切片和处理方 案,而是选择使用Hadoop。


选择Hadoop

一旦选择在RDBMS(关系型数据库管理系统)上对数据进行分片存储,你就丧失了 SQL在数据集分析处理方面的很多优势。Hadoop使我们能够使用针对小型数据集 使用的同样的算法来轻松地并行处理所有数据。


收集和存储

曰志收集

产生日志的服务器分布在多个数据中心,但目前我们只有一个单独的Hadoop集 群,位于其中一个数据中心(见图16-8)。为了汇总日志数据并把它们放入集群,我 们使用syslog-ng(Unix syslog机制的替代机制)和一些简单的脚本来控制在如何Hadoop上新建文件。

图 16-8. Rackspace 的 Hadoop 数据流

在一个数据中心里,syslog-ng用于从source(源)机器传送日志数据到一组负载均衡 的collector(收集器)机器。在这些收集器上,每种类型的日志数据被汇成一个单独 的数据流,并且用gzip格式进行轻量级的压缩(图16-8步骤A)。远程收集器的数 据通过SSH通道跨数据中心传送到Hadoop集群所在的“本地收集器”(local collector)上(步骤B)。

一旦压缩的日志流到达本地收集器,数据就会被写入Hadoop(步骤C)。目前我们使 用简单的Python脚本把输入数据缓存到本地硬盘,并且定期使用Hadoop命令行界 面把数据放入Hadoop集群。当缓存日志数据量达到Hadoop数据块大小的倍数或 是缓存已经经过了足够长的时间时,脚本程序开始复制日志缓存数据到Hadoop的 各个输入文件夹。

这种从不同数据中心安全地汇总日志数据的方法在Hadoop支持SOCKS之前就已 经有人开发使用了,SOCKS 是通过 hadoop.rpc.socket.factory.class.default 参数和SocksSocketFactory类实现的。通过直接使用远程收集器对SOCKS的支 持和HDFS(分布式Hadoop文件系统)的API(应用程序编程接口),我们能够从系统 中消除一个磁盘的写入操作和降低系统的复杂性。我们计划在将来的开发中实现一 个使用这些特性的替代品。

一且原始日志被存放到Hadoop上, 这些日志就已经准备好交给我们的MapReduce 作业处理了。

日志存储

我们的他如叩集群目前包含15个datanode(数据节点),每个节点都使用普通商用 CPU和3个500 GB的硬盘。我们对文件使用默认的复本因子3,这些文件有6个 月的存档期限,其中两个复本用于其他用途。

Hadoop的namenode(域名节点)使用的硬件和datanode相同。为了提供比较高的可 用性,我们使用两个辅助namenode和一个虚拟IP,该IP可以很容易地指向3台 机器中具有HDFS快照的硬盘。这表明在故障转移情形下,根据辅助namenode的 快照时间,我们可能会丢失最多30分钟的数据。虽然这对于我们的日志处理应用 来说是可接受的,但是其他Hadoop应用可能要求通过为namenode镜像提供共享 存储的能力来实现无损的故障转移。


日志的MapReduce模型

处理

在分布式系统中,唯一标识符令人失望的是它们极少是真正唯一的。所有的电子邮 件消息都拥有一个(所谓的)唯一标识符,叫message-id,它由消息发起的主机产 生,但是一个不良客户端能够轻松发送重复消息副本。另外,因为Postfix设计者 并不相信message-id可以唯一地标识消息,所以他们不得不提出设计一个独立的ID(标识)叫queue-id,在本地机器的生命周期内唯一。

尽管message-id趋向于成为消息的权威标识,但在Postfix日志中,需要使用queue- id来查找message-id。看例16-1第二行(为了适合页面大小,日志行的格式 做了调整),你将发现十六进制字符串lDBD21B48AE,它就是该行消息的queue- id。因为日志收集的时候(可能每隔几小时进行一次),每个消息(包括它的message-id)的信息都输出到单独的行,所以让我们的解析代码保留消息状态是必要的。

例16-1. Postfix日志行

Nov 12 17:36:54 gate8.gate.sat.mlsrvr.com postfix/smtpd[2552]: connect from hostname Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/qmgr[9489]: lDBD21B48AE: from=
 
  , size=5950, nrcpt=l (queue active)Nov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtpd[28085]: disconnect from hostnameNov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: too many errors after DATA from hostnameNov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/smtpd[22593]: disconnect from hostnameNov 12 17:36:54 gatel0.gate.sat.mlsrvr.com postfix/smtpd[10311]: connect from hostnameNov 12 17:36:54 relay2.relay.sat.mlsrvr.com postfix/smtp[28107]: D42001B48B5: to=
  
   , relay=hostname[ip], delay=0.32, delays=0.28/0/0/0.04, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as lDBD21B48AE)Nov 12 17:36:54 gate20.gate.sat.mlsrvr.com postfix/smtpd[27168]: disconnect from hostnameNov 12 17:36:54 gate5.gate.sat.mlsrvr.com postfix/qmgr[1209]: 645965A0224: removed Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/smtp[15928]: 732196384ED: to=
   
    , relay=hostname[ip], conn_use=2, delay=0.69, delays=0.04/ 0.44/0.04/0.17, dsn=2.0.0, status=sent (250 2.0.0 Ok: queued as 02E1544C005) Nov 12 17:36:54 gate2.gate.sat.mlsrvr.com postfix/qmgr[13764]: 732196384ED: removed Nov 12 17:36:54 gatel.gate.sat.mlsrvr.com postfix/smtpd[26394]: NOQUEUE: reject: RCP T from hostname 554 5.7.1 
    
     : Client host rejected: The sender's mail server is blocked; from=
     
       to=
      
        proto=ESMTP helo=
       
      
     
    
   
  
 

从MapReduce的角度看,日志的每一行是一个单独的键/值。第一步,我们需要把 所有的行和一个单独的queue-id键联系起来,然后执行reduce过程判断日志消息 值数据是否能表明这个queue-id对应的数据是完整的。

类似地,一旦我们拥有一个消息完整的queue-id,在第二步,我们需要根据 message-id对消息进行分组。我们把每个完整的queue-id和message-id 对应(Map) 起来,让它们作为键(key),而它对应的日志行作为值(value)。在Reduce阶段,我 们判断针对某个message-id的所有的queue-id是否都表明消息已经离开我们的系统。

邮件日志的MapReduce作业的两阶段处理和它们的InputFormat与OutputFormat形成了一种“分阶段事件驱动架构”(stagedevent-drivenarchilecture,SEDA)应用类型。在SEDA里,一个应用被分解为若干个“阶段”,“阶段”通过 数据队列区分。在Hadoop环境下,队列可能是MapReduce作业使用的Hdfs中的 一个输入文件夹或MapReduce作业在Map和Reduce处理步骤之间形成的隐性的 数据队列。

在图16-9中,各个阶段之间的箭头代表数据队列,虚线箭头表示隐性的 MapReduce数据队列。每个阶段都能通过这些队列发送键值对(SEDA称之为事件 或消息)给其他处理阶段。

图16-9. MapReduce链

阶段1: Map在我们的邮件日志处理作业的第一阶段,Map阶段的输入或是以行 号为键、以对应的日志消息为值的数据,或是以queue-id为键、以对应的日志消 息数组作为值的数据。当我们处理来自输入文件数据队列的源日志文件的时候,产 生第一种类型的输入,而第二种类型是一种中间格式,它用来表示一个我们已经 试图处理但因为queue-id不完整而重新进行数据排队的queue-id的状态信息。

为了能处理这两种格式的输入,我们实现了 Hadoop的lnputFormat类,它根据 FileSplit输入文件的扩展名把工作委托给底层的SequenceFileRecordReader 类或LineRecordReader类处理。这两种输入格式的文件来自HDFS中不同的输 入文件夹(数据队列)。

阶段1: Reduce在这一阶段,Reduce根据queue-id是否拥有足够的日志行来判 定它是否完整^假如queue-id已经完整,便输出以message-id作为键、以 HopWritable对象为值的数据对。否则,queue-id被设置为键,日志行数组重新列 队并和下一组原始日志进行Map处理。这个过程将持续到queue-id已经完整或操 作超时。

通过实现OutputFormat类完成输出不同的结果,这一过程对应于我们的 两个lnputFormat对象输入格式。在Hadoop API在版本r0.17.0添加 MultipleSequenceFileOutputFormat 类之前,我们已经实现 MultipleSequenceFileOutputFormat类,它们实现同样的目标:我们需要 Reduce作业的输出对根据其键的特点存储到不同的文件。

阶段2: Map在邮件日志处理作业的第二个步骤,输入是从上个阶段得到的数 据,它是以message-id1为键、以HopWritable类对象数据为值的数据对。这一步 骤并不包含任何逻辑处理:而是使用标准的SequenceFilelnputFormat类和 IdentityMapper类简单地合并来自第一阶段的输入数据。

对于输出,我们又一次使用MultiSequenceFileOutputFormat类对象。如果 reducer判定对于某个message-id的所有queue-id能够创建一条完整的消息路径, 消息就会被序列化,并排队等候SolrOutputFormat类的处理。否则,消息的 HopWritable对象会被列入阶段2: Map阶段,然后使用下一批queue-id等待重 新处理。

合并相近词搜索

在一系列的MapReduce阶段完成之后,一系列不同计算机会得知新的索引的信 息,进而可以进行索引合并。这些搜索节点它们还运行Apache Tomcat和Solr来 托管已经完成的索引信息,这些捜索节点不仅具有把索引合并置于本地磁盘的服务 (见图16.8步骤D),它们还运行Apache Tomeate和Solr来托管已完成的索引 信息。

来自SolrOutputFormat类的每个压缩文件都是一个完整的Lucene索引,Lucene 提供IndexWriter.addIndexes()方法支持快速合并多个索引。我们的 MergeAgent服务把每个新索引解压到Lucene RAMDirectory或FSDirectory(根 据文件的大小),把它们合并到本地硬盘,然后发送一个<commit/>请求给Solr实 例,后者负责提供索引服务并使更新后的索引能够用于查询处理。

切片Query/Management(查询/管理)API是一个PHP代码层,它主要是处理输出 索引在所有搜索节点上的“切片”(Sharding)。我们使用一个简单的“一致性哈 希” 来判定搜索节点和索引文件之间的对应关系。目前,索引 首先按照创建时间切片,然后再根据其文件名的哈希值切片,但是我们计划将来用 对发送地址的哈希值来取代对文件名的哈希值(见阶段2: Reduce)。

因为HDFS已经处理了 Lucene索引的复制问题,所以没有必要在Solr实例中保留 多个副本。相反,在故障转移时,相应的搜索节点会被完全删除,然后由其他节点 负责合并索引。

搜索结果使用这个系统,从产生日志到获得捜索结果供客服团队使用,我们获得 了 15分钟的周转时间。

我们的捜索API支持Lucene的全部查询语法,因此我们常常可以看到下面这样的 复杂查询:

sender:"mapreduce@rackspace.com" - recipient:"hadoop@rackspace.com"recipient:"@rackspace.com" short-status:deferred timestamp:[1228140900 TO 2145916799]

查询返回的每个结果都是一个完整的序列化消息路径,它表明了各个服务器和接收 者收是否收到了这个消息。现在我们把这个路径用一个20图展示出来(图16-10), 用户可以通过扩展自己感兴趣的节点来和这个图互动,但是在这个数据的可视化方 面还有很多需要改进的地方。

图16-10.数据树 为分析进行存档

除了为客服提供简短词语的搜索功能之外,我们也对日志数据的分析感兴趣。毎晚,我们运行一系列的MapReduce作业,它们的输入是白天产生的索引数据。 我们实现了 SolrInputFormat类对象,它可以拖回并解压索引,然后用对的形式 输出毎个文档。使用这种lnputFormat类,我们可以遍历一天产生的所有消息路 径,可以回答我们邮件系统的几乎任何问题,包括:

毎个域的数据(病毒程序,垃圾邮件,连接状况,收件人)
最有效的垃圾邮件规则
特定用户产生的负载
消息量反弹的原因
连接的地理分布信息
特定机器之间的平均时间延迟

因为在Hadoop上,我们拥有好几个月的压缩索引信息,所以还能够回顾性地回答 夜间日志概要工作忽略的问题。例如,我们近期想确定每个月消息发送量最大的IP地址,这个任务我们可以通过一个简单的一次性MapReduce作业来完成。

关注微信获取最新动态