Hadoop 的 Pipes
Hadoop的Pipes是Hadoop MapReduce的C++接口代称。不同于使用标准输入和输出来实现map代码和reduce代码之间的Streaming, Pipes使用套接字作为 tasktracker与C++版本 map函数或reduce函数的进程之间的通道,而未使用JNI。
我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用Pipe来运行它。
例2-12显示了用C++语言编写的map函数和reduce函数的源代码。
例2-12.用C++语言编写的MaxTemperature程序
#include#include#include#include#include "hadoop/Pipes.hh" #include "hadoop/TemplateFactony.hh" #include "hadoop/StningUtils.hh" class MaxTempenatuneMappen : public HadoopPipes::Mapper { public : MaxTempenatuneMappen(HadoopPipes::TaskContext& context) { } void map(HadoopPipes::MapContext& context) { std:: string line = context.getInputValue(); std::string yean = line.substn(15, 4); std::string airTemperature : line.substn(87, 5); std::string q = line.substn(92, 1); if (airTemperature != "+9999" && (q == -0-' || q == ’.l" || q == "4" || q == "5" || q == '.9")) { context.emit(year, airTemperature); } } }; class MapTempenatureReducen : public HadoopPipes::Reducer { public: MapTemperatureReducer(HadoopPipes::TaskContext& context) { } void reduce(HadoopPipes::ReduceContext& context) { int maxValue = INT_MIN; while (context.nextValue()) { maxValue = std::max(maxValue, HadoopUtils::toInt(context.getInputValue())); } context.emit(context.getInputKey(), HadoopUtils::toString(maxValue)); } } int main(int argc, char *argv[]) { returnHadoopPipes::runTask(HadoopPipes::TemplateFactory<MaxTemperatureMapper, MapTemperatureReducer>()); }
应用程序对Hadoop C++库链接提供了一个与tasktracker子进程进行通信的简单封装。通过扩展HadoopPipes命名空间中定义的mapper和reducer两个类,我们定义了 map()和reduce()方法,同时我们提供各种情况下map()和 reduce()方法的实现。这些方法采用了上下文对象(MapContext类型或ReduceContext类型),进而提供了读取输入数据和写入输出数据,以及通过JobConf类来访问作业配置信息的功能。本例中的处理过程类似于Java的处理方式。
与Java接口不同,C++接口中的键和值按字节缓冲,用标准模板库(Standard Template Library, STL)中的字符串表示。这样做简化了接口,但把更重的负担留给了应用程序开发人员,因为开发人员必须来回封送(marshall)字符串与特定应用领域内使用的具体类型。这一点在鬥MapTemperatureReducer中有所体现,我们必须把输入值转换为整型值(通过HadoopUtils中定义的方法),然后将找到的最大值转化为字符串后再输出。在某些情况下,我们可以省略这类转化,如 MaxTemperatureMapper 中的 airTemperature 值无需转换为整型,因为 map() 方法并不将它当作数值类型来处理。
这个应用程序的入口点是main()方法。它调用HadoopPipes::runTask,该函数连接到Java父进程,并在mapper和reducer之间来回封送数据。runTask()方法 被传入一个Factory参数,由此新建mapper或reducer实例。新建mapper创建reducer, Java父进程可通过套接字连接进行控制。我们可以用重载模板 factory 来设置 combiner、partitioner、record reader 或 record writer。
编译运行
现在我们可以用Makerfile编译连接例2-13中的程序。
例 2-13. C++版本 MapReduce 程序的 Makefile
CC = g++ CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include max_temperature: max_temperature.cpp $ (CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib -lhadooppipes \ -lhadooputils -lpthread -g -02 -o $@
在Makefile中需要设置许多环境变量。除了 HADOOP_INSTALL变量(如果遵循附录 A中的安装说明,应该已经设置好),还需要定义PLATFORM变量,该变量指定了 操作系统、体系结构和数据模型(例如,32位或64位)。我在32位Linux系统的 机器编译运行了如下内容:
% export PLATF0RM=Linux-iB86-B2 % make
成功编译之后,可以在当前目录中找到名为max_temperature的可执行文件。
我们需要以伪分布式(pseudo_distrinuted)模式(其中所有守护进程运行在本地计算机上)运行Hadoop来运行Pipes作业。Pipes不能在独立模式(本地运行)下运行,因为它依赖于Hadoop的分布式缓存机制,而该机制只有在HDFS运行时才起作用。
Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便在启动map和reduce任务时,tasktracker能够找到关联的可执行程序:
% hadoop fs -put max_temperature bin/max_temperature
示例数据同样也需要从本地文件系统复制到HDFS。
现在可以运行这个作业。我们用hadoop Pipes命令使其运行,使用-program参 数来传递在HDFS中可执行文件的URI:
% hadoop pipes \ -D hadoop.pipes.java.recordreader=true \ -D hadoop.pipes.java.recordwriter=true \ -input sample.txt \ -output output \ -program bin/max_temperature
我们使用-D选项来指定两个属性:hadoop.pipes.java.recordreader和 hadoop.pipes.java.recordwriter,这两个属性都被设置为true,表示我们并不指定C++记录读取函数或记录写入函数,而是使用默认的Java设置(用来设置文本输入和输出)。Pipes还允许我们设置一个Java mapper、reducer、合并函数或分区函数。事实上,在任何一个作业中,都可以混合使用Java类或C++类。
结果和其他语言版本的结果一样。