Hadoop教程

使用Pig和Wukong来探索10亿数量级边的网络图

超大规模的网络是非常有魅力的。它们所能建模的东西是非常普遍的:假如你有一 堆东西(我们称它们是节点,node),它们是相关联的(边,edge),并且假如节点和 边(node/edge元数据)能叙述一个故事的话,你就能得到一个网络图。

我以前做过一个称为Infochirnps的项目,这是一个发现、共享或出售数据集的全 球性网站。在Infochimps网站,我们有很多技术可以应用于加入我们项目数据集 的任何有趣的网络图。我们主要使用Pig(见第11章)和Wukong (http:github.com/mrflip/wutong),这是我们用Rubby语言开发的处理Hadoop流数据 的工具箱。这些工具,我们便可以用简单的脚本语言(如下面给出的例子一样)——基 本上所有这些脚本都不超过一页——来处理terabyte(千兆,TB)量级的图数据。在infochirnps.org上査询“network”得到以下几个数据集。

社交网络,如Twitter或Facebook。我们客观地把人模型化为节点,把关系 (@mrflip和@tom_e_white是朋友)或行为(@infochirnps提到了@hadoop)模型化 为边。用户已发送的清息数和所有这些消息的词集便是节点元数据的各个重要 信息片段。

链接的文档集(如维基百科或整个网络数据集)。毎个页面是一个节点(把标 题、浏览次数和网页类别作为节点元数据)。每个超链接是一条边,用户从一 个页面点击进入另一个网页的频率作为边的元数据。

c.elegans线虫研究项目中的神经元(节点)和突触(边)的联系。

髙速公路地图,出口是节点,髙速公路的分段是边。Open Street Map项目的 数据集是拥有全球性覆盖的地点名称(节点元数据),街道编号范围(边的元数据) 及更多其他信息。

或是一些不易发现的隐秘的图,假如你能用一个有趣的系统来做分析的话,这 个网络图就会很清晰。浏览几百万条Twitter消息,为同一条消息中出现的每 对非键盘字符产生一条边。简单地通过观察“often, when humans use最,they also use近”这句话,你就能重建人类语言地图(参见图16-23)。

图16-23.Twitter语言地图

这些有机相连的网络图让人惊讶的地方是,如果有足够的数据,一系列功能强大的 工具软件通常就能够使用这种网络结构来揭示出更深刻的知识。例如,我们可以使 用同一种算法的各种变体来做下面各个任务:

对维基百科链接文档集,找出最重要的网页。Google使用这个算法的更加精 良的改进版来确定排序靠前的搜索结果。

确定Twitter社区图中的名人和专家。如果用户的跟随者人数比用户“trstrank"(一种排序值)推算出的数高出很多,就说明他们往往就是垃圾制 造者。

通过收集5年以上的几百万个匿名考试分数来预测某个学校在学生教育问题上 的影响力。

测量社区

