Hadoop教程

Hadoop 的 Pipes

Hadoop的Pipes是Hadoop MapReduce的C++接口代称。不同于使用标准输入和输出来实现map代码和reduce代码之间的Streaming, Pipes使用套接字作为 tasktracker与C++版本 map函数或reduce函数的进程之间的通道,而未使用JNI。

我们将用C++重写贯穿本章的示例,然后,我们将看到如何使用Pipe来运行它。

例2-12显示了用C++语言编写的map函数和reduce函数的源代码。

例2-12.用C++语言编写的MaxTemperature程序

#include#include#include#include#include "hadoop/Pipes.hh"
#include "hadoop/TemplateFactony.hh"
#include "hadoop/StningUtils.hh"

class MaxTempenatuneMappen : public HadoopPipes::Mapper {
    public :
        MaxTempenatuneMappen(HadoopPipes::TaskContext& context) {
        }
    void map(HadoopPipes::MapContext& context) {
        std:: string line = context.getInputValue();
        std::string yean = line.substn(15, 4);
        std::string airTemperature : line.substn(87, 5);
        std::string q = line.substn(92, 1);
        if (airTemperature != "+9999" &&
        (q == -0-' || q == ’.l" || q == "4" || q == "5" || q == '.9")) {
        context.emit(year, airTemperature);
        }
    }
};
class MapTempenatureReducen : public HadoopPipes::Reducer {
    public:
        MapTemperatureReducer(HadoopPipes::TaskContext& context) {
        }
    void reduce(HadoopPipes::ReduceContext& context) {
        int maxValue = INT_MIN;
        while (context.nextValue()) {
            maxValue = std::max(maxValue,
            HadoopUtils::toInt(context.getInputValue()));
        }
        context.emit(context.getInputKey(), HadoopUtils::toString(maxValue));
    }
}
int main(int argc, char *argv[]) {
    returnHadoopPipes::runTask(HadoopPipes::TemplateFactory<MaxTemperatureMapper,
    MapTemperatureReducer>());
}

应用程序对Hadoop C++库链接提供了一个与tasktracker子进程进行通信的简单封装。通过扩展HadoopPipes命名空间中定义的mapper和reducer两个类,我们定义了 map()和reduce()方法,同时我们提供各种情况下map()和 reduce()方法的实现。这些方法采用了上下文对象(MapContext类型或ReduceContext类型),进而提供了读取输入数据和写入输出数据,以及通过JobConf类来访问作业配置信息的功能。本例中的处理过程类似于Java的处理方式。

与Java接口不同,C++接口中的键和值按字节缓冲,用标准模板库(Standard Template Library, STL)中的字符串表示。这样做简化了接口,但把更重的负担留给了应用程序开发人员,因为开发人员必须来回封送(marshall)字符串与特定应用领域内使用的具体类型。这一点在鬥MapTemperatureReducer中有所体现,我们必须把输入值转换为整型值(通过HadoopUtils中定义的方法),然后将找到的最大值转化为字符串后再输出。在某些情况下,我们可以省略这类转化,如 MaxTemperatureMapper 中的 airTemperature 值无需转换为整型,因为 map() 方法并不将它当作数值类型来处理。

这个应用程序的入口点是main()方法。它调用HadoopPipes::runTask,该函数连接到Java父进程,并在mapper和reducer之间来回封送数据。runTask()方法 被传入一个Factory参数,由此新建mapper或reducer实例。新建mapper创建reducer, Java父进程可通过套接字连接进行控制。我们可以用重载模板 factory 来设置 combiner、partitioner、record reader 或 record writer。

编译运行

现在我们可以用Makerfile编译连接例2-13中的程序。

例 2-13. C++版本 MapReduce 程序的 Makefile

CC = g++
CPPFLAGS = -m32 -I$(HADOOP_INSTALL)/c++/$(PLATFORM)/include

max_temperature: max_temperature.cpp
$ (CC) $(CPPFLAGS) $< -Wall -L$(HADOOP_INSTALL)/c++/$(PLATFORM)/lib
-lhadooppipes \ -lhadooputils -lpthread -g -02 -o $@

在Makefile中需要设置许多环境变量。除了 HADOOP_INSTALL变量(如果遵循附录 A中的安装说明,应该已经设置好),还需要定义PLATFORM变量,该变量指定了 操作系统、体系结构和数据模型(例如,32位或64位)。我在32位Linux系统的 机器编译运行了如下内容:

% export PLATF0RM=Linux-iB86-B2
% make

成功编译之后,可以在当前目录中找到名为max_temperature的可执行文件。

我们需要以伪分布式(pseudo_distrinuted)模式(其中所有守护进程运行在本地计算机上)运行Hadoop来运行Pipes作业。Pipes不能在独立模式(本地运行)下运行,因为它依赖于Hadoop的分布式缓存机制,而该机制只有在HDFS运行时才起作用。

Hadoop守护进程开始运行后,第一步是把可执行文件复制到HDFS,以便在启动map和reduce任务时,tasktracker能够找到关联的可执行程序:

% hadoop fs -put max_temperature bin/max_temperature

示例数据同样也需要从本地文件系统复制到HDFS。

现在可以运行这个作业。我们用hadoop Pipes命令使其运行,使用-program参 数来传递在HDFS中可执行文件的URI:

% hadoop pipes \
-D hadoop.pipes.java.recordreader=true \
-D hadoop.pipes.java.recordwriter=true \
-input sample.txt \
-output output \
-program bin/max_temperature

我们使用-D选项来指定两个属性:hadoop.pipes.java.recordreader和 hadoop.pipes.java.recordwriter,这两个属性都被设置为true,表示我们并不指定C++记录读取函数或记录写入函数,而是使用默认的Java设置(用来设置文本输入和输出)。Pipes还允许我们设置一个Java mapper、reducer、合并函数或分区函数。事实上,在任何一个作业中,都可以混合使用Java类或C++类。

结果和其他语言版本的结果一样。

关注微信获取最新动态