Hadoop 和 Hive$ Facebook 中的应用
概要介绍
Hadoop可以用于构造核心的后台批处理以及近似实时计算的基础架构。它也可用 于保存和存档大规模数据集。在下面这个实例中,我们将主要考察后台的数据架构 以及Hadoop在其中充当的角色。我们将描述假想的Hadoop配置,使用Hive的可能性一一Hive是建立于Hadoop基础上的数据仓库和SQL体系结构的开源代码一一和使用该体系架构构建的各种各样的商业及产品应用。
Hadoop 在 Facebook 的使用
发展史
随着Facebook网站的使用量增加,网站上需要处理和存储的日志和维度数据激 增。在这种环境下对任何一种数据处理平台的一个关键性要求是它必须具有快速的 支持系统扩展的应变能力。此外,由于工程资源有限,所以系统必须是可信的,并 且易于使用和维护。
最初,Facebook使用的数据仓库都是在Oracle系统上实现的。在我们遇到可扩展 性和性能方面的问题之后,开始调查是否有开源技术能够应用到我们的环境中。这 次调查工作的一部分内容就是我们部署了一个相对小规模的Hadoop实例对象,并 且把一部分核心数据集发布到这个实例对象上。Hadoop对我们来说有相当大的吸 引力,一是因为Yahoo!内部就一直使用这一技术来完成后台数据处理需求,二是 因为我们熟知Google提出并普及使用的MapReduce模型的简单性和可扩展性。
我们最初的原型系统开发得非常成功:工程师们都喜欢它能在合理的时间范围内处 理大数量级数据的能力,这是我们以前所没有的处理能力。能用自己熟悉的编程语 言来进行数据处理工作(使用Hadoop Streaming),他们也感到非常高兴。把我们的 核心数据集发布到一个集中式数据仓库也非常方便。几乎同时,我们开始开发Hive工具。这使用户在Hadoop集群上处理数据变得更加容易,因为普通的计算需 求都能用大多数程序员和分析师们熟悉的SQL语句来表达。
因此,集群的规模和使用迅速增长,现在Facebook正在运行世界第二大Hadoop 集群系统。在写这篇文章的时候,我们在Hadoop上存放的数据超过了2PB,每天 给它加载的数据超过10 TB。我们的Hadoop系统具有2400个内核,大约9 TB的 内存,并且在一天之中的很多时间点,这些硬件设备都是满负荷运行的。根据系统 的增长情况,我们能够迅速地进行集群规模扩展,而且我们已经能够利用开放资源 的优点,通过修改Hadoop代码让它适应我们的需求。同时我们也对开放资源做出 了贡献,比如我们开发的一些Hadoop核心组件,我们提供的Hive的开放资源代 码,Hive现在是Hadoop的一个子项目。
使用情况
在Facebook,对Hadocjp至少有下面四种相互关联但又不同的用法。
在大规模数据上产生以天和小时为单位的概要信息。这些概要信息在公司内用 于各种不同的目的:
基于这些概要信息产生的报告,可供工程或非工程职能组用来制定产品决策。概要信息包含用户数、网页浏览次数和网站访问时间的增长情况
提供在Facebook上进行广告营销活动的相关的效果数据
对网站属性的后台处理,比如计算你喜欢的人和应用程序
在历史数据上运行即时作业。数据分析结果有助于产品组和执行主管解决 问题。
成为我们日志数据集的实用而长期的存档存储器。
通过特定的属性进行日志事件查询(用这些属性对日志建立索引),这可以用于 维护网站的完整性并且保护用户免受垃圾邮件程序的侵扰。
数据架构
图16-4展示了我们数据架构的基本组件以及这些组件间的数据流。
图16-4.Facebook的数据仓库架构
如图16-4所示,数据处理过程中使用了以下组件。
Hadoop配置
Hadoop部署工作的中心思想是一体性。我们使用一个单独的HDFS 系统,大量的处理工作在一个单独的MapReduce集群上完成(运行一个单独的 jobtracker)。这样做的原因很简单。
只运行一个集群可以最小化管理成本。
数据不必复制。对于前面描述的使用情况,我们可以在同一个位置得到所有的 数据。所有的部门都使用同一个计算机集群可以极大地提升效率。我们的用户工作在一个相互协作的环境下,因此对于服务质量的要求还不是很 繁重(至少就目前而言)。我们也拥有一个单独的共享的Hive元数据存储工具(用MySQL数据库),它管理 HDFS上存储的所有Hive表涉及的元数据信息。
假想的使用情况
这一节,我们将描述几个典型问题,它们在大型网站上很普遍,由于涉及的开销和 规模都太大,所以这些问题很难通过传统的数据仓库管理技术来解决。Hadoop和 Hive技术对解决这些问题提供了一种扩展性更好、更有效的方法。
广告客户的洞察力和广告性能
Hadoop最普遍的一个用途是为大量数据产生概要信息。通常用于大型广告网络, 如Facebook广告网络,Google AdSense,等等,为广告商提供他们所发布的广告 的常规汇总统计信息这样一项特有的功能,这样做可以有效地帮助广告商调整他们的广告营销活动。
在大规模数据集上计算广告效果相关数据是一种数据密集型操作,Hadoop和Hive 在可扩展性和计算开销上的优势真的有助于在合理的时间和资金消耗范围内完成这 些计算。
许多广告网络为广告商提供了标准的基于CPC和CPM的广告计费单位。CPC广 告计费是根据广告的“点击数计费”(cost-per-click):广告商根据访问这个网站的 用户对广告的点击总数付费。另一方面,CPM广告计费是根据在这个网站上看这 个广告的人的比例计费。先不管这些标准计费单位,在最近几年,具有更多动态内 容的广告支持对个体用户进行不同的内容剪辑(个性化广告定制),这样一种活动也 在网络广告业中变得普遍起来。丫ahoo!通过SmartAds来实现个性化广告定制,而Facebook给广告商提供了 Social Ads。而后者允许广告商把来自用户朋友网络的信 息嵌入到广告中,例如,一则Nike广告可能指向某用户的一位朋友,而这个朋友 近期刚好也喜欢这个品牌,并且在Facebook上和朋友公开共享这个喜好。另外,Facebook也为广告商提供了 Engagement Ad广告形式,通过对广告发表意见/嵌入 视频交互,用户可以更有效地和广告交互。总之,在线广告网络为广告商们提供了 各种广告发布途径,广告商们感兴趣的是其广告营销活动的相关效果数据,而这种 多样性又为计算各种各样的效果数据增添了难度。
首先,广告商们希望知道总共有多少用户观看或点击了他们的广告以及有多少独立 用户。对于动态广告,他们甚至会对这些汇总信息的分类感兴趣,如通过广告单元 播放的动态信息分类或通过用户对广告的参与活动分类。例如,一个特定的广告可 能向3万个不同用户播放了 10万次。类似地,一段嵌入到Engagement Ad的视频 可能已经被10万个不同的用户观看。另外,通常我们会针对每则广告、每次广告 营销活动和每个帐户汇总报告这些效果相关数据。一个帐号有可能会对应多个广告 营销活动,而每个活动可能运行多则网络广告。最后,广告网络通常会根据不同时 间间隔报告这些数据。典型的时间段有天、周、月(起始日期相同)和月(固定天 数),甚至有时候是整个广告周期。再者,在数据分片和切割的方法中,广告商们 也想査看数据的地理分布情况,比如,对于某一则特定广告,亚太地区的浏览者或 点击者占多大比率。
很明显,这里有四种主要的维度层次分类:帐户、广告营销活动和广告维度;时间 段维度,交互类型维度,用户维度。最后一个用来指明独立用户的人数,而其他三 个是描述广告的相关属性。用户维度也可用来产生浏览和点击用户的地理分布汇总 图。总而言之,所有这些信息都有利于广告商们调整广告营销活动,从而提髙他们 在广告网络上的广告效果。除了这些数据流水线具有多维特性之外,从处理的数据量以及每天数据量的增长速度来看,如果没有Hadoop这样的技术,大型广告网络 的规模扩展会非常困难。举个例子,写这篇文章的时候,Facebook为了计算广告 的效果数据,每天所处理的日志数据量大约是1 丁8数量级(非压缩数据)。2008年 1月,每天处理的日志数据量大约是30 GB,那么目前这个数据量已经增长了 30 倍。随着硬件的增加,Hadoop扩展性增强的特性是使这些数据流能以最小的任务 配置修改来适应数据的增长。通常,配置修改涉及增加数据流上进行密集型计算部 分Hadoop作业模块的reducer的个数。目前,这些计算模块中最大的部分运行400 个reducer(比2008年1月所用的50个reducer增加了 8倍)。
即时分析和产品反馈
除了产生定期报告之外,数据仓库解决方案的另一种主要应用是能够支持即时分析 和产品反馈解决方案。例如,一个典型的网站对其产品做了修改,产品经理或工程 师们通常会基于与这个新特性相关的用户交互信息和点击率来推断这个新特性的影 响。产品团队甚至希望对这个改变带来的影响做更深入的分析,这个分析有可能针 对不同区域和国家进行,例如这个改变是否使美国用户的点击率增加或印度用户的 使用减少。使用了 Hive和普通的SQL数据库之后,在Hadoop上可以完成很多类 似的分析工作。点击率的测定方法可以简单地表达成广告的曝光数和与此新特性相 关链接点击次数之间的联系。这种数据能和地理位置信息结合起来用于计算产品的 改变对不同区域用户产生的影响。因此,通过对这些数据进行聚集运算,我们可以 得到平均点击率在不同地理区域上的分布。Hive系统用几行SQL查询语句就可以 简单方便地表达所有这些工作需求(这也将相应地产生多个Hadoop作业)。如果只 需要估算,可以使用Hive本身支持的取样函数取一组样本用户数据,然后运行同 样的查询语句。其中有些分析工作需要使用自定义map和reduce脚本与Hive SQL 联合执行,这种脚本也可以轻松嵌入到Hive查询语句。
一个更加复杂的分析工作的典型例子是估算在过去一整年里每分钟登录到网站的峰 值用户数。这个工作涉及对网页浏览日志文件采样(因为人气网站的网页浏览日志 文件总数是很庞大的),根据时间对它们分组,然后运行自定义reduce脚本找出不 同时间点的新用户数。这是一个要求同时使用SQL和MapReduce来解决终端用户 问题的典型例子,很容易利用Hive来解决这样的问题。
数据分析
Hive和Hadoop可以轻松用于为数据分析应用进行训练和打分工作。这些数据分析 应用能跨度不同领域,如人气网站、生物信息公司和原油勘探公司。对于在线广告 网络产业来说,这种应用的一个典型实例是预测什么样的广告特征能使广告更容易 被用户注意。通常,训练阶段涉及确定响应度量标准和预测性的特征。在本例中, 评测广告效用的一个良好度量标准可以是点击率。广告的一些有趣的特征可能是广 告所属的垂直产业、广告内容、广告在网页中的位置等。Hive可以简便易行地收 集训练数据,然后把它们输送到数据分析引擎(通常是R程序或MapReduce应 用)。在本例中,不同的广告效果数据和属性特征可以被结构化为拓¥6的表格。用 户可以方便地对数据进行取样^程序只能处理有限数据集,因此取样是必须的), 使用Hive查询语句执行适合的聚集和连接操作然后整合成一个响应表,它包含着 决定广告效用最重要的广告特征。然而,取样会有信息损失,有些更加重要的数据 分析应用就在MapReduce框架体系之上并行实现流行的数据分析内核程序来减少 信息损失。
一旦模型训练出来,就可以部署,用于根据每天的数据进行打分评估工作。但是大 多数数据分析任务并不执行毎日评测打分工作。实际上,其中有很多数据分析任务 具有即时的性质,要求做到一次性分析,然后结果作为输入进入产品设计过程。
Hive
概述
刚开始使用Hadoop时,我们很快就倾倒于它的可扩展性和有效性。然而,我们祖 心它是否可以被广泛采用,主要是因为用Java写MapReduce程序的复杂度问题(还 有培训用户写这种程序的代价)。我们知道很多公司的工程师和分析师很了解 SQL,它是一种查询和分析数据的工具,并且我们也清楚很多人都精通几门脚本语 言,如PHP和Python。因此,我们必须开发出一种软件来解决用户精通的脚本语 言和Hadoop编程所需语言不同的问题。
很明显,我们的很多数据集是结构化的,而且能够很容易进行数据分割。这些要求 很自然地形成一个结果:我们需要一个系统,它可以把数据模型化成表格和数据 块,并且它能够提供类似SQL的查询和分析语言。另外,能把使用用户所选编程 语言编写的自定义MapReduce程序嵌入查询这一能力也非常重要。这个系统就是 Hive。Hive是一个构建于Hadoop之上的数据仓库架构,在Facebook充当着重要的工具,用于对Hadoop中存储的数据进行査询。在下面几个小节,我们将详细描 述这一系统。
数据的组织
在所有数据集中,数据的组织形式是一致的,被压缩、分区和排序之后进行存储。
压缩——几乎所有数据集都采用gzip codec存储成顺序文件。旧的数据采用bzip重新 压缩,这样可以比用gzip编码压缩更多。bzip压缩的速度比gzip慢,但是对 旧数据的访问频率要低很多,因此考虑到节省硬盘空间,这种性能损失还是很 值得的。
分区——大部分数据集是根据日期进行分区的。独立的分区块被加载到Hive系统,PX 把毎个分区块加载到HDFS的一个单独的目录下。大多数情况下,这种分区 只根据相关联的记录日志文件(scribe logfile)的时间戳进行。然而,在某些情 况下,我们扫描数据,然后基于日志条目里能找到时间戳进行数据划分。回顾 前面的介绍,我们也将根据各种特征进行数据分区(如国家和日期)。
排序——在一张表里,毎个分区块通常根据某个唯一标识(ID)进行排序(如果ID存在的 话)。这样的设计有几个主要的优点:在这样的数据集上容易执行取样查询操作;我们能基于排序数据建立索引;对具有唯一标识的数据进行聚集和连接,运算更有效。
毎日的MapReduce作业会把数据加载成long-term数据格式(和近实时的数据导入 处理不同)。
查询语言
Hive查询语言和SQL类似。它具有传统的SQL的构造,如join, group by, where, select, from从句和from从句的子查询。它尽量把SQL命令转换成一 系列的MapReduce作业。除普通的SQL从句之外,它还有一些扩展功能,如具有 在查询语句中描述自定义mapper和reducer脚本的功能,有对数据进行一次扫描 就可以把它们插入多个表、数据块、HDFS和本地文件的功能,有在样本数据而不 是全部数据集上执行查询的功能(在测试查询的时候,这个功能非常有用)。Hive metastore代存储了表的元数据信息,它提供元数据给出代编译器,从而进行SQL命 令到MapReduce作业的转换。
通过数据块修剪,map端的聚合和其他一些特色功能,编译器会尝试创建可以优化 査询运行时间的方案。
在数据流水线中使用Hive
另外,Hive提供了在SQL语句里表达数据流水线的能力,并采用简单方便的方式 合并这些数据流,这一功能能够并且已经提供了大量的所需的灵活性。这一功能对 于改进中或开发中的系统和产品尤其有用。处理数据流时所需的许多操作都是大家 非常了解的 SQL 操作,如 join, group by 和 distinct aggregation。由于 Hive能够把SQL语句转换成一系列Hadoop MapReduce作业,所以创建和维护这 些数据流水线就非常容易。在这一节,我们用一个假想的广告网络的例子,通过展 示使用出76来计算广告商所需的某些典型汇总表来说明出代这些方面的功能。例 如,假设某个在线广告网络在Hive系统里把广告信息存储在名为dim_ads的表 里,把和某个广告曝光相关的信息存储在名为impression_logs表里, impression_logs表里数据根据日期进行分区,那么在Hive系统中査询2008-12-01这天的广告曝光数(广告网络会把广告营销中的每个和总的广告曝光数定期反馈 给广告商)可以表达为如下SQL语句:
SELECT a.campaign_id, count(1), count(DISTINCT b.user_id)FROM dim_ads a ]OIN impression_logs b ON(b.ad_id = a.ad_id)WHERE b.dateid = '2008-12-01'GROUP BY a.campaign_id;
这也是大家能在其他RDMS(关系数据库系统)如Oracle和DB2等上使用的典型的SQL语句。
为了从前面同样的连接数据上以广告和帐户为单位计算每天的广告曝光次数,Hive 提供了同时做多个group by操作的能力,查询如下所示(类似SQL语句但不是严 格意思上的SQL):
FROM(SELECT a.ad_id, a.campaign_id, a.account_id, b.user_id FROM dim_ads a JOIN impression_logs b ON (b.ad_id = a.ad_id)WHERE b.dateid = '2008-12-01') x INSERT OVERWRITE DIRECTORY 'results_gby_adid'SELECT x.ad_id, count(l), count(DISTINCT x.user_id) GROUP BY x.ad_id INSERT OVERWRITE DIRECTORY 'results_gby_campaignid'SELECT x.campaign_id, count(l), count(DISTINCT x.user_id) GROUP BY x.campaign_id INSERT OVERWRITE DIRECTORY * results_gby_accountid'SELECT x.account_id, count(l), count(DISTINCT x.user_id) GROUP BY x.account_id;
在Hive增添的一项优化功能中,其中一项是查询能被转换成一系列适用于“偏斜 数据”(skewed data)的 Hadoop MapReduce作业。实际上,join操作转换成一个 MapReduce作业,三个group by操作转换成四个MapReduce任务,其中第一个任务通过unique_id产生部分聚集数据。这一功能非常重要,因为 impression_logs表的数据在unique_id的分布比在ad_id上的分布更均匀(通 常在一个广告网络中,有些广告占主导地位,因为其客户分布更均匀)。因此,通 过unique_id计算部分聚集能让数据流水线把工作更均匀地分配到各个reducer。 简单改变查询中的日期谓词,同一个相同的査询模板便可以用于计算不同时间段的 效果数据。
但是计算整个广告周期的数据可以采用更好的方法,如果使用前面介绍的计算策略,我们必须扫描impression_logs表中的所有分区。因此,为了计算整个广告周期的数据,一个更可行的方法是在每天的中间表的分区上执行根据ad_id和unique_id的分组操作。这张表上的数据可以和次日的impression_logs合并增量产生整个周期的广告效果数据。例如,要想得到2008-12-01日的广告曝光数据,就需要用到2008-11-30日对应的中间表分区数据块。如下面的Hive查询语句所示:
INSERT OVERWRITE lifetime_partial_imps PARTITION(dateid-'2008-12-01')SELECT x.ad_idj x.user_id, sum(x_cnt)FROM (SELECT a.ad_id, a.user_idj a.cnt FROM lifetime_partial_imps a WHERE a.dateid = '2008-11-30'UNION ALLSELECT b.ad_id, b.user_id, 1 as cntFROM impression_log bWHERE b.dateid = '2008-12-01')xGROUP BY x.ad_id, x.user_id;
这个査询为2008-12-01计算局部合计数据,它可以用来计算2008-12-01的数据以 及2008-12-02的数据(这里没有展示)。SQL语句转换成一个单独Hadoop MapReduce作业,它实际上是在合并的输入流上做group by计算。在这个SQL 语句之后,可以做如下的Hive查询,它为每个分组计算出实际的数据(与前面对 日数据流水线的查询相似)。
FROM(SELECT a.ad_id, a.campaign_id, a.account_id, b.user_id, b.cnt FROM dim_ads a JOIN lifetime_partial_imps b ON (b.ad_id = a.ad_id)WHERE b.dateid = '2008-12-01’) x INSERT OVERWRITE DIRECTORY 'results_gby_adid'SELECT x.ad_id, sum(x.cnt), count(DISTINCT x.user_id) GROUP BY x.ad_id INSERT OVERWRITE DIRECTORY 'results_gby_campaignid'SELECT x.campaign_id, sum(x.cnt), count(DISTINCT x.user_id) GROUP BY x.campaign_id INSERT OVERWRITE DIRECTORY 'rGSults_gby_accountid'SELECT x.account—id, sum(x.cnt), count(DISTINCT x.user—id) GROUP BY x.account—id;
Hive和Hadoop都是批处理系统,它们计算数据的延迟超出了常用的RDBMS,如 Oracle和MySQL。因此,在许多情况下,把Hive和Hadoop系统产生的概要信息 加载到传统的RDBMS,让用户通过不同的Bl(商业智能)工具或网络门户来使用这 些数据仍然很有用。
存在的问题与未来工作计划
公平共享
Hadoop集群通常同时运行多个“日常生产作业” (production daily job)和 “即时作 业”(ad hoc job),日常生产作业需要在某个时间段内完成计算任务,而即时作业则 可能具有不同的优先级以及不同的计算规模。在选择典型安装时,日常生产作业倾 向于整夜运行,这时来自用户运行的即时作业的干扰最小。然而,大规模即时作业 和生产作业之间的工作时间重合常常是不可避免的,如果没有充分健壮的保障措 施,这种作业重合会导致生产作业的延迟。£1^(数据提取、转换和加载)处理也包 含几个近实时的作业,它们都必须以小时为间隔地运行(包括从NFS服务器复制 Scribe数据以及在某些数据集上以小时为单位的概要数据计算等处理)。它也意味 着只要有一个意外作业就会使整个集群当机,使生产处理处于危险境地。
Facebook开发并且贡献给Hadoop系统的Hadoop公平共享作业调度器加匕 8(;化如1灯)为许多这样的问题提供了解决方案。它为特定作业池中的作业保留保障 性计算资源,同时让闲置资源可以被任何作业使用。通过在各个池之间以一种公平 手段分配计算资源也可以防止大规模作业霸占集群资源。在集群里,内存是其中一 种竞争比较激烈的资源。我们对Hadoop系统做了一些修改,如果发现jobtracker 的内存短缺,Hadoop就会减缓或遏制作业提交。这能保证用户进程能够得到合理 的进程内存限额,并且为了阻止在同一个节点运行的MapReduce作业影响HDFS 后台程序(主要是因为大内存消耗),可以放置一些监控脚本程序。日志目录存储在 单独的硬盘分区,并且定期清理,我们认为把MapReduce中间存储放在单独的硬 盘分区上也是有用的。
空间管理
硬盘容量管理仍然是一个大挑战——数据的增长带来了硬盘使用的急速增加。数据 集日益攀升的许多发展中公司面临着同样的问题。在许多情况下,很多数据实际上 是临时数据。这种情况下,我们可以使用出^的保留期设置,并且可以以bzip格 式重新压缩以节省空间。尽管从硬盘存储的观点来看配置可能非常对称,但增加一 个高存储密度机器层来管理旧数据可能会有很大好处。这将使Hadoop存储存档数 据的消费变得更便宜。然而,对这些数据的访问应该是容易的。目前我们正在为这 一个数据存档层的实现而努力工作,统一旧数据处理的方方面面。
Scribe-HDFS 集成
目前,Scribe编写了几个NFS文件存档器(filer),然后前面所描述的自定义复制作业(copier job)就从这里收集和传送数据给HDFS。我们正致力于让Scribe直接把数 据写入其他的FS实例对象。这将简化Scribe的扩展和管理。基于对Scribe的正常 运行时间的髙要求它的目标HDFS对象可能不同于生产HDFS系统(因此它不会因 为用户作业而出现负载/停机的问题)。
改进Hive
Hive系统仍然处于活跃的开发阶段。人们关注着几个重要特性的开发,如order by,支持having从句,更多聚集函数,更多内置函数,日期类型,数据类型,等 等。同时,我们也在进行大量优化工作,如谓词下推和共同子表达式消除。在集成 方面,正在开发JDBC和ODBC驱动程序用于和OLAP及BI工具集成。通过所有 这些优化措施,我们希望能够释放MaPRedUCe和Hadoop的潜能,把它更进一步推 向非工程化社区以及用于Facebook。