在Infochimps集合中,最有趣的网络是对Twitter社区图进行大规模爬取所得到的 网络。它有多达9千万个节点和20亿条边,这个图对于帮助我们理解人们的谈话 和掌握他们之间的关系来说是一个非常了不起的工具。下面利用三种方法来总结用 户社区的特征,使用的是“谈论Infochirnps或Hadoop的用户”子图:

  • 和他们一起讨论的用户(@reply图)是谁?
  • 他们是否与参与问题讨论的人者互换了意见(对称链接)?
  • 在该用户的社区里,有多少用户彼此相关(聚类因子)?

  • 每个人都在和我说话:Twitter回复关系图

    Twitter允许你回复其他人的消息,从而参与谈论。因为这是一种明显的公众行 为,所以回复就代表一种强的“社会性标记”(social token):它表明对别人谈论事 情感兴趣,并表明这种兴趣值得转播。

    处理过程的第一步是用Wukong完成的,Wukong是面向Hadoop的Ruby程序库。 它能让我们编写出处理多个TB级数据流的小而灵活的程序。以下代码片断取自一 个用于表示twitter消息(或tweet)的类:

    class Tweet < Struct.new(:tweet_id, :screen_name, :created_at,                         :reply_tweet_id, :reply_screen_name, :text) def initialize(raw_tweet)#... gory details of parsing raw tweet omitted endTweet is a reply if there's something in the reply_tweet_id slot def is_reply?not Feply_tweet_id.blank? true end

    Twitter的Stream API可以让大家轻松得到千兆字节的消息。它们是原始的JSON 格式数据:

    {"text":"Dust finished the final draft for Hadoop: the Definitive Guide!","screen_name":"tom_e_white","reply_screen_name":null, "id":3239897342,"reply_tweet_id":null,...}{"text":"@tom_e_white Can't wait to get a copy!","screen_name": "nirflip*',"reply_screen_name: "tom_e_white","id":3239873453,,,reply_tweet_idM : 3239897342,...}{"text":"@josephkelly great job on the #InfoChimps API.Remind me to tell you aoout the time a baboon broke into our house.","screen_name": "wattsteve", "reply_screen_name": "josephkelly","id":16434069252,...}{Mtext":"@mza Re: http://j.mp/atbroxmr Check out @James_Rubino's http://bit.ly/clusterfork ? Lots of good hadoop refs there too", "screen_name":"mrflip","reply_s c reen_name": "@mza","id":7809927173,...}{"text":"@tlipcon divide lots ofdata into little parts. Magic software gnomes fix up the parts, elves then assemble those into whole things #hadoop", "screen_name": '*nealrichter*'/Yeply_screen_name": "tlipcon","id":4491069515,...}

    reply_screen_name和reply_tweet_id让你能跟随整个交流过程(否则正如你看 到的,这两个值被设置为null)。我们找到每个回复,并且输出相应的用户ID,然 后形成一条边:

    class ReplyGraphMapper < LineStreamer def process(raw_tweet) tweet = Tweet.new(raw_tweet) if tweet.is_reply?emit [tweet.screen_name, tweet.reply_screen_name] end end end

    mapper从LineStreamer类派生出来,LineStreamer类把每一行作为一个单独 记录提供给process方法。我们只需定义process方法,其余的工作由Wukong 和Hadoop完成。这个案例里,我们使用原始JSON格式的记录来创建tweet对 象。遇到用户A回复用户B的地方,就输出一条边,记作用制表符分割的A和 B。原始输出数据如下所示:

    % reply_graph_mapper --run naw_tweets.json a_replies_b.tsv mrflip tom_e_whitewattsteve     josephkellymrflip	      mzanealrichter tlipcon

    这条边读作“a回复b”,并且我们把这个关系翻译成一条有向“出”边: @wattsteve向勒@josephkelly传了社会资本。

    边对(edge)与邻接表(list)

    上述网络是采用“边对”(edge pair)的方式来表示网络方法。它很简单,并且对入(in) 和出(out)边来说,它们有同样的起始点,但是这样会引入一些重复数据。从节点的 角度来看,把信息都集中到链接源节点可以表达相同的信息(并节省一些磁盘空 间)。我们把这个称作“邻接列表”(adjacency list),它能用Pig工具通过一个简单的 GROUP BY操作产生。加载数据文件:

    a_replies_b = LOAD 'a_replies_b.tsv' AS (src:chararray, dest:chararray);

    在源节点上进行分组,我们可以找到从每个节点出来的边:

    replies_out = GROUP a_replies_b BY src;DUMP replies_out(cutting,{(tom_e_white)})(josephkelly,{(wattsteve)})(mikeolson,{(LusciousPear), (kevinweil), (LusciousPear), (tlipcon)})(mndoci,{(mrflip),(peteskomoroch),(LusciousPear),(mrflip)})(mrflip,{(LusciousPear),(mndoci),(mndoci),(esammer),(ogrisel),(esammer),(watt steve)})(peteskomoroch,{(CMastication),(esammer),(DataDunkie),(mndoci),(nealrichter),(tlipcon,{(LusciousPear),(LusciousPear),(nealrichter),(mrflip),(kevinweil)}) (tom_e_white,{(mrflip),(lenbust)})

    度(degree)

    对影响力,一种简单而有用的度量就是一个用户收到的回帖数。用图的术语来说, 是度(degree)(因为这是一个有向图,所以入度(in-degree)尤其重要。Pig的嵌套FOREACH语法能使我们在一次数据扫描之后,计算参与进来的不同的回 帖者数、(邻居节点)以及回帧:

    a_replies_b = LOAD 'a_replies_b.tsv' AS (src:chararrayj dest:chararray); replies_in = GROUP a_replies_b BY dest; -- group on dest to get in-links replies_in_degree = FOREACH replies_in { nbrs = DISTINCT a_replies_b.src;GENERATE group, COUNT(nbrs), COUNT(a_replies_b);};DUMP replies_in_degreefcutting,1L,1L)(josephkelly,1L,1L)(mikeolson,3L,4L)(mndoci,3L,4L)(mrflip,5L,9L)(peteskomoroch,9L,18L)(tlipcon,4L,8L)(tom_e_white,2L,2L)

    在这个示例里,@peteskomoroch有9个邻居节点和18个回帖,远远多于其他大 多数节点的数据。社交网络中度的大小通常存在很大的区别。大多数用户都只有少 数几个回帖,但是少数的名人——如@THE_REAL_SHAQ(篮球明星Shaquille O’Neill) 或@sockington(一只虚构的猫)一能收到上百万的回帖。相比之下,公路地图上几 乎每个交叉点都是十字形的。由于度的巨大偏差而产生的偏斜数据流对如何处理 这样的图数据有很大的影响——后面会有更多介绍。


    对称链接

    有几百万人在twitter上给@THE_REAL_SHAQ回帖声援支持的时候,他不回复这几 百万者是可以理解的。如图所示,我经常和@咖此6交流,让我们之间的边是 “对称链接”(symmetric link)。这精确地反映了我和@mndoci有更多共同兴趣(相 较于@THE_REAL_SHAQ)。

    找到对称链接的一个方法是获取那些同时出现在A Replied To B(A回帖给B)边 集合和A Replied By B(B回帖给A)的边集合的边。我们能通过内部“自连接” (inner self-jion)的操作来实现交集操作,以此来发现对称链接:

    a_repl_to_b = LOAD 'a_replies_b.tsv' AS (user_a:chararray, user_b:chararray); a_repl_by_b = LOAD 'a_replies_b.tsv' AS (user_b:chararray, user_a:chararray); -- symmetric edges appear in both sets a_symm_b_j = JOIN a_repl_to_b BY (user_a, user_b),                   a_repl_by_b BY (user_a, user_b);... 

    但是,这个过程结束之后,它将发送两个完全的边-对列表给reduce阶段,这要求系统提供双倍内存。如果从一个节点的角度来看,能看出一个对称链接等同于一对边的话:一个出一个进,那么我们能做得更好。按照升序把节点排放在第一个存储槽内,我们可以得到这个无向图一一但是我们把链接的方向保存为边的一种元数据:

    a_replies_b = LOAD 'a_replies_b.tsv' AS (src:chararray, dest:chararray); a:b_rels_= FOREACH a_replies_b GENERATE ((src <= dest) ? src : dest) AS user_a,((src >= dest) ? dest : src) AS user_b,((src <= dest) ? 1 : 0) AS a_re_b:int((src <= dest) ? 0 : 1) AS b_re_a: intDUMP a_b_rels(mrflip,tom_e_white,1,0)(josephkelly,wattsteve,0,1)(mrflip,mza,1,0)(nealrichter,tlipcon,0,1)

    现在我们收集每对节点间的所有边。一个对称边在每个方向至少有一个回帖:

    a_b_rels_g = GROUP a_b_rels BY (user_a, user_b);a_symm_b_all = FOREACH a_b_rels_g GENERATEgroup.user_a AS user_a,group.user_b AS user_b,(((SUM(a,b_rels.a_re_b) > 0) AND(SUM(a_b_rels.b_re_a) > 0) ) ? 1 : 0) AS is_symmetric:int;DUMP a_symm_b_all(mrflip,tom_e_white,1)(mrflip,mza,0)(josephkelly,wattsteve,0)(nealrichter,tlipcon,1)...a_symm_b = FILTER a_symm_b_all BY (is_symmetric == 1);STORE a_symm_b INTO 'a_symm_b.tsv';

    这里有一部分输出,显示@mrflip和@tom_e_white之间存在一个对称链接:

    (mrflip,tom_e_white,1)(nealrichter,tlipcon,1)

    社区提取

    到目前为止,我们已经提供了节点度进(入度)和边度里的方法(对称链接判定)。让 我们进一步看看如何度设邻居关系:一个指定用户的朋友中有多少人彼此之间是朋 友?同时,我们将产生一个边集来实现前一个例子那样的可视化展示。

    获取邻居

    选择一个种子节点(这里是@hadoop)。首先,“收集”(round up)种子节点的邻居 节点:

    a_replies_b = LOAD 'a_replies_b.tsv' AS (src:chararray, dest:chararray);-- Extract edges that originate or terminate on the seed n0_edges = FILTER a_replies_b BY (src == 'hadoop') OR (dest == 'hadoop');-- Choose the node in each pair that *isn't* our seed: n1_nodes_all = FOREACH n0_edges GENERATE((src == 'hadoop') ? dest : src) AS screen_name; n1_nodes = DISTINCT nl_nodes_all;DUMP n1_nodes

    现在我们把这个邻居集和开始节点集进行交集处理,从而找出所有始于n1_nodes 集合的边:

    n1_edges_out_j = J0IN a_replies_b BY src,n1_nodes BY screen_name USING 'replicated'; n1_edges_out = FOREACH n1_edges_out_j GENERATE src, dest;

    我们得到的图的复本数据(超过10亿条边)仍然太大而不能搬入内存。但另一方 面,一个单独用户的邻居人数很少会超过百万,所以它可以轻松地读入内存。在 JOIN操作里包含USING'replicated'是用来指导Pig做一个map端的连接操作(也称 作fragment replicate join,片断复制连接)。Pig把n1_nodes关系读入内存当作一个 査找表,然后把整个边集的数据连续地载入内存。只要连接条件满足——src在n1_nodes査询表中 它就产生输出。没有reduce步骤意味着速度得以显著提升。

    为了只留下源和目标节点都是种子节点的邻居的边,重复执行如下连接操作:

    n1_edges_j : J0IN n1_edges_out BY dest,n1_nodes BY screen_name USING * replicated'; n1_edges = FOREACH n1_edges_j GENERATE src, dest;DUMP n1_edges(mrflip,tom_e_white)(mrflip,mza)(wattsteve,josephkelly)(nealrichter,tlipcon)(bradfordcross,lusciouspear)(mrflip,jeromatron)(mndoci,mrflip)(nealrichter,datajunkie)

    社区度量标准和1百万×1百万数量级的问题

    把@hadoop,@cloudera和@infochimps作为种子节点,我把类似的脚本应用到 20亿条消息集中来创建图16-24。

    你可以看到,这种大数据社区的关联度是很高的。名人(如@THE_REAL_SHAQ)的链 接邻居更稀疏。我们可以用“集聚系数”(clustering 00#«<^如)来表示这样的关 系,定义为:实际的n1_edges和可能的最大数目的n1_edges的比值。值的范围 是从0(邻居节点互不关联)到1(邻居节点两两互相关联)。集聚系数值高,表明它是 一个凝聚性高的社区。集聚系数值较低的表明社区节点的兴趣很分散(如 @THE_REAL_SHAQ节点的情况),或表明这是一个非有机组织社区,可能存在垃圾 帐户。

    图16-24.Twitter上的大数据社区

    在全局数据上利用局部属性

    我们已经算出了对一个节点、一条边以及一个邻居的社区度量标准。那么面对整个 网络该怎么做?这里没有足够的篇幅来讲述这个问题,但是通过在图上产生每个 “三角关系”,你能同时测量每个节点的集聚系数值。对每个用户,比较他们所属 的三角关系的个数及其链接度,就可以得到集聚系数值。

    请注意,还记得我们前面对节点度巨大差异性的讨论吗?不假思索地扩展前面的方法会导致数据的激增 流行音乐明星@britneyspears(在2010年7月有520万粉丝,42万关注)或@WholeFoods(170万粉丝,60万关注),每个人都会产生上万 亿的数据记录。更糟糕的是,因为大社区具有稀疏的集聚系数,所以所有这些数据 都几乎会被扔掉!我们有更优雅的方法在整个图上做数据处理。但一定要牢记现实 世界是如何描述这个问题的。如果你断言@britneyspears和这42万人不是朋 友,你可以只保留强链接。给每条边赋权重(考虑关注数、是否是对称链接等因素) 并且给来自某个节点的链接数设置上限。这将大幅缩减中间数据的规模,但仍然可 以合理地估计社区凝聚性。

    关注微信获取最新动态