Hadoop教程

Hadoop 在 Last.fm 的应用

Last.fm:社会音乐史上的革命

Last.fm创办于2002年,它是一个提供网络电台和网络音乐服务的社区网站,向用 户提供很多服务,例如免费听音乐和音乐下载,音乐及重大事件推荐,个性化图表 服务以及其他很多服务。每个月大约有2500万人使用Last.fm,因而产生大量需要 处理的数据。一个例子就是用户传输他们正在收听的音乐信息(也就是收藏 “scrobbling”)。Last.fm处理并且存储这些数据,以便于用户可以直接访问这些 数据(用图表的形式),并且可以利用这些数据来推断用户的个人音乐品味、喜好和 喜爱的艺术家,然后用于寻找相似的音乐。


Hadoop在Lastfm中的应用

随着Last.fm服务的发展,用户数目从数千增长到数百万,这时,存储、处理和管 理这些用户数据渐渐变成一项挑战。幸运的是,当大家认识到Hadoop技术能解决 众多问题之后,Hadoop的性能迅速稳定下来,并被大家积极地运用。2006年初,Last.fm开始使用Hadoop,几个月之后便投入实际应用。Last.fm使用Hadoop的理由归纳如下。

  • 分布式文件系统为它所存储的数据(例如,网志,用户收听音乐的数据)提供冗 余备份服务而不增加额外的费用。
  • 可以方便地通过增添便宜、普通的硬件来满足可扩展性需求。
  • 当时Last.fm财力有限,Hadoop是免费的。
  • 开源代码和活跃的社区团体意味着Last.fm能够自由地修改Hadoop,从而增添 一些自定义特性和补丁。
  • Hadoop提供了一个弹性的容易掌握的框架来进行分布式计算。
  • 现在,Hadoop已经成为Last.fm基础平台的关键组件,目前包括2个Hadoop集 群,涉及50台计算机、300个内核和100丁8的硬盘空间。在这些集群上,运行着 数百种执行各种操作的日常作业,例如日志文件分析、人出测试评测、即时处理和 图表生成。本节的例子將侧重于介绍产生图表的处理过程,因为这是Last.fm对 Hadoop的第一个应用,它展示出Hadoop在处理大数据集时比其他方法具有更强 的功能性和灵活性。


    用Hadoop产生图表

    Last.fm使用用户产生的音轨收听数据来生成许多不同类型的图表,例如计对每个 国家或个人音轨数据的一周汇总图表。许多Hadoop程序处理收听数据和产生这些 图表,它们可以以天、周或月为单位执行。图16-1展示了这些数据在网站上如何 显示的一个例子,本例是音乐的周排行统计数据。

    图16-1. Last.fm音乐排行统计图表

    通常情况下,Last.fm有两种收听信息。

    用户播放自己的音乐(例如,在PC机或其他设备上听MP3文件),这祌信息通 过Last.fm的官方客户端应用或一种第三方应用(有上百种)发送到Last.fm。

    用户收听Last.fm某个网络电台的节目,并在本地计算机上通过流技术缓冲一 首歌。Last.fm播放器或站点能被用来访问这些流数据,然后它能给用户提供 一些额外的功能,比如允许用户对她收听的音频进行喜爱、跳过或禁止等操作。

    在处理接收到的数据时,我们对它们进行分类:一类是用户提交的收听的音乐数据 从现在开始,第一类数据称为“scrobble”(收藏数据),另一类是用户收听的 Last.fm的电台数据(从现在开始,第二类数据称为“radio listen"(电台收听数 据)。为了避免Last.fm的推荐系统出现信息反馈循环的问题,对数据源的区分是非 常重要的,而Last.fm的推荐系统只使用scrobble数据。Last.fm的Hadoop程序的 一项重要任务就是接受这些收听数据,做统计并形成能够在Last.fm网站上进行显 示和作为其他Hadoop程序输入的数据格式。这一过程是Track Statistics(音轨统计) 程序实现的,它就是在以下几节描述的实例。


    Track Statistics 程序

    音乐收听信息被发送到Last.fm时,会经历验证和转换阶段,最终结果是一系列由 空格分隔的文本文件,包含的信息有用户ID(userIdX音乐(磁道)ID(trackIs)、这首 音乐被收藏的次数(Scrobble)、这首音乐在电台中收听的次数(Radio)以及被选择跳 过的次数(Skip)。表16-1包含一些采样的收听数据,后面介绍的例子将用到这些数 据,它是Track Statistics程序的输入(真实数据达GB数量级,并且具有更多的属 性字段,为了方便介绍,这里省略了其他的字段)。

    表16-1.收听数据

    Userld Trackid Scrobble Radio Skip
    111115 222 0 1 0
    111113 225 1 0 0
    111117 223 0 1 1
    111115 225 1 0 0

    这些文本文件作为初始输入提供给Track Statistics程序,它包括利用这个输入数 据计算各种数据值的两个作业和一个用来合并结果的作业(见图16-2)。

    Unique Listeners作业模块统计收听同一首音频的不同用户数,通过累计不同用户 对该音频文件的第一次访问而忽略同一用户对这一文件的多次访问,即可得到该数 值。Sum作业模块通过对所有用户的所有收听信息进行计数来为每个音频统计收 听总数、收藏总数、电台收听总数以及被跳过的总数。

    图16-2.音频状态统计作业

    尽管这两个作业模块的输入格式是相同的,我们仍然需要两个作业模块,因为 Unique Listeners作业模块负责为每个用户对每个音频产生统计值,而Sum作业模 块为每个音频产生统计值。最后Merge作业模块负责合并由这两个模块产生的中 间输出数据得到最终统计结果。运行这段程序的最终结果是对每个音频产生以下几 项数值:

  • 不同的听众数
  • 音频的收藏次数
  • 音频在电台中的点播次数
  • 音频在电台中被收听的总次数
  • 音频在电台广播中被跳过的次数
  • 计算不同的听众数

    Unique Listeners作业模块用于计算每个音频的不同收听用户数。

    UniqueListenerMaper UniqueListenerMaper程序处理用空格分隔的原始收听数据,然后对每个track ID(音频ID)产生相应的即userID(用户ID):

    public void map(LongWritable position, Text rawLine,OutputCollector<IntWritable, IntWritable> output, Reporter reporter) throws IOException {String[] parts = (rawLine.toString()).split("");int scrobbles =Integer.panseInt(pants[TnackStatisticsPnognam.COL_SCROBBLES]);int radioListens =IntegGn.panseInt(pants[TnackStatisticsPnognam.COL_RADIO]);// if track somehow is marked with zero plays - ignore if (scrobbles <= 0 && radioListens <= 0) { return;}// if we get to here then user has listened to track,// so output user id against track id IntWritable tnackId = new IntWnitable(Integer.panseInt(pants[TnackStatisticsPnognam.COL_TRACKID]));IntWritable usenId = new IntWnitable(Integer.panseInt(parts[TrackStatisticsPnognam.COL_USERID])); output.collect(trackId, userId);}

    UniqueListenersReducer UniqueListenensReducen 接收到每个 track ID 对应的user ID数据列表之后,把这个列表放入Set类型对象以消除重复的用户山数据。 然后输出毎个track ID对应的这个集合的大小(不同用户数)。但是如果某个键对应 的值太多,在set对象中存储所有的reduce值可能会有内存溢出的危险。实际上还 没有出现过这个问题,但是为了避免这一问题,我们可以引入一个额外的 MapReduce处理步骤来删除重复数据或使用辅助排序的方法。

    public void reduce(IntWnitable trackId, Iterator<IntWritable> values, OutputCollecton<IntWritable, IntWritable> output, Reporter reporter) throws IOException {Set<Integer> userIds = new HashSet<Integer>();// add all userIds to the set, duplicates automatically removed (set contract) while (values.hasNext()) {IntWritable usenId = values.next(); userIds.add(Integer.valueOf(userId.get()));}// output trackId -> number of unique listeners per track output.collect(trackId, new IntWritable(usenIds.size()));}

    表16-2是这一作业模块的样本输入数据。map输出结果如表16-3所示,reduce输出结果如表16-4所示。

    表16-2.作业的输入

    Line of file Userld Trackld Scrobbled Radioplay Skip
    LongWritable IntWritable IntWritable Boolean Boolean Boolean
    0 11115 222 0 1 0
    1 11113 225 1 0 0
    2 11117 223 0 1 1
    3 11115 225 1 0 0

    表16-3.map输出

    Trackld Userld
    IntWritable IntWritable
    222 11115
    225 11113
    223 11117
    225 11115

    表16-3.reduce输出

    Trackld #listeners
    IntWritable IntWritable
    222 1
    225 2
    223 1

    统计音频使用总数

    Sum作业相对简单,它只为毎个音轨累计我们感兴趣的数据。SumMapper输入数据仍然是原始文本文件,但是这一阶段对输入数据的处理完 全不同。期望的输出结果是针对每个音轨的一系列累计值(不同用户、播放次数、 收藏次数、电台收听次数和跳过次数)。为了方便处理,我们使用一个由Hadoop Record I/O类产生的TrackStats中间对象,它实现了WritableComparable方 法(因此可被用作输出)来保存这些数据。mapper创建一个TrackStats对象,根据 文件中的每一行数据对它进行值的设定,但是“不同的用户数”(unique listener count)这一项没有填写(这项数据由merge作业模块填写)。

    public void map(LongWritable position. Text rawLine,OutputCollector<IntWritable, TrackStats> output, Reporter reporter) throws IOException {String[] parts = (rawLine.toString()).split("");int trackld = Integer.parseInt(parts[TrackStatisticsProgram.COL_TRACKID]); int scrobbles = Integer.parseInt(parts[TrackStatisticsProgram.COL_SCROBBLES]); int radio = Integer.parseInt(parts[TrackStatisticsProgram.COL_RADIO]); int skip = Integer.parseInt(parts[TrackStatisticsProgram.COL_SKIP]);// set number of listeners to 0 (this is calculated later)// and other values as provided in text fileTrackStats trackstat = new TrackStats(0, scrobbles + radio, scrobbles, radio, skip); output.collect(new IntMritable(trackId), trackstat);}

    SumReducer在这一过程,reducer执行和mapper相似的函数一一对每个音频使用总数情况进行统计,然后返回一个总的统计数据:

    public void reduce(IntWnitable tnackId, Itenaton<TnackStats> values, OutputCollecton<IntWnitable, TnackStats> output, Reporter reporter) throws IOException {TnackStats sum = new TnackStats(); // holds the totals for this track while (values.hasNext()) {TnackStats tnackStats = (TrackStats) values.next(); sum.setListeners(sum.getListeners() + tnackStats.getListeners()); sum.setPlays(sum.getPlays() + trackStats.getPlays()); sum.setSkips(sum.getSkips() + trackStats.getSkips()); sum.setScnobbles(sum.getScnobbles() + tnackStats.getScrobbles()); sum.setRadioPlays(sum.getRadioPlays() + trackStats,getRadioPlays());}output.collect(tnackId, sum);}

    表16-5是这个部分作业的输入数据(和Unique Listener作业模块的输入一样)。map 的输出结果如表16-6所示,reduce的输出结果如表16-7所示。

    表16-5.作业输入

    Line Userld Trackld Scrobbled Radioplay Skip
    LongWritable IntWritable IntWritable Boolean Boolean Boolean
    0 11115 222 0 1 0
    1 11113 225 1 0 0
    2 11117 223 0 1 1
    3 11115 225 1 0 0

    表16-6. map输出

    Trackid #listeners #plays #scrobbles #radtoplays #skips
    IntWritable IntWritable IntWritable IntWritable IntWritable IntWritable
    222 0 1 0 1 0
    225 0 1 1 0 0
    223 0 1 0 1 1
    225 0 1 1 0 0

    表 16-7. reduce 输出

    Trackid #listeners #plays #scrobbles #radtoplays #skips
    IntWritable IntWritable IntWritable IntWritable IntWritable IntWritable
    222 0 1 0 1 0
    225 0 2 2 0 0
    223 0 1 0 1 1

    合并结果

    最后一个作业模块需要合并前面两个作业模块产生的输出数据:每个音频对应的不 同用户数和每个音频的使用统计信息。为了能够合并这两种不同的输入数据,我们 采用了两个不同的mapper(对每一种输入定义一个)。两个中间作业模块被配置之后 可以把他们的输出结果写入路径不同的文件,MultipleInputs类用于指定mapper和文件的对应关系。下面的代码展示了作业的JobConf对象是如何设置来 完成这一过程的:

    MultipleInputs.addInputPath(conf, sumInputDir,SequenceFilelnputFormat.class, IdentityMappen.class);MultipleInputs.addInputPath(conf, listenensInputDir,SequenceFilelnputFormat.class, MengeListenensMappen.class);

    虽然单用一个mapper也能处理不同的输入,但是示范解决方案更方便,更巧妙。

    MergeListenersMapper 这个 mapper 用来处理 UniqueListenerJob 输出的每个音轨的不同用户数据。它采用和SumMapper相似的方法创建TrackStats对象,但这次它只填写每个音轨的不同用户数信息,不管其他字段:

    public void map(IntWritable trackld, IntWritable uniqueListenerCount, OutputCollector<IntWritable, TrackStats> output, Reporter reporter) throws IOException {TrackStats trackStats = new TrackStats();trackStats.setListeners(uniqueListenerCount.get()); output.collect(trackId, trackStats);}

    表16-8是mapper的一些输入数据;表16-9是对应的输出结果。

    表16-8. MergeUstenersMapper的输入

    Trackid #listeners
    IntWritable IntWritable
    222 1
    225 2
    223 1

    表16-9. MergeListenersMapper的输出

    Trackid #listeners #plays #scrobbles #radio #skips
    222 1 0 0 0 0
    225 2 0 0 0 0
    223 1 0 0 0 0

    IdentityMapper IdentityMapper 被配置用来处理 SumJob 输出的 TrackStats对象,因为不要求对数据进行其他处理,所以它直接输出输入数据(见表16-10)。

    表16-10.ldentityMapper的输入和输出

    Trackid #listeners #plays #scrobbles #radio #skips
    IntWritable IntWritable IntWritable IntWritable IntWritable IntWritable
    222 0 1 0 1 0
    225 0 2 2 0 0
    223 0 1 0 1 1

    SumReducer 前面两个mapper产生同一类型的数据:每个音轨对应一个 TrackStats对象,只是数据赋值不同。最后的reduce阶段能够重用前面描述的 SumReducer来为每个音轨创建一个新的TrackStats对象,它综合前面两个 TrackStats对象的值,然后输出结果(见表16-11)。

    表16-11. SumReducer的最终输出

    Trackid #listeners #plays #scrobbles #radio #skips
    IntWritable IntWritable IntWritable IntWritable IntWritable IntWritable
    222 1 1 0 1 0
    225 2 2 2 0 0
    223 1 1 0 1 1

    最终输出文件被收集后复制到服务器端,在这里一个web服务程序使Last.fm网站 能得到并展示这些数据。如图16-3所示,这个网页展示了一个音频的使用统计信 息:接听者总数和播放总次数。

    图 16-3. TrackStats结果


    总结

    Hadoop已经成为Last.fm基础框架的一个重要部件,它用于产生和处理各种各样 的数据集,如网页日志信息和用户收听数据。为了让大家能够掌握主要的概念,这 里讲述的例子已经被大大地简化;在实际应用中输入数据具有更复杂的结构并且数 据处理的代码也更加繁琐。虽然Hadoop本身已经足够成熟可以支持实际应用,但 它仍在被大家积极地开发,并且每周Hadoop社区都会为它增加新的特性并提升它 的性能。Last.fm很髙兴是这个社区的一分子,我们是代码和新想法的贡献者,同 时也是对大量开源技术进行利用的终端用户。

    关注微信获取最新动态