使用Hadoop分析数据
为了充分发挥Hadoop提供的并行处理优势,我们需要将查询表示成MapReduce作业。经过一些本地的小规模测试,我们将能够在集群设备上运行Hadoop。
map阶段和reduce阶段
MapReduce任务过程被分为两个处理阶段:map阶段和reduce阶段。每个阶段都以键/值对作为输入和输出,并由程序员选择它们的类型。程序员还需具体定义两个函数:map函数和reduce函数。
map阶段的输入是原始NCDC数据。我们选择文本格式作为输入格式,以便将数 据集的每一行作为一个文本值进行输入。键为该行起始位置相对于文件起始位置的偏移量,但我们不需要这个信息,故将其忽略。
我们的map函数很简单。由于我们只对年份和气温这两个属性感兴趣,所以只需要取出这两个属性数据。在本例中,map函数只是一个数据准备阶段,通过这种方式来准备数据,使reduce函数能在该准备数据上继续处理:即找出每年的最髙气温。map函数还是一个比较适合去除已损记录的地方:此处,我们将筛选掉缺失的、可疑的或错误的气温数据。
为了全面了解map的工作方式,我们思考以下几行作为输入数据的示例数据(考 虑到页面篇幅,去除了一些未使用的列,并用省略号表示):
0067011990999991950051507004...9999999N9+00001+99999999999... 0043011990999991950051512004...9999999N9+00221+99999999999... 0043011990999991950051518004...9999999N9-00111+99999999999... 0043012650999991949032412004...0500001N9+01111+99999999999... 0043012650999991949032418004...0500001N9+00781+99999999999...
这些行以键/值对的方式来表示map函数:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...) (106, 0043011990999991950051512004...9999999N9+00221+99999999999...) (212, 0043011990999991950051518004...9999999N9-00111+99999999999...) (318, 0043012650999991949032412004...0500001N9+01111+99999999999...) (424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
键(Key)是文件中的行偏移量,map函数并不需要这个信息,所以将其忽略。map函数的功能仅限于提取年份和气温信息(以粗体显示),并将它们作为输出(气温值已用整数表示):
(1950,0) (1950,22) (1950,-11) (1949,111) (1949,78)
map函数的输出经由MapReduce框架处理后,最后被发送到reduce函数。这一处理过程中需要根据键对键/值对进行排序和分组。因此,我们的示例中,reduce函数会看到如下输入:
(1949, [111, 78]) (1950, [0, 22, -11])
每一年份后紧跟着一系列气温数据。所有reduce函数现在需要做的是遍历整个列表并从中找出最大的读数:
(1949, 111) (1950, 22)
这是最终输出结果:每一年的全球最高气温记录。
整个数据流如图2-1所示。在图的底部是Unix的流水线(pipeline,也称管道或管线) 命令,用于模拟整个MapReduce的流程,部分内容将在讨论Hadoop Streaming时再次涉及。
图2-1. MapReduce的逻辑数据流
Java MapReduce
明白MapReduce程序的工作原理之后,下一步便是通过代码来实现它。我们需要三样东西:一个map函数、一个reduce函数和一些用来运行作业的代码。map函数由Mapper接口实现来表示,后者声明了一个map()方法。例2-3显示了我们的map函数实现。
例2-3.査找最高气温的Mapper
import java.io.IOException; import ong.apache.hadoop.io.IntWnitable; import org.apache.hadoop.io.LongWritable; import ong.apache.hadoop.io.Text; import ong.apache.hadoop.mapned.MapReduceBase; import ong.apache.hadoop.mapned.Mappen; import ong.apache.hadoop.mapned.OutputCollecton; import ong.apache.hadoop.mapned.Reporter; public class MaxTempenatuneMappen extends MapReduceBase implements Mappen < LongWnitable, Text, Text, IntWnitable > { private static final int MISSING = 9999; public void map(LongWnitable key.Text value, OutputCollecton < Text IntWnitable > output, Reporter reporter) throws IOException { String line = value.toStning(); String yean = line.substning(15, 19); int ainTempenatune; if (line.chanAt(87) == ' + ') { // panseInt doesn't like leading plus signs airTempenatune = Integer.panseInt(line.substning(88) } else { airTempenatune = Integer.panseInt(line.substning(87, 92)); } String quality = line.substning(92, 93); if (ainTempenatune != MISSING && quality.matches("[01459]")) { output.collect(new Text(year), new IntWnitable(airTempenature)); } } }
该Mapper接口是一个泛型类型,它有四个形参类型,分别指定map函数的输入 键、输入值、输出键和输出值的类型。就目前的示例来说,输入键是一个长整数偏 移量,输入值是一行文本,输出键是年份,输出值是气温(整数)。Hadoop自身提 供一套可优化网络序列化传输的基本类型,而不直接使用Java内嵌的类型。这些 类型均可在org.apache.hadoop.io包中找到。这里我们使用LongWritable类 型(相当于Java中的Long类型)、Text类型(相当于Java中的String类型)和 IntWritable类型(相当于Java中的Integer类型)。
map()方法的输入是一个键和一个值。我们首先将包含有一行输入的Text值转换 成Java的String类型,之后使用substring()方法提取我们感兴趣的列。
map()方法还提供了 OutputCollector实例用于输出内容的写入。在这种情况下,我们将年份数据按Text对象进行读/写(因为我们把年份当作键),将气温值封装在IntWritable类型中。
我们只在气温数据不缺失并且所对应质量代码显示为正确的气温读数时,才将其写入输出记录中。
Reduce函数通过Reducer进行类似的定义,如例2-4所示。
例2-4.查找最高气温的Reducer
import java.io.IOException; import java.util.Iterator; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.MapReduceBase; import org.apache.hadoop.mapred.OutputCollector; import org.apache.hadoop.mapred.Reducer; import org.apache.hadoop.mapred.Reporter; public class MaxTemperatureReducer extends MapReduceBase implements Reducen < Text, IntWritable, Text, IntWritable > { public void neduce(Text key, Itenator < IntWnitable > values, OutputCollector < Text, IntWritable > output, Reporter reporter) throws IOException { int maxValue = Integen.MIN_VALUE; while (values.hasNext()) { maxValue = Math.max(maxValue, values.next().get()); } output.collect(key, new IntWritable(maxValue)); } }
同样,针对reduce函数也有四个形式参数类型用于指定其输入和输出类型。reduce 函数的输入类型必须与map函数的输出类型相匹配:即Text类型和 IntWritable类型。在这种情况下,reduce函数的输出类型也必须是Text和IntWritable这两种类型,分别输出年份和最高气温。该最高气溫是通过循环过 较当前气温与已看到的最髙气温获得的。
第三部分代码负责运行MapReduce作业(请参见例2-5)。
例2-5.该应用程序在气象数据集中找出最高气温
import java.io.IOException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileOutputFormat; import org.apache.hadoop.mapred.JobClient; import org.apache.hadoop.mapred.JobConf; public class MaxTemperature { public static void main(String[]args)throws IOException { if (args.length != 2) { System.err.println("Usage: MaxTemperature <input path> <output path>"); System.exit(-1); } JobConf conf = new JobConf(MaxTemperature.class); conf.setDobName("Max temperature"); FileInputFormat.addInputPath(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); conf.setMapperClass(MaxTemperatureMapper.class); conf.setReducerClass(MaxTemperatureReducer.class); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); JobClient.runJob(conf); } }
JobConf对象指定了作业执行规范。我们可以用它来控制整个作业的运行。在Hadoop集群上运行这个作业时,我们需要将代码打包成一个JAR文件(Hadoop会在集群上分发这个文件)。我们无需明确指定认尺文件的名称,而只需在JobConf 的构造函数中传递一个类,Hadoop将通过该类查找包含有该类的JAR文件进而找到相关的JAR文件。
构造JobConf对象之后,需要指定输入和输出数据的路径。调用FileInputFormat类的静态函数addInputPath()来定义输入数据的路径,该路径可以是单个文件、目录(此时,将目录下所有文件当作输入)或符合特定文件模式的一组文件。由函数名可知,可以多次调用addInputPath()实现多路径的输入。
通过调用FileOutputFormat类中的静态函数setOutputPath()来指定输出路径。该函数指定了reduce函数输出文件的写入目录。在运行任务前该目录不应该存在,否则Hadoop会报错并拒绝运行该任务。这种预防措施是为了防止数据丢失 (一个长时间运行任务的结果被意外地覆盖将是非常恼人的)。
接着,通过 setMapperClass ()和 setReducerClass ()指定 map 和 reduce 类型。setOutputKeyClass ()和 setOutputValueClass ()控制 map 和 reduce 函数的输出类型,正如本例所示,这两个输出类型往往相同。如果不同,map函数的输出类型则 通过 setMapOutputKeyClass ()和 setOutputValueClass ()函数来设置。
输入的类型通过InputFormat类来控制,我们的例子中没有设置,因为使用的是默认的TextInputFromat(文本输入格式)。
在设置定义map和reduce函数的类后,便可以开始运行任务。JobClient类的静态函数runJob()会提交作业并等待完成,最后将其进展情况写到控制台。
运行测试
写好MapReduce作业后,通常会拿一个小型的数据集进行测试以排除代码相关问题。首先,以独立(本机)模式安装Hadoop,详细说明请参见附录A。在这种模式下,Hadoop在本地文件系统上运行作业运行程序。让我们用前面讨论过的5行采 样数据为例子来测试MapReduce作业(考虑到篇幅,这里对输出稍有修改):
% export HADOOP_CLASSPATH=build/classes % hadoop MaxTemperature input/ncdc/sample.txt output 09/04/07 12:34:35 INFO jvm.JvmMetnics: Initializing 〕VM Metrics with processName=〕ob Tracker, sessionId= 09/04/07 12:34:35 WARN mapned.JobClient: Use GenenicOptionsPansen for pansing the arguments. Applications should implement Tool for the same. 09/04/07 12:34:35 WARN mapred,JobClient: No job jar file set. User classes may not be found. See JobConf(Class) on JobConf#setJar(String). 09/04/07 12:34:35 INFO mapned.FileInputFonmat: Total input paths to process:1 09/04/07 12:34:35 INFO mapred.JobClient: Running job: job_local_0001 09/04/07 12:34:35 INFO mapned.FileInputFonmat: Total input paths to process:1 09/04/07 12:34:35 INFO mapred.MapTask: numReduceTasks: 1 09/04/07 12:34:35 INFO mapned.MapTask: io.sont.mb = 100 09/04/07 12:34:35 INFO mapred_MapTask: data buffer = 79691776/99614720 09/04/07 12:34:35 INFO mapred.MapTask: record buffer = 262144/327680 09/04/07 12:34:35 INFO mapned.MapTask: Starting flush of map output 09/04/07 12:34:36 INFO mapned.MapTask: Finished spill 0 09/04/07 12:34:36 INFO mapned.TaskRunnen: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting 09/04/07 12:34:36 INFO mapned.LocalJobRunner: file:/Usens/tom/wonkspace/htdg/input/n cdc/sample.txt:0+529 09/04/07 12:34:36 INFO mapned.TaskRunnen:Task'attempt_local_0001_m_000000_0' done. 09/04/07 12:34:36 INFO mapned.LocalJobRunnen: 09/04/07 12:34:36 INFO mapned.Mengen: Merging 1 sorted segments 09/04/07 12:34:36 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 57 bytes 09/04/07 12:34:36 INFO mapned.LocalJobRunner: 09/04/07 12:34:36 INFO mapned.TaskRunnen: Task:attempt_local_0001_n_000000_0 is done. And is in the process of commiting 09/04/07 12:34:36 INFO mapned.LocalJobRunner: 09/04/07 12:34:36 INFO mapned.TaskRunnen: Task attempt_local_0001_n_000000_0 is allowed to commit now 09/04/07 12:34:36 INFO mapned.FileOutputCommitten: Saved output of task 'attempt_local_0001_n_000000_0' to file:/Usens/tom/wonkspace/htdg/output 09/04/07 12:34:36 INFO mapned.LocalDobRunnen: reduce > reduce 09/04/07 12:34:36 INFO mapned.TaskRunnen:Task'attempt_local_0001_n_000000_01 done. 09/04/07 12:34:36 INFO mapned.DobClient: map 100% reduce 100% 09/04/07 12:34:36 INFO mapned.DobClient: Job complete: job_local_0001 09/04/07 12:34:36 INFO mapred.JobClient: Counters: 13 09/04/07 12:34:36 INFO mapned.DobClient: FileSystemCountens 09/04/07 12:34:36, INFO mapned.DobClient: FILE_BYTES_READ=27571 0 9/04/07 12:34:36 INFO mapned.DobClient: FILE_BYTES_WRITTEN=53907 09/04/07 12:34:36 INFO mapned.DobClient: Map-Reduce Framework 09/04/07 12:34:36 INFO mapned.DobClient Reduce input gnoups=2 09/04/07 12:34:36 INFO mapred.]obClient: Combine output neconds=0 09/04/07 12:34:36 INFO mapned.DobClient: Map input neconds=5 09/04/07 12:34:36 INFO mapned.DobClient: Reduce shuffle bytes=0 09/04/07 12:34:36 INFO mapned.DobClient: Reduce output neconds=2 09/04/07 12:34:36 INFO mapned.DobClient: Spilled Reconds=10 09/04/07 12:34:36 INFO mapred.]obClient: Map output bytes=45 09/04/07 12:34:36 INFO mapned.DobClient: Map input bytes=529 09/04/07 12:34:36 INFO mapned.JobClient: Combine input neconds=0 09/04/07 12:34:36 INFO mapned.JobClient: Map output neconds=5 09/04/07 12:34:36 INFO mapned.JobClient: Reduce input neconds=5
如果调用hadoop命令的第一个参数是类名,则Hadoop将启动一个JVM来运行这个类。使用hadoop命令运行作业比直接使用Java命令运行更方便,因为前者将 Hadoop库文件(及其依赖关系)路径加入到类路径参数中,同时也能获得Hadoop的配置文件。我们需要定义一个HADOOP_CLASSPATH环境变量用于添加应用程序类的路径,然后由hadoop脚本来执行相关操作。
以本地(独立)模式运行时,本书中所有程序均假设按照这种方式来设置HADOOP_CLASSPATH。命令的运行需要在示例代码所在的文件夹下进行。
运行作业所得到的输出提供了一些有用的信息。无法找到作业JAR文件的警告信息是意料之中的,因为我们没有使用JAR文件在本地模式下运行。在集群上运行时,将不会看到这个警告。例如,我们可以看到,这个作业有指定的标识,即 job_local_0001,并且执行了 一个map任务和一个reduce任务(使用 attempt_local_0001_m_000000_0 和 attempt_local_0001_r_000000_0 两个ID)。在调试MapReduce作业时,知道作业和任务的ID是非常有用的。
输出的最后一部分,以Counters为标题,显示在Hadoop上运行的毎个作业的一些统计信息。这些信息对检查这些大量的数据是否按照预期进行处理非常有用。例如,我们查看系统输出的记录信息可知:5个map输入产生了5个map的输出,然后5个reduce输入产生2个reduce输出。
输出数据写入output目录,其中毎个reduce都有一个输出文件。我们的例子中包 含一个reducer,所以我们只能找到一个文件,名为part-00000:
% cat output/part-00000 1949 111 1950 22
新增的Java MapReduce API
Hadoop的版本0.20.0包含有一个新的Java MapReduce API,有时也称为“上下文对象”(context object),旨在使API在今后更容易扩展。新的API在类型上不兼容先前的API,所以,需要重写以前的应用程序才能使新的API发挥作用。新增的API和旧的API之间,有下面几个明显的区别。
-
新的API倾向于使用虚类,而不是接口,因为这更容易扩展。例如,可以无需修改类的实现而在虚类中添加一个方法(即用默认的实现)。在新的API中,mapper和reducer现在都是虚类。
-
新的API放在org.apache.hadoop.mapreduce包(和子包)中。之前版本的 API 依旧放在 org.apache.hadoop.mapred 中。
-
新的API充分使用上下文对象,使用户代码能与MapReduce系统通信。例如,MapContext基本具备了Jobconf、OutputCollector和Reporter的功能。
-
新的API同时支持“推”(push)“拉”(pull)式的迭代。这两类API,均可以将键/值对记录推给mapper,但除此之外,新的API也允许把记录从map()方法中拉出。对reducer来说是一样的。“拉”式处理数据的好处是可以实现 数据的批量处理,而非逐条记录地处理。
-
新增的API实现了配置的统一。旧API通过一个特殊的JobConf对象配置作业,该对象是Hadoop配置对象的一个扩展。在新的API中,我们丢弃这种区分,所有作业的配置均通过Configuration来完成。
-
新API中作业控制由Job类实现,而非DobClient类,新API中删除了DobClient类。
-
输出文件的命名方式稍有不同。map的输出文件名为part-m-nnnnn,而reduce的输出为part-r-nnnnn(其中nnnnn分块序号,为整数,且从0开始算)。
例2-6显示了使用新API重写的MaxTemperature应用。不同之处已加粗显示。
将旧API写的Mapper和Reducer类转换为新API时,记住将map()和reducer()的签名转换为新形式。如果只是将类的继承修改为对新的 Mapper和Reducer类的继承,编译的时候也不会报错或显示警告信息,因为新的Mapper和Reducer类同样也提供了等价的map()和reduce()函数。但是,自己写的map和reducer代码是不会被调用的,这会导致难以诊断的错误。
例2-6.用新上下文对象MapReduce APl重写的MaxTemperature应用
public class NewMaxTemperature { static class NewMaxTemperatureMapper extends Mapper < LongMritable, Text)Text, IntMritable > { private static final int MISSING = 9999; public void map(Longhlnitable key, Text value)Context context) throws IOException, InterruptedException { String line = value.toString(); String year = line.substring(15, 19); int airTemperature; if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs airTemperature = Integer.parseInt(line.substring(88, 92)); } else { airTemperature = Integer.parseInt(line.substring(87, 92)); } String quality = line.substring(92, 93); if (airTemperature != MISSING && quality.matches("[01459]")) { context.write(new Text(year), new IntWritable(airTemperature)); } } } static class NewMaxTemperatureReducer extends Reducer < Text, IntWritable, Text, IntWritable > { public void reduce(Text key, Iterable < IntWritable > values, Context context) throws IOException, InterruptedException { int maxValue = Integer.MIN_VALUE; for (IntWritable value : values) { maxValue = Math.max(maxValue, value.get()); } context.write(key, new IntWritable(maxValue)); public static void main(String[]args)throws Exception { if (args.length != 2) { System.err.println("Usage: NewMaxTemperature <input path> <output path>"); System.exit(-1); Job job = new Job(); job.setJarByClass(NewMaxTemperature.class); FileInputFormat, addInputPath(job, new Path(args[0])); FileOutputFormat, setOutputPath(job, new Path(args[1])); job.setMapperClass(NewMaxTemperatureMapper.class); job.setReducerClass(NewMaxTemperatureReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); System.exit(job.waitForCompletion(true) ? 0 : 1); } } } } }