Hadoop教程

基于文件的数据结构

对于某些应用而言,需要特殊的数据结构来存储自己的数据。对于基于 MapReduce的数据处理,将每个二进制数据的大对象(bolb)融入自己的文件中并不能实现很高的可扩展性,所以针对上述情况,Hadoop开发了一组更高层次的容器。

SequenceFile

考虑日志文件,其中每一条日志记录是一行文本。如果想记录二进制类型,纯文本 是不合适的。这种情况下,Hadoop的SequenceFile类非常合适,因为上述类提 供了二进制键/值对的永久存储的数据结构。当作为日志文件的存储格式时,你可 以自己选择键,比如由LongWritable类型表示的时间戳,以及值可以是writable类型,用于表示日志记录的数量。

SequenceFiles同样也可以作为小文件的容器。而HDFS和MapReduce是针对大 文件进行优化的,所以通过SequenceFile类型将小文件包装起来,可以获得更高效率的存储和处理。

SequenceFile的写操作

通过createWriter()静态方法可以创建SequenceFile对象,并返回 SequenceFile.Writer实例。该静态方法有多个重载版本,但都需要指定代写入的数据流(FSDataOutputStream 或 FileSystem 对象和 Path 对象), Configuration对象,以及键和值的类型。另外可选参数包括压缩类型以及相应 的codec,Progressable回调函数用与通知写入的进度,以及在SequenceFile 头文件中存储的Metadata实例。

存储在SequenceFile中的键和值并不一定需要时Writable类型。任一可以通过 Serialization类实现序列化和反序列化的类型均可被使用。

一旦拥有SequenceFile.Writer实例,就可以通过append()方法在文件末尾附 加键/值对。写完后,可以调用close()方法(SequenceFile.Writer实现了 java.io.Closeable 接 口)。

例4-12显示了通过上述API将键/值对写入SequenceFile的小段代码。

例 4-12.写入 SequenceFile 对象

public class SequenceFileWniteDemo {
    private static final Stning[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };
    public static void main(Stning[] angs) throws IOException {
        String uni = angs[0];
        Configuration conf = new Configunation();
        FileSystem fs = FileSystem.get(URI.cneate(uni), conf);
        Path path = new Path(uni);
        IntWritable key = new IntWnitable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try 
        {
            writer = SequenceFile.cneateWniten(fs, conf, path,
            key.getClass(), value.getClass());
            
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System•out•printf("[Xs]\t%s\t%s\n",writer.getLength(), key, value);
                writer.append(key, value);
            }
        }finally {
            IOUtils.closeStream(writer);
        }
    }
}

顺序文件中存储的键是从100到1降序排列的整数,表示为IntWritable对象。 值为Text对象。在将每条记录追加到SequenceFile.Writer实例末尾之前,我 们需要使用getLength()方法来获取文件访问的当前位置。在下一小节,如果我 们并不是按顺序读取文件时,则使用上述信息作为记录的边界。我们把这个位置信息和键/值对输出到控制台。结果如下所示:

% hadoop SequenceFileMriteDemo numbers.seq
[128]  100 One, two, buckle my shoe
[173]  99  Three, four, shut the door
[220]  98  Five, six, pick up sticks
[264]  97  Seven, eight, lay them straight
[314]  96  Nine, ten, a big fat hen
[359]  95  One, two, buckle my shoe
[404]  94  Three, four, shut the door
[451]  93  Five, six, pick up sticks
[495]  92  Seven, eight, lay them straight
[545]  91  Nine, ten, a big fat hen
...
[1976] 60  One, two, buckle my shoe
[2021] 59  Three, four, shut the door
[2088] 58  Five, six, pick up sticks
[2132] 57  Seven, eight, lay them straight
[2182] 56  Nine, ten, a big fat hen
...
[4557] 5   One, two, buckle my shoe
[4602] 4   Three, four, shut the door
[4649] 3   Five, six, pick up sticks
[4693] 2   Seven, eight, lay them straight
[4743] 1   Nine, ten, a big fat hen

读取 SequenceFile

从头到尾读取顺序文件的过程是创建SequenceFile.Reader实例后反复调用next()方法迭代读取记录的过程。读取的是哪条记录与你使用的序列化框架相 关。如果你使用的是Writable类型,那么通过键和值作为参数的next()方法可以将数据流中的下一条键值对读入变量中:

