Hadoop教程

序列化

所谓序列化(serialiZation),是指将结构化对象转化为字节流,以便在网络上传输或写到磁盘进行永久存储。反序列化(deserialization)的是指将字节流转回结构化对象的 逆过程。

序列化在分布式数据处理的两大领域经常出现:进程间通信和永久存储。

在Hadoop中,系统中多个节点上进程间的通信是通过“远程过程调用”(remote procedure call, RPC)实现的。RPC协议将消息序列化成二进制流后发送到远程节 点,远程节点接着将二进制流反序列化为原始消息。通常情况下,RPC序列化格式 如下。

紧凑

紧凑的格式能够使我们充分利用网络带宽(它是数据中心中最稀缺的资源)。

快速

进程间通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是最基本的。

可扩展

协议为了满足新的需求而不断变化,所以在控制客户端和服务器的过程中,需 要直接引进相应的协议。例如,需要能够在方法调用的过程中增添新的参数, 并且新的服务器需要能够接受来自老客户端的老格式的消息(无新增的参数)。

互操作

对于某些系统来说,希望能支持以不同语言写的客户端与服务器交互,所以需 要设计需要一种特定的格式来满足这一需求。

初识数据永久存储时,为它选择的数据格式需要有来自序列化框架的不同需求。毕 竟,RPC的存活时间不到1秒钟,然而永久存储的数据可能会在写到磁盘若干年 后才会被读取。由此,数据永久存储所期望的4个RPC序列化属性非常重要。我 们希望存储格式比较紧凑(进而高效使用存储空间)、快速(进而读写数据的额外开 销比较小)、可扩展(进而可以透明地读取老格式的数据)且可以互操作(进而可以使 用不同的语言读写永久存储的数据)。

Hadoop使用自己的序列化格式Writable,它格式紧凑,速度快,但很难用Java 以外的语言进行扩展或使用。因为Writable是Hadoop的核心(大多数MapReduce 程序都会为键和值使用它),所以在接下来的三个小节中,我们要进行深入探讨, 然后再从总体上看看序列化框架和Avro,后者是一个克服了 Writable少许局限 性的序列化系统。

Writable 接口

Writable接口定义了两个方法:一个将其状态写到DataOutput二进制流,另一 个从DataInput 二进制流读取其状态:

package org.apache.hadoop.io;
import java.io.DataOutput;
import java.io.DataInput;
import java.io.IOException;
public interface Writable 
{ 
    void write(DataOutput out) throws IOException;
    void readFields(DataInput in) throws IOException;
}

让我们通过一个特殊的Writable类来看看它的具体用途。我们将使用 IntWritable来封装一个Java int。我们可以新建一个并使用set()方法来设置 它的值:

IntWritable writable = new IntWnitable();
writable.set(163);

我们也可以通过构造函数来新建一个整数值:

IntWritable writable = new IntWnitable(163);

为 了 检查 IntWritable 的序列化形式,我们在 java.io.DataOutputStream (java.io.DataOutput的一个实现)中加入一个帮助函数来封装 java.io.ByteArrayOutputSteam,以便在序列化流中捕捉字节:

public static byte[] serialize(Writable writable) throws IOException 
{
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    DataOutputStneam dataOut = new DataOutputStneam(out);
    writable.write(dataOut);
    dataOut.close();
    return out.toByteArray;
}

一个整数占用4个字节(因为我们使用JUnit4进行声明):

byte[] bytes = serialize(writable);
assertThat(bytes.length, is(4));

毎个字节是按照大端顺序写入的(按照java.io.DataOutput接口中的声明,最重要的字节先写到流),并且通过Hadoop的StringUtils,我们可以看到这些字节 的十六进制表示:

assertThat(StringUtils.byteToHexString(bytes), is("000000a3"));

让我们试试反序列化。我们再次新建一个帮助方法,从一个字节数组中读取一个 Writable 对象:

public static byte[] deserialize(Writable writable, byte[] bytes)throws IOException 
{
    ByteArrayInputStream in = new ByteArrayInputStream(bytes);
    DataInputStream dataIn = new DataInputStream(in);
    writable.readFields(dataIn);
    dataIn.close();
    return bytes;
}

我们构建了一个新的、空值的IntWritable对象,然后调用deserialize()S 法从我们刚写的输出数据中读取数据。然后我们看到它的值(通过get()方法获取数值)是原始的数值163:

IntWritable newWritable » new IntWritable();
deserialize(newWritable, bytes);
assertThat(newWritable.get(), is(163));

WritableComparable 和 comparator

IntWritable 实现了 WritableComparable 接口,该接口继承自 Writable 和 java.lang.Comparable 接口 :

package org.apache.hadoop.io;
public interface WritableComparableextends Writable ,Comparable{
}

对MapReduce来说,类型的比较是非常重要的,因为中间有个基于键的排序阶 段。Hadoop提供的一个优化接口是继承自Java Comparator的RawComparator 接口:

package ong.apache.hadoop.io;
import java.util.Comparator;
public interface RawCompanatorextends Companator{
  public int compare(byte[] bl, int s1, int l1, byte[] b2, int s2, int l2);
}

该接口允许其实现直接比较数据流中的记录,无须先把数据流反序列化为对象,这样便避免了新建对象的额外开销。例如,我们根据IntWritable接口实现的 comparator实现了compare()方法,该方法可以从每个字节数组bl和b2中读取 给定起始位置(s1和s2)以及长度(l1和l2)的一个整数进而直接进行比较。

nitableComparator 是对继承自 WnitableCompanable 类的 RawCompanator 类的一个通用实现。它提供两个主要功能。第一,它提供了对原始compare()方法的 一个默认实现,该方法能够反序列化将在流中进行比较的对象,并调用对象的compare()方法。第二,它充当的是RawComparator实例的工厂(已注册 Writable的实现)。例如,为了获得IntWratable的comparator,我们直接如下 调用:

RawCompanatorcomparator = WnitableCompanator.get (IntWnitable.class);

这个comparator可以用于比较两个IntWritable对象:

IntWritable w1 = new IntWnitable(163);
IntWritable w2 = new IntWnitable(67);
assentThat(companaton.compane(wl, w2), gneatenThan(0));

或其序列化表示:

byte[] bl = senialize(wl);
byte[] b2 = senialize(w2);
assentThat(companaton.compane(bl, 0, bl.length, b2, 0, b2.1ength),gneaterThan(0));

Writable 类

Hadoop自带的org.apache.hadoop.io包中有广泛的Writable类可供选择。它 们形成如图4-1所示的层次结构。

Java基本类型的Writable封装器

Writable类对Java基本类型(参见表4-6)提供封装,short和char除外(两者可 以存储在IntWritable中)。所有的封装包含get()和set()两个方法用于读取或设置封装的值。

图4-1 ■ Writable类的层次结构

