Hadoop教程

示例

假设有一组服务器用于为客户端提供某种服务。我们希望每个客户端都能找到其中 一台服务器,这样它们就可以使用这项服务。在这个例子中,一个挑战是如何维护 这组服务器的列表。

这组服务器的成员列表显然不能存储在网络中的单个节点上,否则该节点的故障将 意味着整个系统的故障(我们希望这个成员列表是高度可用的)。假设我们有一种可靠的方法解决了这个成员列表的存储问题。如果其中一台服务器出现故障,我们仍 然需要解决如何从服务器成员列表中将它删除的问题。某个进程需要负责删除故障 服务器,但注意不能由故障服务器自己来完成,因为故障服务器已经不再运行!

我们所描述的不是一个被动的分布式数据结构,而是一个主动的、能够在某个外部 事件发生时修改数据项状态的数据结构。ZooKeeper提供这种服务,所以让我们看 看如何使用它来实现这种众所周知的组成员管理应用。

ZooKeeper中的组成员关系

理解ZooKeeper的一种方法就是将其看作一个具有髙可用性的文件系统。但这个文 件系统中没有文件和目录,而是统一使用“节点”(node)的概念,称为znode。 znode既可以作为保存数据的容器(如同文件),也可以作为保存其他znode的容器 (如同目录)。所有的znode构成一个层次化的命名空间。一种自然的建立组成员列 表的方式就是利用这种层次结构,创建一个以组名为节点名的znode作为父节点, 然后以组成员名(服务器名)为节点名来创建作为子节点的znode。图14-1给出了一 组具有层次结构的znode。

图 14-1.ZooKeeper 中的 znode

在这个示例中,我们没有在任何znode中存储数据,但在一个真实的应用中,你可 以想象将关于成员的数据存储在它们的znode中,例如主机名。


创建组

让我们通过编写一段程序的方式来介绍ZooKeeper的Java API,这段示例程序用于 为组名为/zoo的组创建一个znode。参见例14-1。

例14-1.该程序在Zookeeper中新建表示组的znode