public boolean next(Wnitable key. Writable val)

如果键值对成功读取,则返回ture,如果已读到文件末尾,则返回false。 对于其他的,非writable类型的序列化框架(比如Apache Thrift),则需要使用下 述方法:

public Object next(Object key) throws IOException
public Object getCunnentValue(Object val) throws IOException

在上述情况下,需要确保在io.serializations属性中已经设置了你想使用的序 列化框架,参见“序列化框架”小节。 如果next()方法返回的是非-null对象,则可以从数据流中读取键值对,并且可 以通过getCurrentValue()方法读取该值。否则,如果next()返回null值,则 表示已经读到文件末尾。

例4-13中的程序显示了如何读取包含有Writable类型的键值对的顺序文件。需 要注意如果通过调用getKeyClass()方法和getValueClass()方法或 SequenceFile中所使用的类型,然后通过ReflectionUtils对象常见键和值的 实例。通过这个技术,该程序可用于处理有writable类型键值对的顺序文件。

例4-13.读取SequenceFile

public class SequenceFileReadDemo {
    public static void main(String[] angs) throws IOException 
    {
        String uni = args[0];
        Configuration conf = new Configunation();
        FileSystem fs = FileSystem.get(URI.cneate(uni), conf);
        Path path = new Path(uni);
        SequenceFile.Readen readen = null; 
        try 
        {
            neader = new SequenceFile.Reader(fs^ path, conf);
            Writable key = (Writable)
            ReflectionUtils•newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable)
            ReflectionUtils.newInstance(readen.getValueClass(), conf);
            long position = reader.getPosition(); 
            while (neader.next(keyj value)) 
            {
                String syncSeen = neaden.syncSeen() ? "*" : "";
                System.out.pnintf("[%s%s]\t%s\t%s\n",position,syncSeen,key,value);
                position = neader.getPosition(); // beginning of next record
            }
        } 
        finally 
        {
            IOUtils.closeStneam(reader);
        }
    }
}

该程序的另一特性是能够显示顺序文件中同步点的位置信息。所谓同步点是指当数 据读取的实例出错后能够再一次与记录边界同步的数据流中的一个位置一例如, 在数据流中搜索到任意位置后。同步点是由SequenceFile.Writer记录的,后者 在顺序文件写入过程中插入一个特殊项以便毎隔几个记录便有一个同步标识。这样 的特殊项非常小,因而只造成很小的存储开销,不到1%。同步点始终位于记录的 边界处。 运行例4-13中的程序可以显示由信号表示的顺序文件中的同步点。第一同步点位 于2021处(第二个位于4075处,但本例中并没有显示出来):

% hadoop SequenceFileReadDemo numbers.seq
[128]	100	One, two, buckle my shoe
[173]	99	Three, four, shut the door
[220]	98	Five, siXj pick up sticks
[264]	97	Seven, eight, lay them straight
[314]	96	Nine, ten, a big fat hen
[359]	95	One, two, buckle my shoe
[404]	94	Three, four, shut the door
[451]	93	Five, six, pick up sticks
[495]	92	Seven, eight, lay them straight
[545]	91	Nine, ten, a big fat hen
[590]	90	One, two, buckle my shoe
...
[1976] 60	One, two, buckle my shoe
[2021*] 59	Three, four, shut the door
[2088] 58	Five, six, pick up sticks
[2132] 57	Seven, eight, lay them straight
[2182] 56	Nine, ten, a big fat hen
...
[4557] 5       One, two, buckle my shoe
[4602] 4       Three, four, shut the door
[4649] 3       Five, six, pick up sticks
[4693] 2       Seven, eight, lay them straight
[4743] 1       Nine, ten, a big fat hen

在顺序文件中捜索给定位置有两种方法。第一种是调用seek()方法,由此可以读取文件中的给定位置。例如,可以按如下搜索记录的边界:

reader.seek(B59);
assertThat(reader.next(key, value), is(true));
assertThat(((IntWritable) key).get(),is(95));

但是如果给定的位置不是记录的边界,则在调用next()方法时发生错误:

reader.seek(360);
reader.next(key, value); // fails with IOException