表4-6. Java基本类型的Writable类

Java基本类型Writable 实现序列化大小(字节)
booleanBooleanWritable1
byteByteWritable1
intIntWritable4

VintWritable1~5
floatFloatWritable4
longLongWritable8

VlongWritable1~9
doubleDoubleWritable8

对整数进行编码时,有两种选择,即定长格式(IntWritale和LongWritable)和变长格式(IntWritable和VLongWritable)。需要编码的数值如果相当小(在- 127和127之间,包括-127和127),变长格式就只用一个字节进行编码;否则,使 用第一个字节来表示数值的正负和后跟多少个字节。例如,值163需要两个字节:

byte[] data = serialize(new VIntWritable(163));
assertThat(StringUtils.byteToHexString(data), is("8fa3"));

如何在定长格式和变长格式之间进行选择呢?定长格式非常适合对整个值域空间中 分布非常均匀的数值进行编码,如精心设计的哈希函数。大多数数值变量的分布都 不均勻,而且变长格式一般更节省空间。变长编码的另一个优点是你可以在 VIntWritable和VLongWritable转换,因为它们的编码实际上是一致的。所以选 择变长格式之后,便有增长的空间,不必一开始就用8 字节的long表示。

Text类型

Text是针对UTF-8序列的Writable类。一般可以认为它等价于Java.lang.String的Writable。Text替代了UTF8类,这并不是一个很好的替代,一者因为不支持对字节数超过32767的字符串进行编码,二者因为它使用的是Java的UTF-8修订版。

索引由于它着重使用标准的UTF-8编码,因此Text类和Java String类之间存 在一定的差别。对Text类的索引是根据编码后字节序列中的位置实现的,并非字 符串中的Unicode字符,也不是Java Char的编码单元(如String)。对于ASCII字 符串,这三个索引位置的概念是一致的。charAt()方法的用法如下例所示:

Text t=new Text("hadoop");
assertThat(t•getLength(), is(6));
assertThat(t.getBytes().length, is(6));

assertThat(t.charAt(2), is((int) ' d '));
assentThat("Out of bounds", t.charAt(100), is(-1));

注意charAt()方法返回的是一个表示Unicode编码位置的int类型值,与 String变种不同,它返回的是一个char类型值。Text还有一个find()方法, 该方法类似于String的indexof()方法:

Text t = new Text("hadoop");
assertThat("Find a substring", t.find("do"), is(2));
assertThat("Finds first 'o,'", t.find("o"), is(B));
assertThat("Finds 'o' from position 4 on later", t.find("o", 4), is(4));
assertThat("No match", t.find("pig"), is(-l));

Unicode 一旦开始使用需要多个字节来编码的字符时,Text和String之间的区 别就昭然若揭了。考虑表4-7显示的Unicode字符。

表 4-7. Unicode 字符

Unicode编码点U+0041U+00DFU+6771U+10400
名称LATIN
CAPITAL
LETTER A
LATIN SMALL
LETTER SHARPS
N/A(统一表示的汉字)DESERET CAPITAL
LETTER LONG I
UTF-8编码单元41c39fe69db1F0909080
Java表示\u0041\u00DF\u6771\uuD801\uDC00

所有字符(除了表中最后一个字符U+10400),都可以使用单个Java char类型来表 示U+10400是一个候补字符,并且需要两个Java char类型来表示,称为“字符代理对”(surrogate pair)。例4-5中的测试显示了处理一个字符串(表4-7中的由4个字符组成的字符串)时String和Text之间的差别。

例4-5.该测试表明String和Text的不同

public class StringTextComparisonTest {
    @Test
    public void string() throws UnsupportedEncodingException {
        String s = "\u0041\u00DF\u6771\uD801\uDC00";
        assertThat(s.length( ), is(5));
        assertThat(s.getBytes("UTF-8").length, is(10));
    
        assentThat(s.indexOf("\u0041"), is(0));
        assertThat(s.indexOf("\u00DF"), is(l));
        assertThat(s.indexOf("\u6771"), is(2));
        assertThat(s.indexOf("\uD801\uDC00"), is(3));
    
        assertThat(s.charAt(0), is('\u0041'));
        assertThat(s.charAt(l), is('\u00DF'));
        assertThat(s.charAt(2), is('\u6771'));
        assertThat(s.charAt(3), is('\uD801'));
        assertThat(s.charAt(4)j is('\uDC00'));
        assertThat(s.codePointAt(0), is(0x0041));
        assertThat(s.codePointAt(l), is(0x00DF));
        assertThat(s.codePointAt(2), is(0x6771));
        assertThat(s.codePointAt(3), is(0xl0400));
    }
    @Test
    public void text() 
    {
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
        assertThat(t.getLength(), is(10));
        assertThat(t.find("\u0041"), is(0));
        assertThat(t.find("\u00DF"), is(l));
        assertThat(t.find("\u6771"), is(3));
        assertThat(t.find("\uD801\uDC00"), is(6));
        assertThat(t.charAt(0), is(0x0041));
        assertThat(t.charAt(1), is(0x00DF));
        assertThat(t.charAt(3), is(0x6771));
        assertThat(t.charAt(6), is(0xl0400));
    }
}

这个测试证实了 String的长度是其所含char编码单元的个数(5,来自该字符串 的前三个字符和最后的一个代理对),但Text对象的长度却是其UTF-8编码的字 节数(10=1+2+3+4)。相似的,String类中的indexOf()方法返回的char编码单 元中的索引位置,Text类的find()方法返回的则是字节偏移量。

当代理对不能代表整个Unicode字符时,String类中的charAt()<input type="password" >方法会根据指 定的索引位置返回char编码单元。根据char编码单元索引位置,需要codePointAt()方法来获取表示成int类型的单个Unicode字符。事实上,Text类 中的charAt()方法与String中的codePointAt更加相似(相较名称而言)。唯 一的区别是通过字节的偏移量进行索引。

迭代通过字节偏移量进行位置索引来实现对Text类Unicode字符的迭代是非常 复杂的,因为你不能简单地通过增加位置的索引值来实现。同时迭代的语法有些模 糊(参见例4-6):将Text对象转换为Java.nio.ByteBuffer对象,然后利用缓冲区对Text对象反复调用bytesToCodePoint()静态方法。该方法能够获取下一代 码的位置,并返回相应的int值,最后更新缓冲区中的位置。通过bytesToCodePoint()方法可以检测出字符串的末尾,并返回-1值。

例4-6.遍历Text对象中的字符

public class TextIterator {
    public static void main(String[] args) {
        Text t = new Text("\u0041\u00DF\u6771\uD801\uDC00");
        ByteBuffer buf = ByteBuffer.wrap(t.getBytes(), 0, t.getLength());
        int cp;
        while(buf.hasRemaining() && (cp = Text.bytesTbCodePoint(buf))!=-l)
            System.out.println(Integer.toHexString(cp));
     }
}

