Hadoop教程

数据流

文件读取剖析

为了了解客户端及与之交互的HDFS、namenode和datanode之间的数据流是什么样的,我们可参考图3-1,该图显示了在读取文件时一些事件的主要顺序。

图3-1.客户端读取HDFS中的数据

客户端通过调用FileSyste对象的open()方法来打开希望读取的文件,对于HDFS来说,这个对象是分布式文件系统(图3-1中的步骤1)的一个实例。 DistributedFileSystem通过使用RPC来调用namenode,以确定文件起始块的位置(步骤2)。对于每一个块,namenode返回存有该块复本的datanode地址。此外,这些datanode根据它们与客户端的距离来排序。如果该客户端本身就是一个datanode (比如,在一个MapReduce任务中),并保存有相应数据块的一个复本时,该节点将从本地datanode中读取数据。

DistributedFileSystem 类返回一个 FSDataInputStream 对象(一个支持文件定位的输入流)给客户端并读取数据。FSDataInputStream类转而封装 DFSInputStream 对象,该对象管理着 datanode 和 namenode 的 I/O。

接着,客户端对这个输入流调用read()方法(步骤3)。存储着文件起始块的datanode地址的DFSInputStream随即连接距离最近的datanode。通过对数据流反复调用read()方法,可以将数据从datanode传输到客户端(步骤4)。到达块的末端时,DFSInputStream会关闭与该datanode的连接,然后寻找下一个块的最佳datanode(步骤5)。客户端只需要读取连续的流,并且对于客户端都是透明的。

客户端从流中读取数据时,块是按照打开DFSInputStream与datanode新建连接的顺序读取的。它也需要询问namenode来检索下一批所需块的datanode的位置。 一旦客户端完成读取,就对FSDataInputStream调用close()方法(步骤6)。

在读取数据的时候,如果DFSInputStream在与datanode通信时遇到错误,它便会尝试从这个块的另外一个最邻近datanode读取数据。它也会记住那个故障datanode,以保证以后不会反复读取该节点上后续的块。DFSInputStream也会通 过校验和确认从datanode发来的数据是否完整。如果发现一个损坏的块,它就会在DFSInputStream试图从其他datanode读取一个块的复本之前通知namenode。

这个设计的一个重点是,namenode告知客户端每个块中最佳的datanode,并让客户端直接联系该datanode且检索数据。由于数据流分散在该集群中的所有datanode,所以这种设计能使HDFS可扩展到大量的并发客户端。同时,namenode仅需要响应块位置的请求(这些信息存储在内存中,因而非常髙效),而无需响应数据请求,否则随着客户端数量的增长,namenode很快会成为一个瓶颈。

网络拓扑与Hadoop

本地网络中,两个节点被称为"彼此近邻”是什么意思?在海量数据处理中, 其主要限制因素是节点之间数据的传输速率——带宽很稀缺。这里的想法是将 两个节点间的带宽作为距离的衡量标准。

衡量节点之间的带宽,实际上很难实现(它需要一个稳定的集群,并且在集群中 两两节点对数量是节点数量的平方),为此Hadoop采用一个简单的方法,把网 络看作一棵树,两个节点间的距离是它们到最近的共同祖先的距离总和。该树 中的层次是没有预先设定的,但是相对于数据中心、框架和正在运行的节点, 通常可以设定等级。具体的想法是对于以下毎个场景,可用带宽依次递减:

•同一节点中的进程
•同一机架上的不同节点
•同一数据中心中不同机架上的节点
•不同数据中心中的节点

图3-2给出详细的图式表达(爱好数学的读者会注意到这是一个距离度量的 例子)。


文件写入剖析

接下来我们看看文件是如何写入HDFS的。尽管比较详细,但对于理解数据流还是很有用的,因为它清楚地说明了HDFS的一致模型。

我们要考虑的情况是如何创建一个新文件,并把数据写入该文件,最后关闭该文件。参见图3-3。

客户端通过对DistributedFileSystem对象调用create()函数来创建文件(图 3-3 中的步骤 1)。DistributedFileSystem 对 namenode 创建一个 RPC 调用,在 文件系统的命名空间中创建一个新文件,此时该文件中还没有相应的数据块(步骤 2)。namenode执行各种不同的检查以确保这个文件不存在,并且客户端有创建该 文件的权限。如果这些检査均通过,namenode就会为创建新文件记录一条记录, 否则,文件创建失败并向客户端抛出一个IOException异常。DistributedFileSystem 想客户端返回一个FSDataOutputStream对象,由此客户端可以开始写入数据。

