Hadoop教程

Java 接口

在本小节中,我们要深入探索Hadoop的Filesystem类:与Hadoop的某一文件系统进行交互的API。虽然我们主要关注的是HDFS的实例,即DistributedFileSystem,但总体来说,还是应该集成FileSystem抽象类,并编写代码,以保持其在不同文件系统中的可移植性。这对测试你编写的程序非常重要,例如,你可以使用本地文件系统中的存储数据快速进行测试。

从Hadoop URL中读取数据

要从Hadoop文件系统中读取文件,最简单的方法是使用Java.net.URL对象打开数据流,进而从中读取数据。具体格式如下:

InputStream in = null; 
try {
    in = new URL("hdfs://host/path").openStream();
    // process in 
} 
finally {
    IOUtils.closeStream(in);
}

让Java程序能够识别Hadoop的hdfs URL方案还需要一些额外的工作。这里采用的方法是通过 FsUrlStreamHandlerFactory 实例调用 URL 中的 setURLStreamHandlerFactory方法。由于Java虚拟机只能调用一次上述方法, 因此通常在静态方法中调用上述方法。这个限制意味着如果程序的其他组件——如不受你控制的第三方组件——已经声明了一个URLStreamHandlerFactory实例, 你将无法再使用上述方法从Hadoop中读取数据。下一节将讨论另一备选方法。

例3-1展示的程序以标准输出方式显示Hadoop文件系统中的文件,类似于Unix中的cat命令。

例3-1.通过URLStreamHandler实例以标准输出方式显示Hadoop文件系统的文件

public class URLCat { 
static {
URL.SGtURLStreamHandlerFactory(nGw FsUrlStreamHandlerFactoryO);
}
public static void main(String[] args) throws Excepticn
{ 
    InputStream in = null;
    try {
        in = new URL(args[0]).openStream();
        IOUtils.copyBytes(in, System.out, 4096, false);
    } finally {
        IOUtils.closeStream(in);
    }
}}

我们可以调用Hadoop中简洁的IOUtils类,并在finally子句中关闭数据流,同时也可以在输入流和输出流之间复制数据(本例中为System.out)。copyBytes 方法的最后两个参数,第一个用于设置复制的缓冲区大小,第二个用于设置复制结束后是否关闭数据流。这里我们选择自行关闭输入流,因而System.out不关闭输入流。

下面是一个运行示例:

% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat.
But his face you could not see,
On account of his Beaver Hat.

通过FileSystem API读取数据

正如前一小节所解释的,有时无法在应用中设置URLStreamHandlerFactory实例。这种情况下,需要使用Filesystem API来打开一个文件的输入流。

Hadoop文件系统中通过Hadoop Path对象来代表文件(而非java.io.File对象, 因为它的语义与本地文件系统联系太紧密)。你可以将一条路径视为一个Hadoop文 件系统 UR1,如 hdfs:〃localhost/user/tom/quangle.txt。

FileSystern是一个通用的文件系统API,所以第一步是检索我们需要使用的文件系统实例,这里是HDFS。获取FileSystem实例有两种静态工厂方法:

public static FileSystem get(Configuration conf) throws IOException
Public static FileSystem get(URI uri, Configuration conf) throws IOException

Configuration对象封装了客户端或服务器的配置,通过设置配置文件读取类路径来实现(如conf/core-site.xml)。第一个方法返回的是默认文件系统(在conf/core-site.xml)中指定的,如果没有指定,则使用默认的本地文件系统)。第二个 方法通过给定的URI方案和权限来确定要使用的文件系统,如果给定URI中没有指定方案,则返回默认文件系统。

有了FileSystem实例之后,我们调用open()函数来获取文件的输入流:

Public FSDataInputStream open(Path f) throws IOException
Public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException

例3-2.直接使用FileSystem以标准输出格式显示Hadoop文件系统中的文件