运行这个程序,打印出字符串中四个字符的编码点(code point):

% hadoop TextIterator
41
df
6771
10400

易变性与String相比,Text的另一个区别在于它是可变的(与所有Hadoop的 Writable接口实现相似,NullWritable除外,它是单实例对象)。可以通过调用 其中一个set()方法来重用了text实例。例如:

Text t = new Text("hadoop");
t.set("pig");
assertThat(t.getLength(), is(3));
assertThat(t.getBytes().length, is(3));

在某些情况下,getBytes()方法返回的字节数组可能比getLength()函数 返回的长度更长:

Text t = new Text("hadoop");
t.set(new Text("pig"));
assertThat(t.getLength(), is(3));
assertThat("Byte length not shortened", t.getBytes().length, is(6));

以上代码说明了为什么在调用getBytes()之前始终要调用getLength() 法,因为可以由此知道字节数组中多少字符是有效的。

对String重新排序Text类并不像java.lang.String类那样有丰富的字符串操 作API,所以在多数情况下,需要将Text对象转换成String对象。这一转换通 常通过调用toString()方法来实现:

assertThat(new Text("hadoop").toString(), is("hadoop"));

BytesWritable

BytesWritable是对二进制数据数组的封装。它的序列化格式为一个用于指定后面数据字节数的整数域(4字节),后跟字节本身。例如,长度为2的字节数组包含 数值3和5,序列化形式为一个4字节的整数(00000002)和该数组中的两个字节(03 和 05):

BytesWritable b = new BytesMritable(new byte[] { 3, 5 });
byte[] bytes = serialize(b);
assertThat(StringUtils.byteToHexString(bytes), is("000000020305"));

BytesWritable是可变的,并且它的值可以通过set()方法进行修改。和Text 相似,BytesWritable类的getBytes()方法返回的字节数组长度容量可 能无法体现BytesWritable所存储数据的实际大小。可以通过其getLength()方法来 确定BytesWritable的大小。示例如下:

b.setCapacity(11);
assertThat(b.getLength(), is(2));
assertThat(b.getBytes().length, is(11));

NullWritabie

NullWritable是Writable的一个特殊类型,它的序列化长度为0。它并不从数 据流中读取数据,也不写入数据。它充当占位符;例如,在MapReduce中,如果你不需要使用键或值,就可以将键或值声明为NullWritable——结果是存储常量 空值。如果希望存储一系列数值,与键/值对相对,NullWritable也可以用作在 SequenceFile中的键。它是一个不可变的单实例类型:通过调用NullWritable.get()方法可以获取这个实例。

ObjectWritable 和 GenericWritable

ObjectWritable 是对 Java 基本类型(String,enum, Writable, null 或这些类型组成的数组)的一个通用封装。它在Hadoop RPC中用于对方法的参数和返回类型进行封装和解封装。

当一个字段中包含多个类型时,ObjectWritable是非常有用的:例如,如果 SequenceFile中的值包含多个类型,就可以将值类型声明为ObjectWritable, 并将每个类型封装在一个ObjectWritable中。作为一个通用的机制,每次序列化都写封装类型的名称,这非常浪费空间。如果封装的类型数量比较少并且能够提 前知道,那么可以通过使用静态类型的数组,并使用对序列化后的类型的引用加入 位置索引提髙性能。这是GenericWritable类采取的方法,并且你可以在继承的子类中指定需要支持的类型。

Writable集合类

在 org.apache.hadoop.io 包中,有 4 个 Writable 集合类:ArrayWritable, TwoDArrayWritable, MapWritable 和 SortedMapWritable。

ArrayWritable和TwoDArrayWritable是对Writable的数组和两维数组(数组 的数组)的实现。ArrayWritable或TwoDArrayWritable中所有元素必须是同一 类的实例(在构造函数中指定),如下所示:

ArrayWritable writable = new ArrayWritable(Text.class);

如果writable根据类型来定义,例如SequenceFile的键或值,或更多时候作为MapReduce的输入,则需要继承众ArrayWritable(或相应的TwoDArray Writable 类)并设置静态类型。示例如下:

public class TextArrayWritable extends ArrayWritable {
    public TextArrayWritable() {
        super(Text.class);
    }
}

ArrayWritable 和 TwoDArrayWritable 都有 get(), set()和 toArray()方法, toArray()方法用于新建该数组(或二维数组)的一个“浅拷贝”(shallowcopy)。

MapWritable 和 SortedMapWritable 分别实现了 java.util.Map和 java.util/SortedMap。每个 键/值字段使用的类型是相应字段序列化形式的一部分。类型存储为单个字节(充当 类型数组的索引)。在org.apache.hadoop.io包中,数组经常与标准类型结合使 用,而定制的writable类型也通常结合使用,但对于非标准类型,则需要在包头 中指明所使用的数组类型。根据实现,MapWritable类和SortedMapWritable 类通过正byte值来指示定制的类型,所以在MapWritable和 SortedMapWritable实例中最多可以使用127个不同的非标准Wirtable类。下 面显示的是使用了不同键和值类型的MapWritable实例:

MapWritable src = new MapWritable();
src.put(new IntWritable(1), new Text("cat"));
src.put(new VIntWritable(2), new LongWritable(163));

MapWritable dest = new MapWritable();
WritableUtils.cloneInto(dest, src);
assertThat((Text) dest.get(new IntWritable(l)), is(new Text("cat")));
assertThat((LongWritable) dest.get(new VIntWritable(2)), is(new
LongWritable(163)));

显然,可以通过Writable集合类来实现集合和列表。可以使用MapWritable类 型(或针对排序集合,使用SortedMapWritable类型)来枚举集合中的元素,用 NullWritable类型枚举值。对于单类型的Writable列表,使用 ArrayWritable就足够了,但如果需要把不同的Writable类型存储在单个列表 中,可以用GenericWritable将元素封装在一个ArrayWritable中。另一个可 选方案是,可以使用与MapWritable相似的观点来构造一个通用的 ListWritable。

实现定制的Writable类型

Hadoop有一套非常有用的Writable实现可以满足大部分需求,但在有些情况 下,我们需要根据自己的需求构造一个新的实现。有了定制的writable类型,就 可以完全控制二进制表示和排序顺序。由于Writable是MapReduce数据路径的核心,所以调整二进制表示能对性能产生显著效果。Hadoop自带的Writable实现已经过很好的性能调整,但如果希望将结构调整得更好,更好的做法往往是新建一 个Writable类型(而不是构建一个已有类型的组合)。

为了演示如何新建一个定制的Writable类型,我们需要写一个表示一对字符串的 实现,名为TextPair。例4-7显示了最基本的实现。

例4-7.存储一对Text对象的Writable类型

import java.io.*;
import org.apache.hadoop.io.*;