就像读取事件一样,FSDataOutputStream封装一个DFSoutPutstream对象,该对象负责处理datanode和namenode之间的通信。

在客户端写入数据时(步骤3), DFSOutputStream将它分成一个个的数据包,并写 入内部队列,称为“数据队列”(dataqueue)。DataStreamer处理数据队列,它的责任是根据datanode列表来要求namenode分配适合的新块来存储数据备份。这一组datanode构成一个管线————我们假设复本数为3,所以管线中有3个节点。 DataStreamer将数据包流式传输到管线中第1个datanode,该datanode存储数据包并将它发送到管线中的第2个datanode。同样地,第2个datanode存储该数据包并且发送给管线中的第3个(也是最后一个)datanode(步骤4)。

图3-3.客户端将数据写入HDFS

DFSOutputStream也维护着一个内部数据包队列来等待datanode的收到确认回执,称为“确认队列”(ackqueue)。当收到管道中所有datan0de确认信息后,该数据包才会从确认队列删除(步骤5)。

如果在数据写入期间,datanode发生故障,则执行以下操作,这对与写入数据的客户端是透明的。首先关闭管线,确认把队列中的任何数据包都添加回数据队列的最前端,以确保故障节点下游的datanode不会漏掉任何一个数据包。为存储在另一正常datanode的当前数据块指定一个新的标识,并将该标识传送给namenode,以 便故障datanode在恢复后可以删除存储的部分数据块。从管线中删除故障数据节 点并且把余下的数据块写入管线中的两个正常的的datanode。namenode注意到块复本量不足时,会在另一个节点上创建一个新的复本。后续的数据块继续正常接受处理。

在一个块被写入期间可能会有多个datanode同时发生故障,但非常少见。只要写入了 dfs.replication.min的复本数(默认为1),写操作就会成功,并且这个块可以在集群中异步复制,直到达到其目标复本数(dfs.replication的默认值为3)。

客户端完成数据的写入后,会对数据流调用close()方法(步骤6)。该操作将剩余的所有数据包写入datanode管线中,并在联系namenode且发送文件写入完成信号之前,等待确认(步骤7)。namenode已经知道文件由哪些块组成(通过Datastreamer询问数据块的分配),所以它在返回成功前只需要等待数据块进行最小量的复制。


一致模型

文件系统的一致模型(coherency model)描述了对文件读/写的数据可见性。HDFS为性能牺牲了一些POSIX要求,因此一些操作与你期望的可能不同。
在创建一个文件之后,希望它能在文件系统的命名空间中立即可见,如下所示:

Path p = new Path("p");
Fs.create(p);
assertThat(fs.exists(p) ,is(true));

但是,写入文件的内容并不保证能立即可见,即使数据流已经刷新并存储。所以文件长度显示为0:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
assertThat(fs.getFileStatus(p).getLen(),is(OL));

当写入的数据超过一个块后,新的reader就能看见第一个块。之后的块也不例外。总之,其他reader无看见当前正在写入的块。

HDFS提供一个方法来强制所有的缓存与数据节点同步,即对 FSDataOutputStream调用sync()方法。当sync()方法返回成功后,对所有新的reader而言,HDFS能保证文件中到目前为止写入的数据均可见且一致:

Path p = new Path("p");
FSDataOutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.flush();
out.sync();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

该操作类似于POSIX中的fsync系统调用,该调用将提交一个文件描述符的缓冲数据。例如,利用标准Java API将数据写入本地文件,我们能够在刷新数据流且同步之后看到具体文件内容:

FileOutputStream out = new FileOutputStream(localFile);
out.write("content".getBytes("UTF-8"));
out.flush(); // flush to operating system
out.getFD().sync();// sync to disk
assertThat(localFile.length(), is(((long) "content".length())));

在HDFS中关闭文件其实还隐含执行了sync()方法:

Path p = new Path("p");
OutputStream out = fs.create(p);
out.write("content".getBytes("UTF-8"));
out.close();
assertThat(fs.getFileStatus(p).getLen(), is(((long) "content".length())));

应用设计的重要性

这个一致模型和你设计应用程序的具体方法息息相关。如果不调用sync()方法, 就需要准备好在客户端或系统发生故障时可能会丢失一个数据块。对很多应用来说,这是不可接受的,所以你需要在适当的地方调用sync()方法,例如在写入一 定的记录或字节之后。尽管sync()操作被设计成尽量减少HDFS负载,但它有许多额外开销,所以在数据鲁棒性和吞吐量之间就会有所取舍。选择什么样的权衡,这与具体的应用相关,通过设置不同调用sync()方法的频率来衡量应用程序的性能,最终找到一个合适的频率。

关注微信获取最新动态