public class CreateGroup implements Watchen {private static final int SESSION_TIMEOUT = 5000;private ZooKeeper zk;private CountDownLatch connectedSignal = new CountDownLatch(1);public void connect(String hosts) throws IOException, InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this); connectedSignal.await();}@Overridepublic void process(WatchedEvent event) { // Watcher interface if (event.getState() == KeeperState.SyncConnected) { connectedSignal.countDown();public void create(String groupName) throws KeeperException, InterruptedException {String path = "/" + groupName;String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);System.out.println("Created " + createdPath);public void close() throws InterruptedException { zk.close();public static void main(String[] args) throws Exception { CreateGroup createGroup = new CreateGroup(); createGroup.connect(args[0]); createGroup.create(args[1]); createGroup.close();}}

main()方法执行时,创建了一个CreateGroup的实例并且调用了这个实例的connect()方法。connect方法实例化了一个新的ZooKeeper类的对象,这个类 是客户端API中的主要类,并且负责维护客户端和ZooKeeper服务之间的连接。 ZooKeeper类的构造函数有三个参数:第一个是ZooKeeper服务的主机地址(可指 定端口,默认端口是2181); 第二个是以毫秒为单位的会话超时参数(这里我们设 成5秒),后文中将给出该参数的详细解释;第三个参数是一个WatChe^对象的实 例。Watcher对象接收来自于ZooKeeper的回调,以获得各种事件的通知。在这个例子中,CreateGroup是一个watcher对象,因此我们将它传递给ZooKeeper的构造函数。

当一个ZooKeeper的实例被创建时,会启动一个线程连接到ZooKeeper服务。由 于对构造函数的调用是立即返回的,因此在使用新建的ZooKeeper对象之前一定 要等待其与ZooKeeper服务之间的连接建立成功。我们使用Java的 CountDownLatch类(位于java.util.concurrent包中)来阻止使用新建的ZooKeeper对象,直到这个ZooKeeper对象已经准备就绪。这就是Watcher类的 用途,在它的接口中只有一个方法:

public void process(WatchedEvent event);

客户端已经与ZooKeeper建立连接后,Watcher的process()方法会被调用,参 数是一个表示该连接的事件。在接收到一个连接事件(以Watcher.Event.KeeperState 的枚举型值SyncConnected来表示)时,我们通过调用CountDownLatch的countDown()方法来递减它的计数器。锁存器(latch)被创建时带有一个值为1的计 数器,用于表示在它释放所有等待线程之前需要发生的事件数。在调用一次countDown()方法之后,计数器的值变为0,则await()方法返回。

现在connect()方法已经返回,下一个执行的是CreateGroup的create()方法。在这个方法中,我们使用ZooKeeper实例中的create()方法来创建一个新 的ZooKeeper的znode。所需的参数包括:路径(用字符串表示)、znode的内容(字 节数组,本例中使用空值)、访问控制列表(简称ACL,本例中使用了完全开放的 ACL,允许任何客户端对znode进行读写)和创建znode的类型。

有两种类型的znode:短暂的和持久的。创建znode的客户端断开连接时,无论客 户端是明确断开还是因为任何原因而终止,短暂znode都会被ZooKeeper服务删 除。与之相反,当客户端断开连接时,持久znode不会被删除。我们希望代表一个 缉的znode存活的时间应当比创建程序的生命周期要长,因此在本例中我们创建了 一个持久的znode。

create()方法的返回值是ZooKeeper所创建的路径,我们用这个返回值来打印一 条表示路径成功创建的消息。当我们查看“顺序znode” (sequential znode)时,会 发现create()方法返回的路径与传递给该方法的路径不同。

为了观察程序的执行,我们需要在本地机器上运行ZooKeeper,然后可以输入以下 命令:

% export CLASSPATH=build/classes:$ZOOKEEPER_INSTALL/*: $ZOOKEEPER_INSTALL/lib/*:\$ZOOKEEPER_INSTALL/conf % java CreateGroup localhost zooCreated /zoo

加入组

这个应用的下一部分是一段用于注册组成员的程序。毎个组成员将作为一个程序运 行,并且加入到组中。当程序退出时,这个组成员应当从组中被删除。为了实现这 一点,我们在ZooKeeper的命名空间中使用短暂znode来代表一个组成员。 例14-2中的程序JoinGroup实现了这个想法。在基类ConnectionWatcher中, 对创建和连接Z00Keeper实例的程序逻辑进行了重构,参见例14-3。

例14-2.用于将成员加入组的程序

public class DoinGroup extends ConnectionWatcher {public void join(String groupName, String memberName) throws KeeperExceptiori, InterruptedException {String path = "/" + groupName + ••/*• + memberName;String createdPath = zk.create(path, null/*data*/, Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); System.out.println("Created " + createdPath);}public static void main(String[] args) throws Exception {DoinGroup joinGroup = new DoinGroup(); joinGroup.connect(args[0]); joinGroup.join(args[1], args[2]);// stay alive until process is killed or thread is interrupted Thread.sleep(Long.MAX_VALUE);}}

例14-3.用于等待建立与ZooKeeper连接的辅助类

public class ConnectionWatcher implements Watcher {private static final int SESSION_TIMEOUT = 5000;protected ZooKeeper zk;private CountDownLatch connectedSignal = new CountDownLatch(1);public void connect(String hosts) throws IOException_* InterruptedException { zk = new ZooKeeper(hosts, SESSION_TIMEOUT, this);   connectedSignal.await(),}@Overridepublic void process(WatchedEvent event) {if (event.getState() == KeeperState.SyncConnected) { connectedSignal.countDown();public void close() throws InterruptedException {zk.close();}}

JoinGroup的代码与CreateGroup的非常相似。在它的join()方法中,创建短 暂znode作为组znode的子节点,然后通过休眠来模拟正在做某种工作,直到该进 程被强行终止。接着,你会看到随着进程终止,这个短暂znode被ZooKeeper 刪除。


列出组成员

现在,我们需要一段程序来査看组成员(见例14-4)。

例14-4.用于列出组成员的程序

public class ListGroup extends ConnectionWatcher {public void list(String groupName) throws KeeperException, InterruptedException { String path = "/" + groupName;try {List<String> children = zk.getChildren(path, false); if (children.isEmpty()) {System.out.printf("No members in group %s\n", groupName);System.exit(1);}for (String child : children) {System.out.println(child);}} catch (KeeperException.NoNodeException e) {System.out.printf(MGroup %s does not exist\n", groupName);System.exit(1);}}public static void main(String[] args) throws Exception {ListGroup listGroup = new ListGroup(); listGroup.connect(args[0]); listGroup.list(args[1]); listGroup.close();}}

在list()方法中,我们调用了 getChildren()方法来检索并打印输出一个znode 的子节点列表,调用参数为该znode的路径和设为false的观察标志。如果在一 个znode上设置了观察标志,那么一旦该znode的状态改变,关联的观察(watcher)会被触发。虽然在这里我们可以不使用观察,但在查看一个znode的子节点时,也 可以设置观察,让应用程序接收到组成员加入.退出和组被删除的有关通知。

在这段程序中,我们捕捉了 KeeperException.NoNodeException异常,代表组 的2的如不存在时,这个异常就会被抛出。

让我们看看ListGroup程序是如何工作的。起初,由于我们还没有在组中添加任 何成员,因此zoo组是空的:

% java ListGroup localhost zooNo members in group zoo

我们可以使用JoinGroup来向组中添加成员。由于这些作为组成员的znode不会自己终止(因为sleep口语句),所以我们以后台进程的方式来启动它们:

% java 3oinGroup localhost zoo duck &% java 3oinGroup localhost zoo cow &% java 3oinGroup localhost zoo goat &% goat_pid=$!

最后一行命令保存了将goat添加到组中的Java进程的ID。我们需要保存这个进 程ID,以便能够在查看组成员之后杀死该进程:

% java ListGroup localhost zoogoatduckcow

为了从组中删除一个成员,我们杀死了goat所对应的进程:

%kill $goat_pid

几秒钟之后,由于进程的ZooKeeper会话已经结束(超时设置为5秒),所以goat 会从组成员列表中消失,并且所对应的短暂znode也已经被删除。

% java ListGroup localhost zooduckcow

让我们回顾一下,看看已经做到了哪一步。对于参与到一个分布式系统中的节点, 我们已经有了一个建立节点列表的方法。这些节点也许彼此并不了解。例如,一个 想使用列表中节点来完成某些工作的客户端,能够在这些节点不知道客户端的情况 下发现它们。

最后,注意,组成员关系管理并不能解决与节点通信过程中出现的网络问题。即使 一个节点是一个组中的成员,在与其通信的过程中仍然会出现故障,这种故障必须 以一种合适的方式解决(重试、使用组中另外一个成员等)。

ZooKeeper命令行工具

ZooKeeper提供了一个命令行工具用于在其命名空间内进行交互。我们可以使用这个工具列出/zoo znode之下的znode列表,如下所示:

% zkCli.sh localhost ls /zoo Processing lsWatchedEvent: Server state change. New state: SyncConnected [duck, cow]

不使用任何参数直接运行这个命令行工具,可以显示该工具的使用帮助。


删除组

为了使这个例子比较完整,让我们来看看如何删除一个组。ZooKeeper类提供了 一个delete()方法,该方法有两个参数:路径和版本号。如果所提供的版本号与 znode的版本号一致,ZooKeeper会删除这个znode。这是一种乐观的加锁机制, 使客户端能够检测出对znode的修改冲突。通过将版本号设置为-1,可以绕过这个 版本检测机制,不管znode的版本号是什么而直接将其删除。

ZooKeeper不支持递归的删除操作,因此在删除父节点之前必须先删除子节点。在 例14-5中,DeleteGroup类用于删除一个组及其所有成员。

例14-5.用于删除一个组及其所有成员的程序

public class DeleteGnoup extends ConnectionWatcher {public void delete(String groupName) throws KeeperException, InterruptedException {String path = "/" + groupName;try {List
 
   children = zk.getChildren(pathj false); for (String child : children) { zk.delete(path + "/" + child, -1);}zk.delete(path, -1);} catch (KeeperException.NoNodeException e) {System:OUt.printf("Group %s does not exist\n", groupName); System.exit(1);public static void main(Stning[] args) throws Exception { DeleteGroup deleteGroup = new DeleteGroup(); deleteGroup.connect(args[0]); deleteGroup.delete(args[l]); deleteGnoup.close();}}
 

最后,我们可以删除之前所创建的zoo组:

% java DeleteGroup localhost zoo % java ListGroup localhost zoo Group zoo does not exist

关注微信获取最新动态