public class TextPair implements WnitableCompanable
{
    private Text first;
    private Text second;
    public TextPain() 
    {
        set(new Text(), new Text());
    }
    public TextPair(String first, String second) 
    {
        set(new Text(first), new Text(second));
    }
    public TextPair(Text first, Text second) 
    {
        set(first) second);
    }
    public void set(Text first, Text second) 
    {
        this.finst = first;
        this.second = second;
     }
    public Text getFinst() 
    {
        return first;
    }
    public Text getSecond() 
    {
        return second;
    }
    @Override
    public void wnite(DataOutput out) throws IOException 
    {
        first.write(out);
       second.wnite(out);
    }
    @Override
    public void readFields(DataInput in) throws IOException 
    {
        first.readFields(in);
        second.neadFields(in);
    }
    @Override
    public int hashCode() 
    {
        return first.hashCode() * 16B + second.hashCode();
    }
    @Override
    public boolean equals(Object o) 
    {
        if (o instanceof TextPair) 
        {
            TextPair tp = (TextPair) o;
            return first.equals(tp.finst) && second.equals(tp.second);
         }
         return false;
     }
    @Override
    public String toStning() 
    {
        return first + ''\t'' + second;
    }
    @Override
    public int compareTo(TextPain tp) 
    {
        int cmp = first.companeTo(tp.first),
        if (cmp != 0) {
            return cmp;
        }
        return second.compareTo(tp.second);
     }
}

这个定制的writable实现中,第一部分非常直观:有两个Text实例变量(first和second)和相关的构造函数(setter和getter,即设置函数和提取函数)。所有的 Writable实现都必须有一个默认的构造函数以便MapReduce框架可以对它们进行 实例化,然后再调用^3£^161£15()函数査看(填充)各个字段的值。Writable实 例具有易变性并且通常可以重用,所以应该尽量避免在write()和 readFields() 函数来分配对象。

通过让Text对象的自我表示,TextPair类的write()方法依次将每个Text对 象序列化到输出流中。类似的,通过每个Text对象的表示,readfileds()方法 对来自输入流的字节进行反序列化。DataOutput和DataInput接口有一套丰富 的方法可以用于对Java基本类型进行序列化和反序列化,所以,在通常情况下, 你可以完全控制Writable对象的在线上传输/交换(的数据)的格式(数据传输 格式)。

就相当于针对Java语言构造的任何值对象,需要重写java.lang.Object中的 hashCode(), equals(),toString()方法。HashPartitioner(MapReduce 中 的默认分区类)通常用hashCode()方法来选择reduce分区,所以你应确保有一个 比较好的哈希函数来确保每个reduce分区大小相似。

TextPair 是 WritableComparable 的一个实现,所以它提供了compareTo()方法,该方法可以强制数据排序:先按照第一个字符排序,如果第一个字符相同则按 照第二个字符排序。注意,前一小节中已经提到TextPair不同于TextArraywritable(可存储的Text对象数除外),因为TextArrayWritable只继承了 Writable,并没有继承 WritableComparable。

为速度实现一个RawComparator

例4-7中的TextPair代码按照其描述的方式运行;但我们可以进一步优化。当 TextPair 被用作 MapReduce中的键时,需要将数据流反序列化为对象,然后再调用compareTo() 方法进行比较。那么有没有可能看看它们的序列化表示就可以比较两个TextPair 对象呢?

事实证明,我们可以这样做,因为TextPair是两个Text对象连接而成的,而 Text对象的二进制表示是一个长度可变的整数,包含字符串之UTF-8表示的字节 数以及0了?-8字节本身。诀窍在于读取该对象的起始长度,由此得知第一个16<七 对象的字节表示有多长;然后将该长度传给Text对象的RawComparator方法, 最后通过计算第一个字符串和第二个字符串恰当的偏移量,这样便可以实现对象的 比较。详细过程参见例4-8(注意,这段代码已嵌入TextPair类)。

public static class Comparator extends WritableCompanator 
{
    private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
    public Comparator() 
    {
        super(TextPair.class);
    }
    @Ovennide
    public int compare(byte[] b1,int s1, int l1,byte[] b2, int s2, int 12) 
    {
        try {
            int firstLl = WritableUtils.decodeVIntSize(bl[sl])+readVInt(b1, s1);
            int firstL2 = WnitableUtils.decodeVIntSize(b2[s2])+readVInt(b2, s2);
            int cmp = TEXT_COMPARATOR.compane(bl, sl, firstL1,b2,s2,firstL2);
            if (cmp != 0) {
                return cmp;
            }
            return TEXT_COMPARATOR.compare(bl, sl + firstLl, l1 - firstL1,b2, s2 + firstL2, 12 - firstL2);
        } catch (IOException e){
            throw new IllegalArgumentException(e);
       }
    }
}

 static {
   MritableComparator.define(TextPair.class,new Comparator());
 }

事实上,我们继承的是WritableComparable类而非直接实现RawComparator接 口,因为它提供了一些比较好用的方法和默认实现。这段代码最本质的部分是计算firstL1和firstL2,这两个参数表示毎个字节流中第一个Text字段的长度。两 者分别由变长整数的长度(由WritableUtils的decodeVIntSize()方法返回)和 编码值(在readVInt()方法返回)组成。

定制的 comparator

从Textpair可以看出,编写原始的comparator需要谨慎,因为必须要处理字节级别的细节。如果真的需要自己编写comparator,有必要参考org.apache.hadoop.io包中 对writable接口的实现。WritableUtils的工具方法也比较好用。

如果可能,定制的comparator也应该继承自RawComparator。这些comparator定义的排列顺序不同于默认comparator定义的自然排列顺序。例4-9显示了一个针对 TextPair 类型的 comparator,称为 FirstCompartator,它只考虑 TextPair 对 象的第一个字符串。注意,我们重载了针对该类对象的compare()方法,使两个compare()方法有相同的语法。

例4-9.定制的RawComparator用于比较TextPair对象字节表示的第一个字段

public static class FirstComparator extends WritableComparator {
     private static final Text.Comparator TEXT_COMPARATOR = new Text.Comparator();
     public FirstComparator() {
       super(TextPair.class);
    }
     @Override
     public int compare(byte[] bl,int sl, int l1,
     byte[] b2, int s2, int l2) {
     try {
       int firstL1 = WnitableUtils.decodeVIntSize(b1[s1]) + readVInt (b1, s1);
       int finstL2 = WnitableUtils.decodeVIntSize(b2[s2]) + readVInt (b2, s2);
       return TEXT„COMPARATOR.compane(b1, s1, firstL1, b2, s2, firstL2);
     } catch (IOException e) {
       throw new IllegalAngumentException(e);
     }
    }
    @Ovennide
    public int compane(WnitableCompanable a, WritableComparable b) {
    if (a instanceof TextPain && b instanceof TextPair) {
      return ((TextPair) a).first.compareTo(((TextPair) b).first);
    }
    return super.compare(a, b);
  }
 }