第二种方法是通过同步点找到记录边界。SequenceFile.Reader对象的 sync(long卩0511100方法可以将读取位置定位到position之后的下一个同步 点。如果position之后没有同步了,那么当前读取位置将指向文件末尾。这样, 我们对数据流中的任意位置调用sync()方法——例如非记录边界——而且可以从 新定位到下一个同步点并继续向后读取:

neaden.sync(360);
assentThat(reader.getPosition(), is(2021L));
assertThat(reader.next(key, value), is(true));
assertThat(((IntWnitable) key).get(), is(59));

SequenceFile.writer对象包含一个sync方法,该方法可以在数 据流的当前位置插入同步点。这里请不要把它和同名的Syncable接口中定义的sync()方法混为一谈,后者用于底层设备缓冲区的同步。

可以将加入同步点的顺序文件作为MapReduce的输入,因为该类顺序文件允许切 分,由此该文件的不同部分可以由独立的map任务处理。

通过命令行接口显示SequenceFile对象

hadoop fs 命令有一个-text选项,可以以文本形式显示顺序文件的内容。该选 项可以查看文件的代码,由此检测出文件的类型并适当将其转换成文本。该选项可 以识别gzip压缩的文件和顺序文件,否则,假设输入为纯文本文件。

对于顺序文件,如果键和值是有具体含义的字符串表示,上述命令是非常有用的 (通过toString()方法定义)。同样,如果有自己定义的键或值的类,则需要确保 它们在Hadoop类路径目录下。

对上一节中我们创建顺序文件执行上述命令,我们可以得到如下结果:

% hadoop fs -text numbers.seq | head
100 One, two, buckle my shoe
99  Three, four, shut the door
98  Five, six, pick up sticks
97  Seven, eight, lay them straight
96  Nine, ten, a big fat hen
95  One, two, buckle my shoe
94  Three, four, shut the door
93  Five, six, pick up sticks
92  Seven, eight, lay them straight
91  Nine, ten, a big fat hen

排序和合并顺序文件

MapReduce是对多个顺序文件进行排序(或合并)最有效的方法。MapReduce本身具 有并行执行能力,并且可由你指定reducer的数量(该数决定着输出分区数)。例 如,通过指定一个reducer,可以得到一个输出文件。我们可以使用Hadoop发行版 自带的例子,在该例中通过指定键和值的类型可以指定输入和输出为顺序文件:

% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntMritable \
-outValue org.apache.hadoop.io.Text \
numbers.seq sorted
% hadoop fs -text sorted/part-00000 | head
1       Nine, ten, a big fat hen
2       Seven, eight, lay them straight
3       Five, six, pick up sticks
4       Three,four, shut the door
5       One,two, buckle my shoe
6       Nine, ten, a big fat hen
7       Seven, eight, lay them straight
8       Five, six, pick up sticks
9       Three, four, shut the door
1       One, two, buckle my shoe

通过MapReduce实现排序/归并的另一种方法是使用SequenceFile.Sorter类中sort()方法和marge()方法。上述方法比MapRedUCe更早出现,并且提供的功 能比MapRedUCe的更底层(例如,为了实现并行,你需要手动对数据进行分区),所 以更常见的情况是在传统的MapReduce中实现顺序文件的排序和合并。

顺序文件的格式

顺序文件由文件头和随后的一条或多条记录组成(参见图4-2)。顺序文件的前三个 字节为SEQ(顺序文件代码),紧随其后的一个字节表示顺序文件的版本号。文件头 还包括其他一些字段,包括键和值相应类的名称,数据压缩细节,用户定义的元数 据,以及同步标识。回想同步标识主要用于读取文件的时候能够从任意位置开始 识别记录边界。每个文件有随机生成的同步标识,该同步标识的内容存储在文件头 中。同步标识位于顺序文件中的记录与记录之间。同步标识的额外存储开销要求小 于1%,所以没有必要在每条记录末尾添加该标识(特别是比较短的记录)。

图4-2.文件和记录都未压缩的顺序文件的内部结构

记录的内部结构与是否启用压缩有关。如果启用,则与是记录压缩还是数据块压缩 有关。 如果没有启用压缩(默认情况),那么每条记录有记录长度(字节数)、键长度、键和 值组成。长度字段为4字节长的整数,并且需要遵循java.io.DataOutput类中 writeInt()方法的协定。通过为数据写入顺序文件而定义的Serialization 类,可以实现对键和值的序列化。

记录压缩的格式与无压缩情况相同,只不过值需要通过文件头中定义的压缩codec 进行压缩。注意,键是不会压缩的。

块压缩一次对多条记录进行压缩,因此相较于单条记录压缩,压缩效率更高,因为 可以利用记录间的相似性进行压缩。参见表4-3。可以不断向数据块中压缩记录, 直到块的字节数不小于io.seqfile.compress.blocksize属性中设置的字节 数:默认为1 MB。每一个新的块的开始处都需要插入同步标识。数据块的格式如 下:首先是一个指示数据块中字节数的字段,紧接着是4个压缩字段(键长度、 键、值长度和值。


MapFile

MapFile是已经排序的SequenceFile,它已加入用于捜索键的索引。可以将 MapFile视为java.util.Map的持久化形式(尽管它并没有实现该接口),它的大 小有可能超过保存在内存中一个map的大小。

图4-3.块压缩的顺序文件的内部结构

写入 MapFile

MapFile的写入类似于SequenceFile的写入。首先新建一个MapFile.Writer 实例,然后调用append()方法将条目顺序写入。如果不按顺序写入条目,将抛出一个IOException异常。键必须是WritableComparable类型的实例,值必须是writable类型的实例,这与SequenceFile中对应的正好相反,后者为其条目使 用任意序列化框架。

例4-14中的程序新建了一个MapFile对象,并写入一些记录。与例4-12中新建 SequenceFile对象的程序非常相似。

例4-14.写入AMapFile

public class MapFileWriteDemo {
   private static final String[] DATA = { 
       "One, two, buckle my shoe",
       "Three, four, shut the door",
       "Five, six, pick up sticks",
       "Seven, eight, lay them straight",
       "Nine, ten, a big fat hen"
   };
   public static void main(String[] args) throws IOException {
       String uri = args[0];
       Configuration conf * new Configuration();
       FileSystem fs = FileSystem.get(URI.create(uri), conf);
       IntWritable key = new IntWritable();
       Text value = new Text();
       MapFile.Writer writer = null;
       try {
           writer = new MapFile.Writer(conf, fs, uri,
           key.getClass(), value.getClass());
           for (int i = 0; i < 1024; i++) {
           key.set(i + 1);
           value.set(DATA[i X DATA.length]);
           writer.append(key, value);
      }
      finally {
          IOUtils.closeStneam(wniter);
      }
    }
}

使用上述程序构建MapFile对象:

% hadoop MapFileWniteDemo numbens.niap

如果我们观察MapFile,我们发现它实际上是一个其中包含data和index两个文 件的文件夹:

% ls -1 numbers.map
total 104
-rw-r--r-- 1 tom tom 47898 Jul 29 22:06 data
-rw-r--r-- 1 tom tom 251 Jul 29 22:06 index

两个文件均是SequenceFile,data文件包含所有记录,依次为:

% hadoop fs -text numbers.map/data | head
1    One, two, buckle my shoe
2    Three, four, shut the door
3    Five, six, pick up sticks
4    Seven, eight, lay them straight
5    Nine, ten, a big fat hen
6    One, two, buckle my shoe
7    Three, four, shut the door
8    Five, six, pick up sticks
9    Seven, eight, lay them straight
10   Nine, ten, a big fat hen

index文件包含一部分键和data文件中键到该键偏移量的映射:

% hadoop fs -text numbers.map/index
1   128
129 6079
257 12054
385 18030
513 24002
641 29976
769 35947
897 41922

从输出中我们可以看到,默认情况下只有每隔128个键才有一个包含在index文件 中,当然也可以通过调用MapFile.Writer实例中的setIndexInterval()方法 来设置io.map.index.interval属性即可。增加索引间隔数量可以有效减少 MapFile中用于存储索引的内存。相反,可以降低该间隔来提高随机访问时间(因 为减少了平均跳过的记录数),这是以提髙内存使用量为代价的。

因为索引只是键的一部分,所以MapFile无法枚举或计算它所包含的所有键。唯一的办法是读取整个文件。

读取 MapFile

在MapFile依次遍历文件中所有条目的过程类似于SequenceFile中的过程:首 先新建MapFile.Reader实例,然后调用next()方法,直到返回值为false,该 值表示没有条目返回,因为已经读到文件末尾:

public boolean next(WritableComparable key, Writable val) throws IOException

通过调用get()方法可以随机访问文件中的数据:

public Writable get(WritableComparable key,  Writable val) throws IOException

返回值可用于确定是否在MapFile中找到相应的条目;如果是null,说明指定 key没有相应的条目。如果找到相应的条目,则将该键对应的值读入val变量,通过方法调用的返回值。

这有助于理解实现过程。下面的代码是我们在前一小节中建立的,用于检索 MapFile中某一条目:

Text value = new Text();
reader.get(new IntWritable(496), value〉;
assertThat(value.toString(), is("One, twO, buckle my shoe"));

对于这个操作,MapFile.Reader首先将index文件读入内存(由于索引是缓存的, 所以后续的随机访问将使用内存中的同一索引)。接着对内存中的索引进行二分査 找,最后找到小于或等于搜索索引的键,496。在本例中,找到的键位385,对应 的值为18030,该值为data文件中的偏移量,接着顺序读取data文件中的键,知 道读取到496为止。至此,才找到键所对应的值,最后从data文件中读取相应的 值。整体而言,一次查找需要一次磁盘寻址和一次最多有128个条目的扫描。对于 随机访问而言,这是非常髙效的。

getClost()方法与get()方法类似,不同的是前者返回与指定键匹配的最近的 键,并不是在不匹配时返回null。更准确地说,如果的MapFile包含指定的键,则 返回对应的条目;否则,返回MapFile大于(或小于,由相应的boolean参数指定) 指定键的第一个键。

大型MapFile的索引会占据大量内存。可以不选择在修改索引间隔之后重建索 引,而是在读取索引时设置io.mao.index.skip属性来加载一部分索引键。该属 性通常设置为0,这意味着不跳过索引键;如果设置为1,则表示毎次跳过索引键 中的一个(也就是索引键中的每隔一个键),如果设置为2,则表示每次读取索引时跳过2个键(也就是说,只读索引三分之一的键),以此类推。设置大的跳跃值可以 节省大量的内存,但会增加搜索时间,因为平均而言,扫描的键更多。

将 SequenceFile 转换为 MapFile

在MapFile中搜索就相当于在索引和排过序的SequenceFile中搜索。所以我们 自然联想到把SequenceFile转换为MapFile。例4-15中的程序显示了对MapFile调用fix()静态方 法,该方法能够为MapFile重建索引。

例4-15.对MapFile再次创建索引

public class MapFileFixer {
    public static void main(String[] args) throws Exception {
        String mapUri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(mapUri), conf);
        Path map = new Path(mapUri);
        Path mapData = new Path(map, MapFile.DATA_FILE_NAME);
            // Get key and value types from data sequence file
        SequenceFile.Reader reader = new SequenceFile.Reader(fs, mapData, conf);
        Class keyClass = reader.getKeyClass();
        Class valueClass = reader.getValueClass();
        reader.close();
           // Create the map file index file
        long entries = MapFile.fix(fs, map, keyClass, valueClass, false, conf);
        System.out.printf("Created MapFile %s with %d entries\n", map, entries);
    }
}

Fix方法通常用于重建已损坏的索引,但是由于它能从头开始建立新的索引,所以此处我们可以使用该方法满足需求。具体的使用方法如下:

1.将名为numbers.seq的顺序文件排序后,保存到名为number.map的文件夹 下,该文件夹就是最终的MapFile(如果顺序文件已排过序,则可以眺过这一 步。只需要把这个文件复制到number.map/data文件,然后直接跳到第3步):

% hadoop jar $HADOOP_INSTALL/hadoop-*-examples.jar sort -r 1 \
-inFormat org.apache.hadoop.mapred.SequenceFileInputFormat \
-outFormat org.apache.hadoop.mapred.SequenceFileOutputFormat \
-outKey org.apache.hadoop.io.IntMritable \
-outValue org.apache.hadoop.io.Text \
numbers.seq numbers.map

2.将MapReduce的输出重命名为data文件:

% hadoop fs -mv numbers.map/part-00000 numbers.map/data

3.建立index文件:

% hadoop MapFileFixer numbers.map
Created MapFile numbers.map with 100 entries

关注微信获取最新动态