输入格式
从一般的文本文件到数据库,Hadoop可以处理很多不同类型的数据格式。本节将 探讨数据格式问题。
输入分片与记录
第2章中讲过,一个输入分片(split)就是由单个map处理的输入块。每一个map操 作只处理一个输入分片。每个分片被划分为若干个记录,每条记录就是一个键/值 对,map—个接一个地处理每条记录。输入分片和记录都是逻辑的,不必将它们对 应到文件,虽然常见的形式都是文件。在数据库的场景中,一个输入分片可以对应 于一个表上的若干行,而一条记录对应到一行(DBInputFormat正是这么做的,它 这种输入格式用于从关系数据库读取数据)。
输入分片在Java中被表示为InputSplit接口(和本章提到的所有类一样,它也在org.apache.hadoop.mapred包中)。
public interface InputSplit extends Writable { long getLength() throws IOException; Stning[] getLocations() throws IOException; }
InputSplit包含一个以字节为单位的长度和一组存储位置(即一组主机名)。注 意,一个分片并不包含数据本身,而是指向数据的引用(reference)。存储位置供 MapReduce系统使用以便将map任务尽量放在分片数据附近,而长度用来排序分 片,以便优先处理最大的分片,从而最小化作业运行时间(这也是贪婪近似算法的一个实例)。
MapReduce应用开发人员并不需要直接处理InputSplit ,因为它是由 InputFonmat创建的。InputFormat负责产生输入分片并将它们分割成记录。在 我们探讨InputFormat的具体例子之前,先来简单看一下它在MapReduce中的用 法。接口如下:
public interface InputFonmat<K,V> { InputSplit[] getSplits(3obConf job ,int numSplits) throws IOException; RecordReader<K, V> getRecondReaden(InputSplit split, JobConf job, Reporter reporter) throws IOException; }
JobClient调用getSplits()方法,以期望的map任务数numSplits作为参数传 入,这个参数将作为一个参考值,因为InputFormat实现可以自由地返回另一个 不同于numSplits指定值的分片数。在计算好分片数后,客户端将它们发送到 jobtracker, jobtracker便使用其存储位置信息来调度map任务从而在tasktracker上 处理这些分片数据。在tasktracker上,map任务把输入分片传给InputFormat的getRecordReader()方法来获得这个分片的 RecordReader。RecordReader 基本 就是记录上的迭代器,map任务用一个RecordReader来生成记录的键/值对,然 后再传递给map函数。以下代码片段(基于MapRunner里的代码)演示了该方法:
K key = neader.createKey(); V value = neader.createValue(); while (reader.next(key, value)) { mapper.map(key, value, output, reponten); }
RecordReader的next()方法被反复调用以便为mapper生成键/值对。当RecordReader达到输入流的尾部时,next()方法会返回false,map任务结束。
这段代码清楚展示了相同的键/值对象在每次调用map函数时使用,改 变的只是其内容(被reader的next()方法)。默认键/值是不变的,用户 对此可能有些不解。在rnap()函数之外有对键/值的引用时,这可能引 起问题,因为它的值会在没有警告的情况下被改变。如果确实需要这 样的引用,那么请保存你想保留的对象的一个副本,例如,对于Text 对象,可以使用其复制构造函数:new Text(value)。
这样的情况在reducer中也会发生。reducer的迭代器中的值对象被反复 使用,所以,在调用迭代器之间,一定要复制任何需要保留的任何对 象(参见例8-14)。
最后注意,MapRunner只是运行mapper的一种方式。MultithreadedMapRunner 是另一个MapRunnable接口的实现,它可以使用可配置个数的线程来并发地运行 mappers (使用mapred.map.multithreadedrunner.threads设置)。对于大多数 数据处理任务来说,MapRunner没有优势。但是,对于因为需要连接外部服务器 而造成单个记录处理时间比较长的mapper来说,它允许多个mapper在同一个 JVM下以尽量避免竞争的方式执行。
FiIelnputFormat 类
FileInputFormat是所有使用文件作为其数据源的InputFormat实现的基类(见 图7-2)。它提供了两个功能:一个定义哪些文件包含在一个作业的输入中;一个为 输入文件生成分片的实现。把分片分割成记录的作业由其子类来完成。
FileInputFormat类的输入路径
作业的输入被设定为一组路径,这对限定作业输入提供了很大的灵活性。 FileInputFormat提供四种静态方法来设定JobConf的输入路径:
public static void addInputPath(DobConf conf, Path path) public static void addInputPaths(JobConf conf, String commaSeparatedPaths) public static void setInputPaths(JobConf conf, Path... inputPaths) public static void setInputPaths(JobConf conf, String commaSeparatedPaths)
其中,addInputPath()和addInputPaths()方法可以将一个或多个路径加入路 径列表。这两个方法可以重复调用来建立路径列表。setInputPaths()方法一次 设定完整的路径列表(替换前面调用所设置的任意路径)。
图7-2. lnputFormat类的层次结构
一条路径可以表示一个文件、一个目录或是一个glob,即一个文件和目录的集 合。表示目录的路径包含这个目录下所有的文件,这些文件都作为作业的输入。
被指定为输入路径的目录中的内容不会被递归进行处理。事实上,这 些目录只包含文件:如果包含子目录,也会被当作文件,从而产生错 误。处理这个问题的方法是:使用一个文件glob或一个过滤器根据命 名模式(name pattern)限定选择目录中的文件。
add和set方法允许指定包含的文件。如果需要排除特定文件,可以使用 FileInputFormat的setInputPathFilter方法设置一个过滤器:
public static void setInputPathFilter(DobConf conf, Class<?extends PathFilter> filtep)
即使不设置过滤器,FileInputFormat也会使用一个默认的过滤器来排除隐藏文件(名 称中以“.”和“_”开头的文件)。如果通过调用set&putPathFilter()设置了过滤器,它会在默认过滤器的基础上进行过滤。换句话说,自定义的过滤器只能看 到非隐藏文件。路径和过滤器也可以通过配置属性来设置(见表7-3),这对Streaming和Pipes应用 很方便。Streaming和Pipes接口都使用-input选项来设置路径,所以通常不需要 直接进行手动设置。
表7-3.输入路径和过滤器属性
属性务称 | 类型 | 默认值 | 描述 |
---|---|---|---|
niapred.input.dir | 逗号分隔的路径 | 无 | 作业的输入文件。包含逗号的路径中的逗号由“\”符号转 义,例如:glob {a,b}变成了 {a\, b} |
mapred.input.path Filter.class | PathFilter类名 | 无 | 应用于作业输入文件的过滤器 |
FileInputFormat类的输入分片
给定一组文件,FileInputFormat是如何把它们转换为输入分片的? FileInputFormat只分割大文件。这里的“大”指的是超过HDFS块的大小。分 片通常与HDFS块大小一样,这在大多应用中是合理的,然而,这个值也可以通 过设置不同的Hadoop属性来改变,如表7-4所示。
属性务称 | 类型 | 默认值 | 描述 |
---|---|---|---|
mapred.min.spiit.size | int | 1 | 一个文件分片最小的有 效字节数 |
mapred.max.split.size* | Long | MAX_VALUE, 既 9223372036854775807 | 一个文件分片中最大的 有效字节数(以字节算) |
dfs.block.size | Long | 64 MB,即67108864 | HDFS中块的大小(按字节) |
*这个属性在老版本的MapReduce API中没有出现(除了 CombineFileInputFormat),然 而,这个值是被间接计算的。计算方法是作业总的输人大小除以map任务数,该值由 mapred.map.tasks(或JobConf 上的 SetNumMapTasks()方法)设罝。因为mapred.map.tasks默认值是1,所以,分片的烺大值就是输入的大小
最小的分片大小通常是1个字节,不过某些格式可以使分片大小有一个更低的下 界。(例如,顺序文件在流中每次插入一个同步入口,所以,最小的分片大小不得 不足够大以确保毎个分片有一个同步点,以便reader根据记录边界进行重新同步。
应用程序可以强制设置一个最小的输入分片大小:通过设置一个比HDFS块更大 一些的值,强制分片比文件块大。如果数据存储在HDFS上,那么这样做是没有 好处的,因为这样做会增加对map任务来说不是本地文件的块数。
最大的分片大小默认是由Java long类型表示的最大值。这样做的效果是:当它的 值被设置成小于块大小时,将强制分片比块小。
分片的大小由以下公式计算:
max(minimumSize,min (maximumSize, blockSize))
默认情况下:
minimumSize<blockSize<niaximumSize
所以分片的大小就是blocksize。这些参数的不同设置及其如何影响最终分片大 小请参见表7_5的说明。
表7-5.举例说明如何控制分片的大小
最小分片大小 | 最大分片大小 | 块的大小 | 分片大小 | 说明 |
---|---|---|---|---|
1(默认值) | Long.MAX_VALUE(默认值) | 64 MB(默认值) | 64 MB | 默认情况下,分片大小 与块大小相同 |
1(默认值) | Long.MAX_VALUE(默认值) | 128MB | 128MB | 增加分片大小最自然 的方法是提供更大的 HDFS块,通过dfs. block.size或在构建 文件时针对单个文件进 行设置 |
128MB | Long.MAX VALUE(默认值) | 64 MB(默认值) | 128 MB | 通过使最小分片大小的 值大于块大小的方法来 增大分片大小,但代价 是增加了本地操作 |
1(默认值) | 32 MB | 64 MB | 32 MB | 通过使最大分片大小的 值大于块大小的方法来 减少分片大小 |
小文件与 CombineRlelnputFormat
相对于大批量的小文件,Hadoop更合适处理少量的大文件。一个原因是 FileInputFormat生成的InputSplit是一个文件或该文件的一部分。如果文件 很小(“小”意味着比HDFS的块要小很多),并且文件数量很多,那么每次map任 务只处理很少的输入数据,(一个文件)就会有很多map任务,每次map操作都会 造成额外的开销。请比较分割成16个64MB块的1 GB的一个文件与100 KB的 10 000个文件。10 000个文件每个都需要使用一个map操作,作业时间比一个文 件上的16个map操作慢几十甚至几百倍。
CombineFileInputFormat可以缓解这个问题,它是针对小文件而设计的。 FileInputFormat为每个文件产生1个分片,而CombineFileInputFormat把多 个文件打包到一个分片中以便每个mapper可以处理更多的数据。关键是,决定哪 些块放入同一个分片时,CombineFileInputFormat会考虑到节点和机架的因 素,所以在典型MapReduCe作业中处理输入的速度并不会下降。
当然,如果可能,应该尽量避免许多小文件的情况,因为MapReduce处理数据的 最佳速度最好与数据在集群中的传输速度相同,而处理小文件将增加运行作业而必 需的寻址次数。还有,在HDFS集群中存储大量的小文件会浪费namenode的内 存。一个可以减少大量小文件的方法是使用SequenceFile将这些小文件合并成 一个或多个大文件:可以将文件名作为键(如果不需要键,可以用NullWritable 等常量代替),文件的内容作为值。参见例7-4。但如果HDFS中已经有大批小文 件,CombineFileInputFormat 方法值得一试。
由于CombineFileInputFormat是一个抽象类,没有提供实体类(不同于 FileInputFormat),所以使用的时候需要一些额外的工作(希望日后会有一些通用 的实现添加入库)。例如,如果要使CombineFileInputFormat与TextlnputFormat 相同,需要创建一个CombineFileInputFormat的具体子类,并且实现 getRecordReader()方法。
文本输入
Hadoop非常擅长处理非结构化文本数据。本节讨论Hadoop提供的用于处理文本 的不同 InputFormat。
TextInputFormat
TextInputFormat是默认的InputFormat。毎条记录是一行输入。键是 LongWritable类型,存储该行在整个文件中的字节偏移量。值是这行的内容,不 包括任何行终止符(换行符和回车符),它是Text类型的。所以,包含如下文本的 文件被切分为毎个分片4条记录:
On the top of the Crumpetty Tree The Quangle Wangle sat, But his face you could not see, On account of his Beaver Hat.
每条记录表示为以下键/值对:
(0, On the top of the Crumpetty Tree) (33, The Quangle Wangle sat,) (57, But his face you could not see,) (89, On account of his Beaver Hat.)
很明显,键并不是行号。一般情况下,很难取得行号,因为文件按字节而不是按行 切分为分片。每个分片单独处理。行号实际上是一个顺序的标记,即每次读取一行 的时候需要对行号进行计数。因此,在分片内知道行号是可能的,但在文件中是不 可能的》 然而,每一行在文件中的偏移量是可以在分片内单独确定的,而不需要分片,因为 每个分片都知道上一个分片的大小,只需要加到分片内的偏移量上,就可以获得在 整个文件中的偏移量了。通常,对于每行需要唯一标识的应用来说,有偏移量就足 够了。如果再加上文件名,那么它在整个文件系统内就是唯一的。当然,如果每一 行都是定长的,那么这个偏移量除以每一行的长度即可算出行号。
KeyVaIueTextlnputFormat
TextlnputFormat的键,即每一行在文件中的字节偏移量,通常并不是特别有 用。通常情况下,文件中的每一行是一个键/值对,使用某个分界符进行分隔,比 如制表符。例如以下由Hadoop默认OutputFormat(即TextOutputFormat)产生 的输出。如果要正确处理这类文件,KeyVaIueTextlnputFormat比较合适。
可以通过key.value.separator.in.input.line属性来指定分隔符。它的默认 值是一个制表符。以下是一个不例,其中一表不一个(水平方向的)制表符:
linel —On the top of the Crumpetty Tnee line2 —The Quangle Wangle sat, line3 —But his face you could not see, line4 —On account of his Beaver Hat.
与TextlnputFormat类似,输入是一个包含4 条记录的分片,不过此时的键是每 行排在Tab之前的Text序列:
(line1,On the top of the Crumpetty Tnee) (line2,The Quangle Wangle sat,) (line3,But his face you could not see,) (line4,On account of his Beaver Hat.)
NLineInputFOrmat
通过 TextlnputFormat 和 KeyVaIueTextlnputFormat,每个 mapper 收到的输入 行数不同。行数依赖于输入分片的大小和行的长度。如果希望mapper收到固定行 数的输入,需要使用 NLineInputFormat 作为 lnputFormat。与 TextlnputFormat 一样,键是文件中行的字节偏移量,值是行本身。
N是每个rapper收到的输入行数。N设置为1(默认值)时,每个mapper会正好收 到一行输入。mapred.line.input.format.linespermap 属性控制 N 的值。仍 然以刚才的4行输入为例:
On the top of the Crumpetty Tree The Quangle Wangle sat, But his face you could not see, On account of his Beaven Hat.
例如,如果N是2,则每个输入分片包含两行。一个mapper会收到前两行键/值 对:
(0, On the top of the Crumpetty Tnee) (33, The Quangle Wangle sat,) 另一个mapper会收到后两行: (57, But his face you could not see,) (89, On account of his Beaven Hat.)
键与值与TextInputFormat生成的一样。不同在于输入分片的构造方法。
通常来说,对少量输人行执行map任务是比较低效的(由于任务初始化的开销),但 有些应用程序会对少量数据做一些扩展的(也就是CPU密集型的)计算任务,然后 产生输出。仿真是一个不错的例子。通过生成一个指定输入参数的输入文件,每行 一个参数,便可以执行一个参数扫描分析(parameter sweep):并发运行一组仿真试 验,看模型是如何随参数不同而变化的。
另一个例子是用Hadoop引导从多个数据源(如数据库)加载数据。创建一个“种 子”输入文件,记录所有的数据源,一行一个数据源。然后每个mapper分到一个 数据源,并从这些数据源中加载数据到HDFS中。这个作业不需要reduce阶段, 所以reducer的数量应该被设成0(通过调用Job的setNumReduceTasks()来设 置)。MapReduce作业就可以处理加载到HDFS中的数据。
XML
大多数XML解析器会处理整个XML文档,所以如果一个大型XML文档由多个输 入分片组成,那么单独处理每个分片就有挑战了。当然,可以在一个mapper上(如 果这个文件不是很大)。
由很多“记录”(此处是XML文档片断)组成的XML文档,可以使用简单的字符 串匹配或正则表达式匹配的方法来查找记录的开始标签和结束标签,而得到很多记 录。这可以解决由MapReduce框架进行分割的问题,因为一条记录的下一个开始 标签可以通过简单地从分片开始处进行扫描轻松找到,就像TextInputFormat确 定新行的边界一样。
Hadoop 提供了 StreamXmlRecordReader类(在org .apache. hadoop. streaming 包中,它也可以在Streaming之外使用)。通过把输入格式设置为 StreamInputFormat , 把 stream.recordreader.class 属性设置为 org.apache . Hadoop•Streaming.StreamXmlRecordReader 来 使 用 StreamXmlRecOrdReader类。reader的配置方法是:通过作业配置属性来设置 reader开始标签和结束标签(详情参见该类的帮助文档)。
例如,维基百科用XML格式来提供大量数据内容,非常适合用MapReduCe来并行 处理。数据包含在一个大型的XML打包文档中,文档中有一些元素,例如包含每 页内容和相关元数据的page元素。使用StreamXmlRecordReader后,这些 page元素便可解释为一系列的记录,交由一个mapper来处理。
二进制输入
Hadoop的MapReduce不只是可以处理文本信息,它还可以处理二进制格式的数据。
SequenceFilelnputFormat
Hadoop的顺序文件格式存储二进制的键/值对的序列。由于它们是可分割的(它们 有同步点,所以reader可以从文件中的任意一点与记录边界进行同步,例如分片 的起点),所以它们很符合MapReduce数据的格式,并且它们还支持压缩,可以使 用一些序列化技术来存储任意类型。
如果要用顺序文件数据作为MapReduce的输入,应用sequenceFilelnputFormat。 键和值是由顺序文件决定,所以只需要保证map输入的类型匹配。例如,如果输 入文件中键的格式是IntWritable,值是Text,那么就像第4章生成的那样, mapper 的格式应该是 Mapper<IntWritable, Text, K, V>,其中 K 和 V 是这个 mapper输出的键和值的类型。
SequenceFileAsTextlnputFormat
sequenceFileAsTextInputFormat 是 sequenceFileInputFormat 的变体,它 将顺序文件的键和值转换为Text对象。这个转换通过在键和值上调用toString() 方法实现。这个格式使顺序文件作为Streaming的合适的输入类型。
SequenceFileAsBinarylnputFormat
SequenceFileAsBinaryInputFormat 是 SequenceFileInputFormat 的一种变 体,它获取顺序文件的键和值作为二进制对象。它们被封装为BytesWritable对 象,因而应用程序可以任意地解释这些字节数组。结合使用SequenceFile.Reader 的appendRaw方法,它提供了在MapReduce中可以使用任意二进制数据类型的 方法(作为顺序文件打包),然而,插入Hadoop序列化机制通常更简洁
多种输入
虽然一个MapReduce作业的输入可能包含多个输入文件(由文件glob、过滤器和路 径组成),但所有文件都由同一个InputFormat和同一个Mapper来解释。然而, 数据格式往往会随时间演变,所以必须写自己的mapper来处理应用中的遗留数据 格式。或,有些数据源会提供相同的数据,但是格式不同。对不同的数据集进行连 接00化,也称“联接”)操作时,便会产生这样的问题。例如,有些数据可能是使用制表符分隔的文本文件,另一 些可能是二进制的顺序文件。即使它们格式相同,它们的表示也可能不同,因此需 要分别进行解析。
这些问题可以用MultipleInputs类来妥善处理,它允许为每条输入路径指定 InputFormat和Mapper。例如,我们想把英国Met Office的气象数据和NCDC 的气象数据放在一起来分析最高气温,则可以按照下面的方式来设置输入路径:
MultipleInputs.addInputPath(conf, ncdcInputPath, TextInputFormat.class, MaxTemperatureMapper.class) MultipleInputs.addInputPath(conf,metofficeInputPath, TextInputFormat.class, MetofficeMaxTemperatureMapper.class);
这段代码取代了对 FileInputFormat.addInputPath()和 conf.setMapperClass() 的常规调用。Met Office和NCDC的数据都是文本文件,所以两者都使用 TextInputFormato但这两个数据源的行格式不同,所以我们使用了两个不一样的 mapper。MaxTemperatureMapper读取NCDC的数据并抽取年份和气温字段的 值。MetOfficeMaxTemperatureMapper 读取 Met Office 输入数据,抽取年份和 气温字段的值。重要的是两个mapper的输出类型一样,因此,reducer看到的是聚 集后的map输出,并不知道这些输入是由不同的mapper产生的。
数据库输入(和输出)
DBInputFormat这种输入格式用于使用JDBC从关系数据库中读取数据。因为它 没有任何共享能力,所以在访问数据库的时候必须非常小心,在运行太多的 mapper数据库中读数据可能会使数据库受不了。正是由于这个原因, DBInputFormat最好用于加载小量的数据集,如果需要与来自HDFS的大数据集 连接,要使用MultipleInputs。与之相对应的输出格式是DBOutputFormat,它 适用于将作业输出数据(中等规模的数据)转储到数据库。
在关系数据库和HDFS之间移动数据的另一个方法是:使用Sqoop,具体描述见第 15章。
HBase的TableInputFormat的设计初衷是让MapReduce程序操作存放在HBase 表的数据。TableOutputFormat的设计初衷是把MapReduce的输出写到HBase。