序列化框架

尽管大多数MapReduce程序使用的都是Writable类型的键和值,但这并不是 MapReduCeAPI强制使用的。事实上,可以使用任何类型,只要能有一种机制对每 个类型进行类型与二进制表示的来回转换。

为了支持这一机制,Hadoop有一个计对可替换序列化框架(semlization framework) 的 API。一个序列化框架用一个 Serialization 实现(在 org.apache.hadoop. io.senializen 包)来表示。例如,WritableSerialization 类是对 Writable 类型的Serialization的实现。 Serialization对象定义了从类型到Serializer实例(将对象转换为字节流)和 Deserializer实例(将字节流转换为对象)的映射方式。

将io.serizalizations属性设置为一个由句点分隔的类名列表,即可注册 Serialization 实现。它的默认值是 org.apache.hadoop_io.serializer. WnitableSenialization,这意味着只有Writable对象才可以在外部序列化和 反序列化。

Hadoop包含一个名为JavaSerialization的类,该类使用的是Java Object Serialization。尽管它有利于在MapReduce程序中方便地使用标准的Java类型,如 Integer或String,但不如writable高效,所以不值得如此权衡。

为什么不用 Java Object Serialization?

Java有自己的序列化机制,称为Java Object Serialization(通常简称为“Java Serialization"),该机制与编程语言紧密相关,所以我们很自然会问为什么不 在Hadoop中使用该机制。针对这个问题,Doug Cutting是这样解释的:

“为什么开始设计Hadoop的时候我不用Java Serialization?因为它看起来太复杂,而我认为需要有一个至精至简的机制,可以用于精 确控制对象的读和写,因为这个机制是Hadoop的核心。使用Java Serialization后,虽然可以获得一些控制权,但用起来非常纠结。

不用RMI也出于类似的考虑。高效、高性能的进程间通信是Hadoop的关键。 我觉得我们需要精确控制连接、延迟和缓冲的处理方式,然而RMI对此无能为力。"问题在于Java Serialization不满足先前列出的序列化格式标准:精简、快速、 可扩展、可以互操作。

Java Serialization并不精简:毎个对象写到数据流时,它都要写入其类名——实 现 java.io.Serializable 或 java.io.Externalizable 接口的类确实如 此。同一个类后续的实例只引用第一次出现的句柄,这占5个字节。引用句柄 不太适用于随机访问,因为被引用的类可能出现在先前数据流的任何位置—— 也就是说,需要在数据流中存储状态。更糟的是,句柄引用会对序列化数据流 中的排序记录造成巨大的破坏,因为一个特定类的第一个记录是不同的,必须 当作特殊情况区别对待。

压根儿不把类名写到数据流,则可以避免所有这些问题,Writable接口采取 的正是这种做法。这需要假设客户端知道会收到什么类型。其结果是,这个格 式比Java Serialization格式更精简,同时又支持随机存取和排序,因为流中的 每一条记录均独立于其他记录(所以数据流是无状态的)。

Java Serialization是序列化图对象的通用机制,所以它有序列化和反序列化开 销。更有甚者,它有一些从数据流中反序列化对象时,反序列化程序需要为毎 个对象新建一个实例。另一方面,Writable对象可以(并且通常)重用。例 如,对于MapReduce作业(主要对只有几个类型的几十亿个对象进行序列化和 反序列化),不需要为新建对象分配空间而得到的存储节省是非常可观的。

至于可扩展性,Java Serialization支持演化而来的新类型,但是难以使用。不 支持Writable类型,程序员需要自行管理。

原则上讲,其他编程语言能够理解Java Serialization流协议(由Java Object Serialization定义),但事实上,其他语言的实现并不多,所以只有Java实现。 Writable的情况也不例外。

序列化IDI

还有许多其他序列化框架从不同的角度来解决该问题:不通过代码来定义类型,而 是使用“接口定义语言”(Interface Description Language,IDL)以不依赖于具体语 言的方式进行声明。由此,系统能够为其他语言生成类型,这种形式能有效提髙互 操作能力。它们一般还会定义版本控制方案(使类型的演化直观易懂)。

Hadoop 自己的 Record I/O(可以在 org.apache.hadoop.record 包中找到)有一个IDL以已编译到Writable对象中,有利于生成与MapReduce兼容的类型)。但是, 不知何故,Reduce I/O并未得到广泛应用,而且Avro也不支持它。