public class FileSystemCat {
public static void main(String[] args) throws Exception {
    String uri = args[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    InputStream in = null; 
    try {
        in = fs.open(new Path(uri));
        IOUtils.copyBytes(in, System.out, 4096, false);
    } finally 
    {
        IOUtils.closeStream(in);
    }
}

程序运行结果如下:

% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

FSDataInputStream

实际上,FileSystem对象中的open()方法返回的是FSDataInputStream对象,而不是标准的java.io类对象。这个类是继承了java.io.DataInputStream接口的一个特殊类,并支持随机访问,由此可以从流的任意位置读取数据。

package org.apache.hadoop.fs;

public class FSDataInputStream extends DataInputStream
   implements Seekable, PositionedReadable {
   // implementation elided
}

Seekable接口支持在文件中找到指定位置,并提供一个查询当前位置相对于文件 起始位置偏移量(getpot())的查询方法:

public interface Seekable { void seek(long pos) throws IOException;
long getPos() throws IOException;
boolean seekToNewSource(long targetPos) throws IOException;

调用seek()来定位大于文件长度的位置会导致IOException异常。与 java.io.InputStream中的skip()不同,seek()可以移到文件中任意一个绝对位置,skip()则只能相对于当前位置定位到另一个新位置。

例3-3为例3-2的简单扩展,它将一个文件写入标准输出两次:在一次写完之后, 定位到文件的起始位置再次以流方式读取该文件。

例3-3.使用seek()方法,将Hadoop文件系统中的一个文件在标准输出上显示两次

public class FileSystemDoubleCat {
    public static void main(String[] args) throws Exception {
        String uni = angs[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.cneate(uni), conf);
        FSDataInputStneam in = null;
        try {
            in = fs.open(new Path(uri));
            IOUtils.copyBytes(in, System.out, 4096, false);
            in.seek(0);
            // go back to the start of the file IOUtils.copyBytes(in, System.out, 4096, false);
        } 
        finally 
        {
            IOUtils.closeStream(in);
        }
    }
}

在一个小文件上运行的结果如下:

% hadoop FileSystemDoubleCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat3
But his face you could not see,
On account of his Beaver Hat.
On the top of the Crumpetty Tree The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.

FSDataInputStream类也实现了PositionReadable接口,从一个指定偏移量处读取文件的一部分:

public interface PositionedReadable {
    public int read(long position, byte[] buffer, int offset, int length)
    public void readFully(long position, byte[] buffer, int offset, int length) throws IOException;
    public void readFully(long position, byte[] buffer) throws IOException;
}

read()方法从文件的指定position处读取至多为length字节的数据并存入缓冲区buffer的指定偏离量offset处。返回值是实际读到的字节数:调用者需要检查这个值,它有可能小于指定的length长度。readFully方法将指定length长度的字节数数据读取到buffer中(或在只接受buffer字节数组的版本中,读取 buffer.length长度字节数据),除非已经读到文件末尾,这种情况下将抛出 EOFException 异常。

所有这些方法会保留文件当前偏移量,并且是线程安全的,因此它们提供了在读取文件——可能是元数据——的主体时访问文件的其他部分的便利方法。事实上,这只是按照以下模式实现的Seekable接口:

long oldPos = getPos();
try {
    seek(position);
    // read data
} finally { 
    seek(oldPos);
}

最后务必牢记,seek()方法是一个相对髙开销的操作,需要慎重使用。建议用流数据来构建应用的访问模式(如使用MapReduce),而非执行大量的seek()方法。

写入数据

FileSystem类有一系列创建文件的方法。最简单的方法是给准备创建的文件指定一个Path对象,然后返回一个用于写入数据的输出流:

    public FSDataOutputStream create(Path f) throws IOException

还有一个重载方法Progressable,用于传递回调接口,如此一来,可以把数据写入数据节点的进度通知到你的应用:

package org.apache.hadoop.util;
public interface Pnognessable {
    public void progress();
}

另一种新建文件的方法,是使用append()方法在一个已有文件末尾追加数据(还存在一些其他重载版本):

        public FSDataOutputStream append(Path f) throws IOException

该追加操作允许一个writer打开文件后在访问该文件的最后偏移量处追加数据。有了这个API,某些应用可以创建无边界文件,例如,日志文件可以在机器重启后在已有文件后面继续追加数据。该追加操作是可选的,并非所有Hadoop文件系统都 实现了该操作。例如,HDFS支持追加,但33文件系统就不支持。

例3-4显示了如何将本地文件复制到Hadoop文件系统。毎次Hadoop调用 progress()方法时一也就是每次将64 KB数据包写入datanode管线后——打印 一个时间点来显示整个运行过程。注意,这个操作并不是通过API实现的,因此 Hadoop后续版本能否执行该操作,取决于该版本是否修改过上述操作。API仅能让你知道到“正在发生什么事情”。

例3-4.将本地文件复制到Hadoop文件系统

public class FileCopyWithProgress {
    public static void main(String[] args) throws Exception {
        String localSrc = args[0]j String dst = args[1];
        InputStream in = new BufferedInputStneam(new FileInputStneam(localSnc));
        Configuration conf = new Configunation();
        FileSystem fs = FileSystem.get(URI.create(dst), conf);
        OutputStneam out = fs.cneate(new Path(dst)j new Progressable() {
        public void progress() {
        System.out.print("•");
        }
        });
        IOUtils.copyBytes(in, out, 4096, true);
    }
}

典型应用如下:

% hadoop FileCopyWithProgress input/docs/1480-8.txt hdfs://localhost/user/tom/1400-8.txt

目前,其他Hadoop文件系统写入文件时均不调用progress()方法。你将在后续 章节中看到进度对于MapReduce应用的重要性。

FSDataOutputStream 对象

FileSystem 实例的create()方法返回 FSDataOutputStream对象,与 FSDataInputStream类相似,它也有一个查询文件当前位置的方法:

package org.apache.hadoop.fs;
public class FSDataOutputStream extends DataOutputStream implements Syncable {
public long getPos() throws IOException {
// implementation elided
}
// implementation elided
}

但与FSDataInputStream类不同的是,FSDataInputStream类不允许在文件中定位。这是因为HDFS只允许对一个已打开的文件顺序写入,或在现有文件的末 尾追加数据。换句话说,它不支持在除文件末尾之外的其他位置进行写入,因此,写入时定位就没有什么意义。


目录

Filesystem实例提供了创建目录的方法:

            public boolean mkdirs(Path f) throws IOException

这个方法可以一次性新建所有必要但还没有的父目录,就像java.io.File类的mkdirs()方法。如果目录(以及所有父目录)都已经创建成功,则返回true。

通常,你不需要显式创建一个目录,因为调用create()方法写入文件时会自动创建父目录。


查询文件系统

文件元数据:FileStatus

任何文件系统的一个重要特征都是提供其目录结构浏览和检索它所存文件和目录相关信息的功能。FileStatus类封装了文件系统中文件和目录的元数据,包括文件长度、块大小、备份、修改时间、所有者以及权限信息。

FileSystem的getFileStatus()方法用于获取文件或目录的FileStatus对象。例3-5显示了它的用法。

public class ShowFileStatusTest {
    private MiniDFSCluster cluster; // use an in-process HDFS cluster for testing 
    private FileSystem fs;
    @Before
    public void setUp() throws IOException {
        Configuration conf = new Configuration();
        if (System.getProperty("test.build.data") == null) 
        {
            System.setProperty("test.build.data", "/tmp");
        }
        cluster = new MiniDFSCluster(conf, 1, true, null);
        fs = cluster.getFileSystem();
        OutputStream out = fs.create(new Path(M/dir/fileM));
        out.write("content".getBytes("UTF-8"));
        out.close();
     }
    @After
    public void tearDown() throws IOException {
         if (fs != null) { fs.close(); }
         if (cluster != null) {
         cluster.shutdown(); 
    }
    @Test(expected = FileNotFoundException.class)
    public void throwsFileNotFoundForNonExistentFile() throws IOException 
    {
        fs.getFileStatus(new Path("no-such-file"));
     }
    @Test
    public void fileStatusForFile() throws IOException {
        Path file = new Path(*7dir/file");
        FileStatus stat = fs.getFileStatus(file); 
        assertThat(stat.getPath().toUri().getPath(), is("/dir/file"));
        assertThat(stat.isDir(), is(false)); 
        assertThat(stat.getLen(), is(7L));
        assertThat(stat.getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 1));
      assertThat(stat.getBlockSize(), is(64 * 1024 * 1024L));
        assertThat(stat•getOwner(), is("tom"));
        asseptThat(stat.getGroup(), is("supergroup"));
        assertThat(stat• getPermission(). toString(), is(•• rw-r--r-• ••));
    }
    @Test
    public void fileStatusForDirectory() throws IOException 
    { 
        Path dir = new Path("/dir");
        FileStatus stat = fs.getFileStatus(dir);
        assertThat(stat.getPath().toUri().getPath(), is("/dir"));
        assertThat(stat•isDir(), is(true));
        assertThat(stat.getLen(), is(0L));
        assertThat(stat•getModificationTime(),
        is(lessThanOrEqualTo(System.currentTimeMillis())));
        assertThat(stat.getReplication(), is((short) 0));

        assentThat(stat.getBlockSize(), is(0L));
        assentThat(stat.getOwnen(), is("tom"));
        assertThat(stat.getGroup(), is("supergroup"));
        assentThat(stat.getPermission().toString(), is("rwxr-xr-x"));
    }
}

如果文件或目录均不存在,则会抛出FileNotFoundException异常。但是,如果 只需检查文件或目录是否存在,那么调用exists()方法会更方便:

public boolean exists(Path f) throws IOException

列出文件

查找一个文件或目录的信息很实用,但通常你还需要能够列出目录的内容。这就是 FileSystem的ListStatus()方法的功能:

public FileStatus[] listStatus(Path f) throws IOException
public FileStatus[] listStatus(Path f, PathFilter filter) throws IOException
public FileStatus[] listStatus(Path[] files) throws IOException
public FileStatus[] listStatus(Path[] files, PathFilten filter) throws IOException

当传入的参数是一个文件时,它会简单转变成以数组方式返回长度为1的 FileStatus对象。当传入参数是一个目录时,则返回Θ或多个FileStatus对象,表示此目录中包含的文件和目录。

一种重载方法是允许使用PathFilter来限制匹配的文件和目录。最后,如果指定一组路径,其执行结果相当于依次轮流传递每条路径并对其调用ListStatus()方法,再将FileStatus对象数组累积存入同一数组中,但该方法更为方便。这从文件系统树的不同分支构建输 入文件列表时,这是很有用的。例3-6简单显示了这种方法。注意FileUtil中 stat2Paths()方法的使用,它将一个FileStatus对象数组转换为Path对象数组。

例3-6.显示Hadoop文件系统中一组路径的文件信息

public class ListStatus {
public static void main(String[] args) throws Exception {
    String uni = angs[0];
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(URI.create(uri), conf);
    Path[] paths = new Path[args.length]; 
    for (int i = 0;i < paths.length; i++) 
    { 
        paths[i] = new Path(args[i]);
        FileStatus[] status = fs.listStatus(paths);
        Path[] listedPaths = FileUtil.stat2Paths(status);
        for (Path p : listedPaths) {
        System.out.println(p);
        }
    }
}}

我们可以通过这个程序显示一组路径集目录列表的并集:

% hadoop ListStatus hdfs://localhost/ hdfs://localhost/user/tom
hdfs://localhost/user
hdfs://localhost/user/tom/books
hdfs://localhost/user/tom/quangle.txt

文件模式

在单个操作中处理一批文件,这是一个常见要求。举例来说,处理日志的 MapReduce作业可能需要分析一个月内包含在大量目录中的日志文件。在一个表 达式中使用通配符来匹配多个文件是比较方便的,无需列举每个文件和目录来指定输入,该操作称为“通配”(globbing)。Hadoop为执行通配提供了两个 FileSystem 方法:

public FileStatus[] glob5tatus(Path pathPattern) throws IOException
public FileStatus[] glob5tatus(Path pathPattern, PathFilter filter) throws IOException

globStatus()方法返回与路径相匹配的所有文件的FileStatus对象数组,并按路径排序。PathFilter命令作为可选项可以进一步对匹配进行限制。

Hadoop支持的通配符与Unix bash相同(见表3-2)。

表3-2.通配符及其含义

通配符名称匹配
*星号匹配0或多个字符
?问号匹配单一字符
[ab]字符类匹配{a,b}集合中的一个字符
[^ab]非字符类匹配非恤{a,b}集合中的一个字符
[a-b]字符范围匹配一个在{a,b}范围内的字符(包括 ab),a在字典顺序上要小于或等于b
[^a-b]非字符范围匹配一个不在{a,b}范围内的字符(包括 ab), a在字典顺序上要小于或等于b
{a,b}或选择匹配包含a或b中的一个的表达式
/c转义字符匹配元字符c

假设有日志文件存储在按日期分层组织的目录结构中。如此一来,2007年最后一 天的日志文件就会存在以/2007/12/31命名的目录中。假设整个文件列表如下:

  • /2007/12/30

  • /2007/12/31

  • /2008/01/01

  • /2008/01/02

  • 一些文件通配符及其扩展如下所示。

    通配符扩展
    /*/2007/2008
    /*/*/2007/12/2008/01
    /*/12/*/2007/12/30/2007/12/31
    /200?/2007/2008
    /200[78]/2007/2008
    /200[7-8]/2007/2008
    /200[^01234569]/2007/2008
    /*/*/{31,01}/2007/12/31/2008/01/01
    /*/*/3{0,l}/2007/12/30/2007/12/31
    /*/{12/31,01/01}/2007/12/31/2008/01/01

    PathFilter对象

    通配符模式并不总能够精确地描述我们想要访问的文件集。比如,使用通配格式排除一个特定的文件就不太可能。FileSystem中的listStatus()和globStatus() 方法提供了可选的PathFilter对象,使我们能够通过编程方式控制通配符:

    org.apache.hadoop.fs;
    public interface PathFilter
    { 
    boolean accept(Path path);
    }

    PathFilter 与 java.io.FileFiltei—样,是 Path 对象而不是 File 对象。

    例3-7显示了用于排除匹配正则表达式路径的PathFilter。

    例3-7.用于排除匹配正则表达式路径的PathFilter

    public class RegexExcludePathFilter implements PathFilter {
      private final String regex;
      public RegexExcludePathFilter(String regex) {
        this.regex = regex;
      }
    
      public boolean accept(Path path) {
      return !path.toString().matches(regex);
      }
    }

    这个过滤器只传递不匹配正则表达式的文件。我们将该过滤器与预先去除文件的通配符相结合:过滤器可优化结果。如下示例将扩展到/2007/12/30:

    fs.globStatus(new Path("/2007/*/*"), new RegexExcludeFilter("^.*/2007/12/31$"))

    过滤器由Path表示,只能作用于文件名。不能针对文件的属性(例如创建时间)来构建过滤器。但是,通配符模式和正则表达式同样无法对文件属性进行匹配。例如,如果你将文件存储在按照日期排列的目录结构中(如同前一节中讲述的那样), 则可以根据Pathfilter在给定的时间范围内选出文件。


    删除数据

    使用FileSystem的delete()方法可以永久性删除文件或目录。

    public boolean delete(Path f, boolean recursive) throws IOException

    如果f是一个文件或空目录,那么recursive的值就会被忽略。只有在recrusive值为true时,一个非空目录及其内容才会被删除(否则会抛出IOException异常)。

关注微信获取最新动态