Hadoop 的 Streaming
Hadoop提供了MapReduce的API,并允许你使用非Java的其他语言来写自己的map和reduce函数。Hadoop的Streaming使用Unix标准流作为Hadoop和应用程 序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出来写 MapReduce 程序。
Streaming天生适合用于文本处理(尽管到0.21.0版本时,它也可以处理二进制流),在文本模式下使用时,它有一个数据的行视图。map的输入数据通过标准输入流传递给map函数,并且是一行一行地传输,最后将结果行写到标准输出。map输出的键/值对是以一个制表符分隔的行,它以这样的形式写到标准输出。reduce 函数的输入格式相同——通过制表符来分隔的键/值对——并通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。
下面使用streaming来重写按年份査找最高气温的MapReduce程序。
Ruby版本
例2-8显示了用Ruby编写的map函数。
例2-8.用Ruby编写査找最高气温的map函数
#!/usr/bin/env ruby STDIN.each_line do |line| val = line year, temp,q = val[15,4], val[87,5], val[92,1] puts "#{year}\t#{temp}" if (temp != "+9999” && q =~/[01459]/) end
程序通过程序块执行STDIN(一个 IO 类型的全局常量)中的每一行来迭代执行标准输入中的每一行。该程序块从输入的毎一行中取出相关字段,如果气温有效,就将年份以及气温写到标准输出(使用puts),其中年份和气温之间有一个制表符\t。
值得一提的是Streaming和Java MapReduce API之间的设计差异。Java API控制的map函数一次只能处理一条记录。针对输入数据中的每一条记录,该框架均需调用Mapper的map()方法来处理,然而在Streaming中,map程序可以自己决定如何处理输人数据,例如,它可以轻松读取并同时处理若干行,因为它受读操作的控制。用户的Javamap实现的是“推”记录方式,但它依旧可以同时处理多行,具体做法是通过mapper中实例变迸将之前读取的多行汇聚在一起。在这种情况下,需要实现close()方法,以便知道何时读到最后一条记录, 进而完成对最后一组记录行的处理。
由于该脚本只能在标准输入和输出上运行,所以最简单的方式是在Unix管道上进行测试,而不是在Hadoop中进行测试:
% cat input/ncdc/sample.txt | ch02/spc/main/ruby/max_temperaturejnap. rb 1950 +0000 1950 +0022 1950 -0011 1949 +0111 1949 +0078
例2-9显示的reduce函数更复杂一些。
例2-9.用Ruby编写的査找最高气温的reduce函数
#!/usr/bin/env ruby last_key, max_val = nil, 0 STDIN.each_line do |line| key, val = line.split("\t") if last_key && last_key != key puts "#{last_key}\t#{max_val}" last_key, max_val = key, val.to_i else last_key, max_val = key, [max_val, val.to_i].max end end puts "#{last_key}\t#{max_val}" if last_key
同样地,程序遍历标准输入中的行,但在我们处理每个键组时,要存储一些状态。 在这种情况下,键是气象站的标识符,我们存储看到的最后一个键和迄今为止见到的该键对应的最髙气温。MapReduce框架保证了键的有序性,由此我们知道,如果读到一个键与前一个键不同,就需要开始处理一个新的键组。相比之下,Java API系统提供一个针对毎个键组的迭代器,而在Streaming中,需要在程序中找出键组的边界。
我们从毎行取出键和值,然后如果正好完成一个键组的处理(last_key & last_key = key),就针对该键组写入该键及其最高气温,用一个制表符来进行分隔,最后开始处理新键组时我们需要重置最高气温值。如果尚未完成对一个键组的处理,那么就只有当前键的最髙气温被更新。
程序的最后一行确保了处理完输入的最后一个键组后会有一行输出。
现在可以用Unix管线来模拟整个MapReduce管线,该管线与图2-1中显示的Unix 管线是相同的:
% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_tenq>eraturejnap.rb | \ sort I ch02/src/main/ruby/max_temperature_reduce.rb 1949 111 1950 22
输出结果和Java程序的一样,所以下一步是通过Hadoop运行它。
hadoop命令不支持Streaming函数,因此,我们需要在指定Streaming JAR文件流与jar选项时指定。Streaming程序的选项指定了输入和输出路径,以及map和reduce脚本。如下所示:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -input input/ncdc/sample.txt \ -output output \ -mapper ch02/src/main/ruby/max_temperature_map.rb \ -reducer ch02/src/main/ruby/max_temperature_reduce.rb
在一个集群上运行一个庞大的数据集时,我们要使用-combiner 选项来设置合并函数。
从0.21.0版开始,合并函数可以是任何一个Streaming命令。对于早期版本,合并函数只能用Java编写,所以一个变通的方法是在mapper中进行手动合并,进而避开Java语言。在这里,我们可以把mapper改成流水线:
% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \ -input input/ncdc/all \ -output output \ -mapper "ch02/src/main/ruby/max_temperature_map.rb | sort | ch02/src/main/ruby/max_temperature_reduce,rb" \ -reducer ch02/src/main/ruby/max_temperature_reduce.rb \ -file ch02/src/main/ruby/max_temperature_map.rb \ -file ch02/src/main/ruby/max_temperature_reduce.rb
还需注意-file选项的使用,在集群上运行Streaming程序时,我们会使用这个选项,从而将脚本传输到集群。
Python版本
Streaming支持任何可以从标准输入读取和写入到标准输出中的编程语言,因此对于更熟悉Python的读者,下面提供了同一个例子的Python版本。map脚本参见 例2-10,比如06脚本参见例2-11。
例2-10.用Python编写用于査找最高气温的map函数
#!/usr/bin/env python import re import sys for line in sys.stdin: val = line.strip() (year, temp,q) = (val[15:19], val[87:92], val[92:93]) if (temp != '*+9999'* and ne.match("[01459]", q)): print "%s\t%s" % (yean, temp)
例2-11.用Python编写用于査找最高气温的reduce函数
#!/usr/bin/env python import sys (last_key, max_val) = (None,0) for line in sys.stdin: (key, val) = line.stnip().split("\t") if last_key and last_key != key: pnint "%s\t%s" % (last_key, max_val) (last_key, max_val) = (key, int(val)) else: (last_key, max—val) = (key,max(max_val, int(val))) if last_key: pnint "%s\t%s" % (last_key, max_val)
我们可以像测试Ruby程序那样测试程序并运行作业。例如,可以像下面这样运行测试:
% cat input/ncdc/sample.txt | ch02/src/main/python/max_tenperature_map.py | \ sort | ch02/src/main/python/max_temperature_reduce.py 1949 111 1950 22