Apache Thrift(http://incubator.apache.org/thrift)和 Google 的 Protocol Buffers (http://code.google.com/p/protobuf/)是两个比较流行的序列化框架,但它们常用作二 进制数据的永久存储格式。MapReduce格式对该类的支持是有限的;0但在Hadoop 的有些部分,使用Thrift来提供跨语言的API,例如Thrifts定制功能模块,作为Hadoop文件系统提供的API。

在下一小节,我们深入探讨Avro,这是一个基于IDL的序列化框架,非常适用于 Hadoop的大规模数据处理。


Avro

Apache Avro是一个独立于编程语言的数据序列化系统。该 项目是由Doug Cutting(Hadoop之父)创建的,旨在解决Hadoop中Writable类型的 不足:缺乏语言的可移植性。拥有一个可被多种语言(当前是C,C++, Java, Python和Ruby)处理的数据格式与绑定到单一语言的数据格式相比,前者更易于与 公众共享数据集。允许其他编程语言能够读写数据,该类数据格式进行读写操作, 会使其具有更好的特性。

但为什么要有一个新的数据序列化系统?与Apache Thrift和Google的Protocol Buffers相比,Avro有其独有的特性。与前述系统及其他系统相似,Avro数据是用语言无关的模式定义的。但与其他系统不同的是,Avro的代码生成是可选的,这意味着你可以对遵循指定模式的数据进行读写操作,即使在此之前代码,从来没有 见过这个特殊的数据模式。为此,Avro假设数据模式总是存在的——在读写数据 时——形成的是非常精简的编码,因为编码后的数值不需要用字段标识符来打标 签。Avro模式通常用JSON编写,而数据通常用二进制格式编码,但也有其他选 择。还有一种称为Avro IDL的髙级语言,可以使用开发人员更熟悉的类似匚的语 言来编写模式。还有一个基于JSON的数据编码方式(对构建原型和调试Avro数据 很有用,因为它是我们人类可读的)。

AvrO规范精确定义所有实现都必须支持的二进制格式。同时它还指定这些实现还需要支持其他Avro特性。但是,该规 范并没有给API制定规范:实现可以根据自己的需求操作Avro数据并给出相应的 API,因为每个API都与语言相关。事实上,只有一种二进制格式比较重要,这表 明绑定一种新的编程语言来实现是比较容易的,可以避免语言和格式组合爆炸问题,否则将对互操作性造成一定的问题。

Avro有丰富的数据模式解析(data schema resolution)能力。在精心定义的约束条件 下,读数据所用的模式不必与写数据所用的模式相同。由此,Avro是支持模式演 化的。例如,通过在用于读取以前数据的模式中声明新的用于读取记录的选项字 段。新的和以前客户端均能以相似的方法读取按旧模式存储的数据,同时新的客户 端可以使用新的字段写入的新内容。相反的,如果老客户端读取新客户端写入的数 据,它将忽略新加入的字段并按照先前的数据模式进行处理。

Avro为一系列对象指定一个对象容器格式——类似于Hadoop的顺序文件。Avro数 据文件包含元数据项,模式数据存储在其中,这使文件可以自我声明。Avro数据 文件支持压缩,并且是可切分的,这对MapReduce的输入格式至关重要。另外, Avro本身是为MapReduce设计的,所以在不久的将来有可能使用Avro作为一流 的MapReduce API(即,比Streaming更丰富的API,就像Java API或C++管道一样) 融入其他编程语言。

Avro还可以用于RPC,但这里不进行详细说明。Hadoop项目计划移植到Avro RPC,这会带来诸多优势,包括支持滚动升级,对多语言客户端的支持,比如完全 用C语言实现的HDFS客户端。

Avro数据类型和模式

Avro定义了少数数据类型,它们可用于以写模式的方式来构建应用特定的数据结 构。考虑到互操作性,其实现必须支持所有的Avro类型。

表4-8列举了 Avro的基本类型。每个基本类型还可以使用更冗长的形式和使用type属性来指定,示例如下:

{ "type": "null" }

表4-8. Avro基本类型

类型名称描述模式示例
null空值“null”
boolean二进制值"boolean"
int32位带符号整数“int”
long64位带符号整数“long”
fload单精度(32位)IEEE754浮点数“float”
double双精度(64位)IEEE754浮点数“double”
bytes8位无符号字节序列“bytes”
stringUnicode字符序列“string”

表4-9列举了Avro的复杂类型,并为每个类型给出模式示例。

类型名称描述模式示例
array一个排过序的对象集 合。特定数组中的所有 对象必须模式相同{ "type": "array", "items": "long"}
map未排过序的键/值对。键 必须是字符串,值可以是 任何类型,但一个特定map 中所有值必须模式相同"type": "map","values":"string"
record任意类型的一个命名字 段集合{ "type”"record", WeatherRecord", ,weather reading. [ :"year", "type”:temperaturen, "int"}, "type": 'stationId", "type":"string"}]
enum一个命名的值集合{"type": "enum", "name": "Cutlery", "doc": "An eating utensil." "symbols": ["KNIFE", "FORK" "SPOON"]}
fixed一组固定数量的8位无符号字节{ "type”: “fixed", "name": HMdSHash,
union模式的并集。并集可以用JSON 数组表示,其中每个元素为一个 模式。并集表示的数据必须与其 其中一个模式相匹配[ "null", "string.., {"type": "map" "string"}]

毎个Avro语言API都包含该语言特定的Avro类型表示,例如,Avro的double 类型可以用C、C++和 Java语言的double类型表示,Python的float类型表 示,Ruby的float类型表示。

而且,一种语言可能有多种表示或映射。所有的语言都支持动态映射,即使运行前 并不知道具体模式,也可以使用动态映射。对此,Java称为“通用”(generic) 映射。

另外,Java和C++实现可以自动生成代码来表示符合某一Avro模式的数据。代码 生成(在Java中称为“特殊映射”),能优化数据处理,如果读写数据之前就有一个 模式备份。那么,为用户代码生成的类和为通用代码生成的类相比,前者更能提供 领域相关的AP1。

Java拥有第三类映射,即自反映射(reflect mapping,将Avro类型映射成事先已有 的Java类型)。它的速度比通用映射和特殊映射都慢,所以不推荐在新应用中使用。

表4-10列举了Java的类型映射。如表所示,特殊映射和通用映射相同,除非有特 别说明(同样的,自反映射与特殊映射也相同,除非特别说明)。特殊映射与通用映 射仅在record, enum和fixed三个类型方面有区别,所有其他类型均有自动生 成的类(类名由name属性和可选的namesapce属性决定)。

为什么不使用Java通用映射和特殊映射,而用Java String来表示 a Avro String?答案与效串相关:Avro Utf8类型是易变的,所以可以 重用它对一系列值进行读写操作。另外,Java String在新建对象时进 行UTF-8解码,但Avro执行Utf8解码更晚一些,某些情况下这样做 可以提高系统性能。注意,Java自反映射确实使用了 Java的String 类,其主要因素是Java的兼容性而非性能,

表4-10. Avro的Java类型映射

Avro类型Java通用映射Java特殊映射Java自反映射
nullnull类型booleanBoolean
intInt

longlong

floatfloat

doubledouble

bytesjava,nio,bytebuffer
字节数组
stringorg.apache.avro.util/utf8
java.lang.String
arrayorg.apache.avro. util/utf8
array 或 java.util/Collection
mapjava.util/map

recordorg.apache.avro.generic.genericrec ord生成实现org.apache.avro.specific/SpecificRecord 的类具有零参数构造函数的任意用户类。继承了所有不传递的实例字段
enumjava.lang,string生成Java enum类型任意Java enum类型
fixedorg.apache.avro.generic/genericfixed生成实现org.apache.avro.specific.Speci ficFixed 的类org.apache.avro.generic genericFixed
unionjava.lang.object

内存中的序列化和反序列化

Avro为序列化和反序列化提供了 API,如果想把Avro集成到现有系统(比如已定义帧格式的消息系统),这些八?1函数就很有用。其他情况,请考虑使用Avro的数据 文件格式。

让我们写一个Java程序从数据流读写Avro数据。首先以一个简单的Avro模式的 为例,它用于表示以记录形式出现的一对字符串:

{
  "type": *'record",
  "name": "Pair",
  "doc": "A pair of strings.",
  "fields":[
    {"name": "left", "type": "string"},
    {"name": "right", "type": "string"}
  ]
 }

如果这一模式存储在类路径下一个名为Pair.avsc的文件中(.avsc是Avro模式文件 的常用扩展名),然后我们可以通过下列声明进行加载:

Schema schema = Schema.parse(getClass().getResourceAsStream("Pair.avsc"));

我们可以使用以下通用API来创建Avro记录的实例:

genericRecord datum = new GenericData.Record(schema);
		datum.put("left", new Utf8("L"));
		datum.put("right", new Utf8("R"));

注意,我们为记录的String字段构造了一个Avro Utf8实例。 接下来,我们将记录序列化到输出流中:

ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumWriter<GenericRecord> writer = new
GenericDatumWriter<GenericRecord>(schema);
Encoder encoder = new BinaryEncoder(out);
writer.write(datum, encoder);
encoder.flush();
out.close();

这里有两个重要的对象:DatumWriter和Encoder。DatumWriter对象将数据对 象翻译成Encoder对象可以理解的类型,然后由后者写到输出流。这里,我们使 用 GenericDatumWriter 对象,它将 GenericRecord 字段的值传递给 Encoder 对象,本例中是BinaryEncoder。

在本例中,只有一个对象被写到输出流,但如果需要写若干个对象,我们可以调用write()方法,然后再关闭输入流。 需要向GenericDatumWriter对象传递模式,因为该对象要根据模式来确定数据 对象中的哪些数值会被写到输出流中。在我们调用writer的write()方法后,需 要刷新encoder然后关闭输出流。 我们可以使用反向的处理过程从字节缓冲区中读回对象:

DatumReader<GenericRecord> reader = new 6enericDatumReader
<GenericRecord>(schema);
Decoder decoder = DecoderFactory.defaultFactory()
 .createBinaryDecoder(out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right").toString(), is("R"));

我们需要传递空值(null)并调用createBinaryDecoder()和read()方法,因为 这里没有重用对象(分别是decoder或recoder)。 让我们简单看看使用特定API的等价代码。通过使用Avro工具的JAR文件,我们可以根据模式文件生成pair类:

% java -jar $AVRO_HOME/avro-tools-*.jar compile schema \
> avro/src/main/resources/Pair.avsc avro/src/main/java

然后,我们构建一个Pair实例来替代GenericRecord对象,使用 SpecificDatumWriter类将它写到数据流中,使用SpecificDatumReader类来 读回数据:

Pair datum = new Pair(); datum.left = new Utf8("L'*);
datum.right = new Utf8(*'R");
ByteArrayOutputStream out = new ByteArrayOutputStream();
DatumMriterwriter = new SpecificDatumWriter<Pair>(Pair.class);
Encoder encoder = new BinaryEncoder(out);
writer.write(datum, encoder);
encoder.flush();
out.close();
DatumReaderreader = new SpecificDatumReader<Pair>(Pair.class);
Decoder decoder = DecoderFactony.defaultFactory()
.createBinanyDecoden(out.toByteArray(), null);
Pair result = neader.read(null, decoder);
assentThat(result.left.toString(), is("L"));
assentThat(result•right.toString(), is("R"));

Avro数据文件

Avro的对象容器文件格式主要用于存储Avro对象序列。这与Hadoop顺序文件的 设计非常相似。其主要区别在于Avro数据文 件主要是面向跨语言使用设计的,所以,可以用Python写入文件,而用C语言来 读取文件(我们在下一节中仔细探讨)。

数据文件的头部分包含元数据,包括一个Avro模式和一个sync marker(同步标 识),紧接着是一系列包含序列化Avro对象的数据块(压缩可选)。数据块由sync marker来分隔,它对该文件而言,是唯一的(特定文件的标识信息存储在文件头 部),并且允许在文件中搜索到任意位置之后通过块边界快速地重新进行同步。因 此,Avro数据文件是可切分的,因此适合Mapreduce的快速处理。

将Avro对象写到数据文件与写到数据流类似。像以前一样,我们使用一个 DatumWriter,但用 DatumWriter 来创建一个 DataFileWriter 实例,而非使用 一个encoder。由此新建一个数据文件(该文件一般有.avro扩展名)并向它附加新写入的对象:

File file = new File("data.avro");
DatumWriter writer = new GenericDatumWriter(schema);
DataFileWriten dataFileWniten = new DataFileWriter(writer);
dataFileWriten.create(schema, file);
dataFileWriter.append(datum);
dataFileWniten.close();

写入数据文件的对象必须遵循相应的文件模式,否则在调用append()方法时会抛 出异常。

这个例子演示了如何将对象写到本地文件(前面代码段中的java.io.File),但使用重载的DataFileWriter的create()方法,可以将数据对象写到任何一个java.io.OutPutStream对象中。例如,通过对FileSystem对象调用create()方法可以返回OutputStream对象,进而将文件写到HDFS中。

从数据文件中读取对象与前面例子中在内存数据流中读取数据类似,只有一个重要 的区别:我们不需要指定模式,因为可以从文件元数据中读取它。事实上,还可以 对DataFileReader实例调用GetSchema()方法来获取该模式,并验证该模式是 否和原始写入对象的模式相同:

DatumReaderreader = new GenericDatumReader();
DataFileReaderdataFileReader =new DataFileReader(file, reader);
assertThat("Schema is the same", schema, is(dataFileReader.getSchema()))

DataFileReader对象是一个正规的Java迭代器,由此我们可以调用其hashNext() 和Next()方法来迭代其数据对象。下面的代码检査是否只有一条记录,是否有期 望的字段值:

assertThat(dataFileReader.hasNext()3 is(true));
GenericRecord result = dataFileReader.next();
asassertThat(result.get("left").toString(), is("L"));
asassertThat(result.get("right"),toString(), is("R"));
assertThat(dataFileReader.hasNext(), is(false));

但是,我们并没有使用传统的next()方法,因为更适合的做法是使用重载函数并 返回对象实例(该例中,为GenericRecord对象),由此可以实现对该对象的重用 以及减少对象分配和回收所产生的开销,特别是文件中包含有很多对象时。代码如 下所示:

GenericRecord record = null;
while (dataFileReader.hasNext()) {
 record = dataFileReader.next(record);
  // process record
}

如果对象重用不是那么重要,则可以使用如下更简短的形式:

for (GenericRecord record : dataFileReader) {
  // process record
}

对于在Hadoop文件系统中读取文件的一般例子,可以使用Avro的FsInput对象 来指定使用Hadoop Path对象作为输入对象。事实上,DataFileReader对象提供了随机访问Avro数据文件的能力(通过seek()和sync()方法)。但对于大多数情 况,如果顺序访问数据流足够了,则使用DataFileStream对象。 DataFileStream对象可以从任意Java InputStream 对象中读取数据。

互操作性

为了说明Avro语言的互操作性,让我们试着用一种语言(typhon)来写入数据文件,再用另一种语言来读取这个文件。

Python API例4-10中的程序从标准输入中读取由句点分隔的字符串,并将它们 以Pair记录的方式写入Avro数据文件。与写数据文件的Java代码类似,我们新 建了一个DatumWriter对象和一个DataFileWriter对象,注意,我们在代码中 嵌入了Avro模式,尽管没有该模式,我们仍然可以从文件中正确读取数据。 Python以目录形式表示Avro记录,从标准输入中读取的每一行都转换为dict对 象并附加到DataFileWriter对象末尾。

例4-10.这个Python程序将Avro pair记录写到一个数据文件

在运行该程序之前,我们需要为Python安装Avro:

% easy_install avro

运行该程序,我们需要指定文件名(pairs.avro,输出结果将写到这个文件)和通过标 准输入发送输入的成对记录,结束文件输入时键入Control-D:

% python avro/src/main/py/write_pairs.py pairs.avro a,1
c,2
b,3
b,3
ˆD

C API下面转向CAPI,写程序来显示pairs.avro文件的内容,如例4-11。

例4-11. C语言程序从数据文件中读取Avro的成对记录

该程序的核心部分主要处理三件事情。

(1)通过调用 Avro 的 avro_file_reader 函数打开一个 avro_file_ reader_t 类型的文件读取实例。

(2)通过文件读取实例的avro_file_reader方法循环读取Avro数据直到读完所 有的成对记录(由返回值rval决定)。

(3)通过avro_file_reader_close方法关闭文件读取实例。

将数据模式作为avro_file_reader_read方法的第二个参数,便可以支持读取模 式不同于文件写入模式的情况(下一节将详细说明),但如果参数设为null,则说 明希望Avro使用数_文件的模式来读取数据。第三个参数为指向avro_datum_t 对象的指针,该指针的内容是从文件中读取的下一条记录的内容。通过调用 avro_record_get方法,我们将pair结构分解成两个字段,然后通过 avro_string_get方法抽取出每个字段的值,最后打印输出到控制台。

使用Python程序的输出来运行程序,打印原始输入:

% ./dump_pairs pairs.avro
a,1
c,2
b,3
b,2

这样,我们便成功交换了两个Avro实现的复杂数据。

模式的解析

我们可以选择一个与写入数据模式(写入模式)不同的模式来读回数据(读取模式)。 这是个非常有用的工具,因为它允许模式演化。例如,可以考虑对字符串对增加 description字段为一新模式:

{
“type" : "record",
"name": "Pair'S
"doc": "A pair of strings with an added field.",
"fields":[
{"name": "left", "type": "string"},
{"name": "right' "type": "string"},
{"name": "description", "type": "string", "default”:””}
]
}

我们可以使用该模式读取前面序列化的数据,因为我们给description字段指定 了一个默认值(空字符串),所以Avro在读取没有定义该字段的记录时就会使用这 个空值。如果忽略default属性,那么在读取旧数据时会报错。

读模式不同于写模式时,我们调用GenericDatumReader的构造函数,它取两个 模式对象,即读取对象和写入对象,并按照以下顺序:

DatumReaderreader =
new GenericDatumReader(schema, newSchema);
Decoder decoder = DecoderFactory.defaultFactory().createBinaryDecoder(out.toByteArray(), null);
GenericRecord result = reader.read(null, decoder);
assertThat(result.get("left").toString(), is("L"));
assertThat(result.get("right")•toString(), is("R"));
assertThat(result.get("description").toString(), is(""));

对于元数据中存储有写入模式的数据文件,我们只需要显示指定写入模式,具体做 法是向写入模式传递null值:

DatumReaderreader =
new GenericDatumReader(null, newSchema);

不同读取模式的另外一个应用是去掉记录中的某些字段,该操作可以称为“投影” (projection)。记录中有大量的字段,但如果你只需读取其中的一部分,它是非常有用的。例如,可以使用这一模式只读取pair对象的right字段:

{
 "type": "record",
 "name": "Pair",
 "doc": "The right field of a pain of strings.",
 "fields":[
 {"name”: "night", "type”: "string"}
 ]
}

模式解析规则可以直接解决模式由一个版本演化为另一个版本时产生的问题, Avro规范中对所有Avro类型均有详细说明。表4-11从类型读写(客户端和服务器 端)的角度对记录演化规则进行了总结。

表4-11.记录的模式演化

新模式写入读取操作
增加字段通过默认值读取新字段,因为写入时并没有该字段

读取时并不知道新写入的新字段,所以忽略该字段(投 影)
删除字段读取时忽略已删除的字段(投影)

写人时不写入已删除的字段。如果旧模式对该字段有默认值,那么读取时可以使用该默认值,否则产生错 误。这种情况下,最好同时更新读取模式,或在更新 写人模式之前更新读取模式

排列顺序

Avro定义了对象的排列顺序。对于大多数Avro类型来说,希望使用默认顺序——例如,数值型按照数值的升序进行排列。其他的都不重要——例如,枚举通过定义 的符号来排序而非符号字符串的值。

所有的类型,record除外,均按照Avro规范中预先定义的规则来排序,这些规则 不能被用户改写。但对于记录,你可以通过指定order属性来控制排列顺序。它 有三个值:ascending(默认值)、descending(反向顺序)或ignore(所以为了比较 的目的,可以忽略该字段。)

例如,通过将right字段设置为descending,下述模式定义了pair记录的顺序。为了排序的目的,忽略了left字段,但依旧保留在投影中:

{
"type": "record",
"name": "Pair",
"doc": "A pair of strings, sorted by right field descending.",
“fields":[
{“name": "left", "type”: "string", "order": "ignore"},
{"name":"right", "type": "string' "order": "descending"}
]
}

按照读取模式中的文档顺序,记录中的字段两两进行比较。这样,通过指定一个恰 当的读取模式,便可以对数据记录使用任意顺序。该模式首先 定义了一组right字段的顺序,然后是left字段的顺序:

{
"type": "record",
“name" : "Pair",
"doc": "A pair of strings, sorted by right then left.",
"fields":[
{"name": "right", "type": "string"},
{“name”: "left", "type": "string"}
]
}

Avro实现髙效的二进制比较。也就是说,Avro并不需要将二进制对象反序列化成 对象即可实现比较,因为它可以直接对字节流进行操作。在使用pair模式情况下 (没有order属性的情况下),例如,Avro按以下方式实现了二进制比较:

第一个字段,即left字段,使用UTF-8编码,由此Avro可以根据字母表顺序进 行比较。如果它们不同,那么它们直接的顺序就可以确定,由此Avro可以停止比 较。否则,如果这两个字节顺序是相同的,那么它们比较第二个字段(right),同样 在字节尺度上使用字母表序排列,应为该字段同样也使用UTF-8编码。

Avro MapReduce

Avro提供了一组让MapReduce程序在Avro数据上简单运行的类。例如,在 org.apache.avro.mapreduce 代码包中的 AvroMapper 类和 AvroReducer 类是 Hadoop规范(旧版)中的Mapper和Reducer类。上述两个类去除了作为输入和输 出的键/值对的不同,因为Avr0数据文件只是一系列顺序排列的值。但是,为了混 洗的目的依旧将中间结果数据划分为键/值对。

除 Java 语言外,Avor 提供了连接器框架(在 org.apache.avro.mapred.tether 代码包中)。在本书写作期间,对其他语言并没有其他绑定,但在以后的发行版本 中会加入上述绑定。

关注微信获取最新动态