Hadoop教程

Hadoop 的 Streaming

Hadoop提供了MapReduce的API,并允许你使用非Java的其他语言来写自己的map和reduce函数。Hadoop的Streaming使用Unix标准流作为Hadoop和应用程 序之间的接口,所以我们可以使用任何编程语言通过标准输入/输出来写 MapReduce 程序。

Streaming天生适合用于文本处理(尽管到0.21.0版本时,它也可以处理二进制流),在文本模式下使用时,它有一个数据的行视图。map的输入数据通过标准输入流传递给map函数,并且是一行一行地传输,最后将结果行写到标准输出。map输出的键/值对是以一个制表符分隔的行,它以这样的形式写到标准输出。reduce 函数的输入格式相同——通过制表符来分隔的键/值对——并通过标准输入流进行传输。reduce函数从标准输入流中读取输入行,该输入已由Hadoop框架根据键排过序,最后将结果写入标准输出。

下面使用streaming来重写按年份査找最高气温的MapReduce程序。

Ruby版本

例2-8显示了用Ruby编写的map函数。

例2-8.用Ruby编写査找最高气温的map函数

#!/usr/bin/env ruby
STDIN.each_line do |line|
val = line
year, temp,q = val[15,4], val[87,5], val[92,1]
puts "#{year}\t#{temp}" if (temp != "+9999” && q =~/[01459]/)
end

程序通过程序块执行STDIN(一个 IO 类型的全局常量)中的每一行来迭代执行标准输入中的每一行。该程序块从输入的毎一行中取出相关字段,如果气温有效,就将年份以及气温写到标准输出(使用puts),其中年份和气温之间有一个制表符\t。

值得一提的是Streaming和Java MapReduce API之间的设计差异。Java API控制的map函数一次只能处理一条记录。针对输入数据中的每一条记录,该框架均需调用Mapper的map()方法来处理,然而在Streaming中,map程序可以自己决定如何处理输人数据,例如,它可以轻松读取并同时处理若干行,因为它受读操作的控制。用户的Javamap实现的是“推”记录方式,但它依旧可以同时处理多行,具体做法是通过mapper中实例变迸将之前读取的多行汇聚在一起。在这种情况下,需要实现close()方法,以便知道何时读到最后一条记录, 进而完成对最后一组记录行的处理。

由于该脚本只能在标准输入和输出上运行,所以最简单的方式是在Unix管道上进行测试,而不是在Hadoop中进行测试:

% cat input/ncdc/sample.txt | ch02/spc/main/ruby/max_temperaturejnap. rb
1950  +0000
1950  +0022
1950  -0011
1949  +0111
1949  +0078

例2-9显示的reduce函数更复杂一些。

例2-9.用Ruby编写的査找最高气温的reduce函数

#!/usr/bin/env ruby

last_key, max_val = nil, 0
STDIN.each_line do |line|
key, val = line.split("\t")
if last_key && last_key != key
puts "#{last_key}\t#{max_val}"
last_key, max_val = key, val.to_i
else
last_key, max_val = key, [max_val, val.to_i].max
end
end
puts "#{last_key}\t#{max_val}" if last_key

同样地,程序遍历标准输入中的行,但在我们处理每个键组时,要存储一些状态。 在这种情况下,键是气象站的标识符,我们存储看到的最后一个键和迄今为止见到的该键对应的最髙气温。MapReduce框架保证了键的有序性,由此我们知道,如果读到一个键与前一个键不同,就需要开始处理一个新的键组。相比之下,Java API系统提供一个针对毎个键组的迭代器,而在Streaming中,需要在程序中找出键组的边界。

我们从毎行取出键和值,然后如果正好完成一个键组的处理(last_key & last_key = key),就针对该键组写入该键及其最高气温,用一个制表符来进行分隔,最后开始处理新键组时我们需要重置最高气温值。如果尚未完成对一个键组的处理,那么就只有当前键的最髙气温被更新。

程序的最后一行确保了处理完输入的最后一个键组后会有一行输出。

现在可以用Unix管线来模拟整个MapReduce管线,该管线与图2-1中显示的Unix 管线是相同的:

% cat input/ncdc/sample.txt | ch02/src/main/ruby/max_tenq>eraturejnap.rb | \
sort I ch02/src/main/ruby/max_temperature_reduce.rb
1949   111
1950   22

输出结果和Java程序的一样,所以下一步是通过Hadoop运行它。

hadoop命令不支持Streaming函数,因此,我们需要在指定Streaming JAR文件流与jar选项时指定。Streaming程序的选项指定了输入和输出路径,以及map和reduce脚本。如下所示:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/sample.txt \
-output output \
-mapper ch02/src/main/ruby/max_temperature_map.rb \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb

在一个集群上运行一个庞大的数据集时,我们要使用-combiner 选项来设置合并函数。

从0.21.0版开始,合并函数可以是任何一个Streaming命令。对于早期版本,合并函数只能用Java编写,所以一个变通的方法是在mapper中进行手动合并,进而避开Java语言。在这里,我们可以把mapper改成流水线:

% hadoop jar $HADOOP_INSTALL/contrib/streaming/hadoop-*-streaming.jar \
-input input/ncdc/all \
-output output \
-mapper "ch02/src/main/ruby/max_temperature_map.rb | sort |
 ch02/src/main/ruby/max_temperature_reduce,rb" \
-reducer ch02/src/main/ruby/max_temperature_reduce.rb \
-file ch02/src/main/ruby/max_temperature_map.rb \
-file ch02/src/main/ruby/max_temperature_reduce.rb

还需注意-file选项的使用,在集群上运行Streaming程序时,我们会使用这个选项,从而将脚本传输到集群。

Python版本

Streaming支持任何可以从标准输入读取和写入到标准输出中的编程语言,因此对于更熟悉Python的读者,下面提供了同一个例子的Python版本。map脚本参见 例2-10,比如06脚本参见例2-11。

例2-10.用Python编写用于査找最高气温的map函数

#!/usr/bin/env python
import re
import sys

for line in sys.stdin:
val = line.strip()
(year, temp,q) = (val[15:19], val[87:92], val[92:93])
if (temp != '*+9999'* and ne.match("[01459]", q)):
print "%s\t%s" % (yean, temp)

例2-11.用Python编写用于査找最高气温的reduce函数

#!/usr/bin/env python
import sys
(last_key, max_val) = (None,0)
for line in sys.stdin:
(key, val) = line.stnip().split("\t")
if last_key and last_key != key:
pnint "%s\t%s" % (last_key, max_val)
(last_key, max_val) = (key, int(val))
else:
(last_key, max—val) = (key,max(max_val, int(val)))
if last_key:
pnint "%s\t%s" % (last_key, max_val)

我们可以像测试Ruby程序那样测试程序并运行作业。例如,可以像下面这样运行测试:

% cat input/ncdc/sample.txt | ch02/src/main/python/max_tenperature_map.py | \
sort | ch02/src/main/python/max_temperature_reduce.py
1949    111
1950    22

关注微信获取最新动态