Hadoop教程

使用ZooKeeper来构建应用

在一定程度上了解ZooKeeper之后,我们接下来要用ZooKeeper编写一些有用的应 用程序。


配置服务

配置服务是分布式应用所需要的基本服务之一,它使集群中的机器可以共享配置信 息中那些公共的部分。简单地说,ZooKeeper可以作为一个具有髙可用性的配置存 储器,允许分布式应用的参与者检索和更新配置文件。使用ZooKeeper中的观察机 制,可以建立一个活跃的配置服务,使那些感兴趣的客户端能够获得配置信息修改 的通知。

让我们来编写一个这样的服务。我们通过两个假设来简化所需实现的服务(稍加修 改就可以取消这两个假设)。第一,我们唯一需要存储的配置数据是字符串,关键 字是znode的路径,因此我们在每个znode上存储了一个键/值对。第二,在任何 时候只有一个客户端会执行更新操作。除此之外,这个模型看起来就像是有一个主 人(类似于HDFS中的namenode)在更新信息,而他的工人则需要遵循这些信息。

我们在名为ActiveKeyValueStore的类中编写了如下代码:

public class ActiveKeyValueStore extends ConnectionWatcher {private static final Charset CHARSET = Charset.forName("L)TF-8");public void write(String path, String value) throws InterruptedException, KeeperException {Stat stat = zk.Gxists(path, false); if (stat == null) {zk.create(path, value.getBytes(CHARSET), Ids.OPEN_ACL_L)NSAFE, CreateMode.PERSISTENT);} else {zk.setData(path, value.getBytes(CHARSET), -1);}}}

write()方法的任务是将一个关键字及其值写到ZooKeeper。它隐藏了创建一个新 的znode和用一个新值更新现有znode之间的区别,而是使用exists操作来检测 znode是否存在,然后再执行相应的操作。其他值得一提的细节是需要将字符串值 转换为字节数组,因为我们只用了UTF-8编码的getBytes()方法。

为了说明ActiveKeyValueStore的用法,我们编写了一个用来更新配置属性值的 类ConfigUpdater,如例14-6所示。

例14-6.用于随机更新ZooKeeper中的属性

public class ConfigL)pdater {public static final String PATH = "/config";private ActiveKeyValueStore store; private Random random = new Random();public ConfigL)pdater(String hosts) throws IOException, InterruptedException { store = new ActiveKeyValueStone();  stone.connect(hosts);public void nun() throws InterruptedException, KeeperException { while (true) {String value = random.nextInt(100) + ""; stone.wnite(PATH, value);System.out.printf("Set %s to %s\n", PATH, value);TimeL)nit .SECONDS. slGep(nandom.nextInt(10));public static void main(Stning[] angs) throws Exception {ConfigUpdaten configUpdaten = new ConfigUpdaten(angs[0]); configUpdaten.nun();}}

这个程序很简单,ConfigUpdater中定义了一个ActiveKeyValueStore,它在ConfigUpdater的构造函数中连接到ZooKeeper。run()方法永远在循环,在随机 时间以随机值更新/config znode。

接下来,让我们看看如何读取/config配置属性。首先,我们在 ActiveKeyValueStore中添加一个读方法:

public String nead(Stning pathj Watchen watchen) throws InterruptedException, KeeperException { byte[] data = zk.getData(path, watcher, null/*stat*/); return new String(data_* CHARSET);}

ZooKeeper的getData()方法有三个参数:路径、一个观察对象和一个Stat对 象。Stat对象由getData()方法返回的值填充,用来将信息回传给调用者。通过 这个方法,调用者可以获得一个znode的数据和元数据,但在这个例子中,由于我 们对元数据不感兴趣,因此将Stat参数设为null。

作为配置服务的用户,ConfigWatcher(见例14-7)创建了一个ActiveKeyValueStore 对象store,并且在启动之后调用了 store的read()方法(在displayConfig() 方法中),将自身作为观察传递给store。displayConfig()方法用于显示它所读到的配置信息的初始值。

例14-7.该用应观察ZooKeeper中属性的更新情况,并将其打印到控制台

public class ConfigWatchen implements Watchen {private ActiveKeyValueStore store;public ConfigWatchen(Stning hosts) throws IOException, InterruptedException { stone = new ActiveKeyValueStone(); store.connect(hosts);}public void displayConfig() throws InterruptedException, KeeperException { String value = stone.nead(ConfigUpdaten.PATH, this);System.out.pnintf("Read %s as %s\n", ConfigUpdaten.PATH, value);}@Overridepublic void process(WatchedEvent event) {if (event.getType() == EventType.NodeDataChanged) { try {displayConfig();} catch (InterruptedException e) {System.err.println("Interrupted. Exiting.");Thread.cunnentThnead().intennupt();} catch (KeeperException e) { System.err.printf("KeeperException: %s. Exiting.\n", e);}}}public static void main(String[] args) throws Exception {ConfigWatcher configWatcher = new ConfigWatcher(args[0]); configWatcher.displayConfig();// stay alive until process is killed or thread is interrupted Thread.sleep(Long.MAX_VALUE);}}

当 ConfigUpdater 更新 znode时,ZooKeeper产生一个类型为 EventType.NodeDataChanged 的事件,从而触发观察。ConfigWatcher 在它的process()方法中对这个事件做出反应,读取并显示配置的最新版本。

由于观察仅发送单次信号,因此毎次我们调用ActiveKeyValueStore的read() 方法时,都将一个新的观察告知ZooKeeper——确保我们可以看到将来的更新。此 外,我们还是不能保证接收到每一个更新,因为在收到观察事件通知与下一次读之 间,znode可能已经被更新过,而且可能是很多次,由于客户端在这段时间没有注 册任何观察,因此不会收到通知。对于示例中的配置服务,这不是问题,因为客户 端只关心属性的最新值,最新值优先于之前的值。但是,一般情况下,这个潜在的 问题是不容忽视的。

让我们看看如何使用这个程序。在一个终端窗口中运行ConfigUpdater:

% java ConfigUpdater localhostSet /config to 79 Set /config to 14 Set /config to 78

可复原的ZooKeeper应用

关于分布式计算的第一个误区1>是“网络是可靠的”。按照他们的观点,程序总是 有一个可靠的网络,因此当程序运行在真正的网络中时,往往会出现各种各样的故 障。让我们看看各种可能的故障模式,以及能够解决故障的措施,使我们的程序在 面对故障时能够及时复原。

在Java API中的每一个ZooKeeper操作都在其throws子句中声明了两种类型的异 常,分别是 InterruptedException 和 KeeperException。

InterruptedException 异常

如果操作被中断,则会有一个InterruptedException异常被抛出。在仏?&语言 中有一个取消阻塞方法的标准机制,即针对存在阻塞方法的线程调用 interrupt()。一个成功的取消操作将产生一个InterruptedException异常。 ZooKeeper也遵循这一机制,因此你可以使用这种方法来取消一个ZooKeeper操 作。使用了 ZooKeeper的类或库通常会传播InterruptedException异常,使客 户端能够取消它们的操作。

InterruptedException异常并不意味着有故障,而是表明相应的操作已经被取 消,所以在配置服务的示例中,可以通过传播异常来中止应用程序的运行。

KeeperException异常

如果ZooKeeper服务器发出一个错误信号或与服务器存在通信问题,抛出的则是 KeeperException异常。针对不同的错误情况,KeeperException异常存在不同 的子类。例如,KeeperException.NoNodeException 是 KeeperException 的一个子类,如果你试图针对一个不存在的znode执行操作,抛出的则是该异常。

每一个KeeperException异常的子类都对应一个关于错误类型信息的代码。例如, KeeperException. NoNodeException 异常的代码是 KeeperException. Code. NONODE(一个枚举值)。

有两种方法被用来处理KeeperException异常:一种是捕捉KeeperException异常并且通过检测它的代码来决定采取何种补救措施;另一种是捕捉等价的 KeeperException子类并且在每段捕捉代码中执行相应的操作。

KeeperException异常分为三大类。

状态异常当一个操作因不能被应用于znode树而导致失败时,就会出现状态异 常。状态异常产生的原因通常是在同一时间有另外一个进程正在修改znode。例 如,如果一个znode先被另外一个进程更新了,根据版本号执行setData操作的进 程就会失败,并收到一个KeeperException.BadVersionException异常,这是因 为版本号不匹配。程序员通常都知道这种冲突总是存在的,也都会编写代码来进行处理。一些状态异常会指出程序中的错误,例如KeeperException.NoChildrenFor EphemeralsException异常,试图在短暂znode下创建子节点时就会抛出该异常。

可恢复的异常可恢复的异常是指那些应用程序能够在同一个ZooKeeper会话中恢复 的异常。一个可恢复的异常是通过 KeeperException.ConnectionLossException 来表示的,它意味着已经丢失了与ZooKeeper的连接。ZooKeeper会尝试重新连 接,并且在大多数情况下重新连接会成功,并确保会话是完整的。

但是 ZooKeeper 不能判断与 KeeperException.ConnectionLossException 异常相关的操作是否成功执行。这种情况就是部分失败的一个例子(在本章开始时提到 的)。这时程序员有责任来解决这种不确定性,并且根据应用的情况来采取适当的 操作。

在这一点上,就需要对“幂等”(idempotent)操作和“非幂等”(Nonidempotent)操作进行区分。幂等操作是指那些一次或多次执行都会产生相同结果的操作,例如读 请求或无条件执行的setData操作。对于幂等操作,只需要简单地进行重试即可。对于非幂等操作,就不能盲目地进行重试,因为它们多次执行的结果与一次执行是 完全不同的。程序可以通过在znode的路径和它的数据中编码信息来检测是否非幂 等操作的更新已经完成。

不可恢复的异常在某些情况下,ZooKeeper会话会失效——也许因为超时或因为 会话被关闭(两种情况下都会收到KeeperException. SessionExpinedException 异常),或因为身份验证失败(KeeperException. AuthFailedException异常)。 无论上述哪种情况,所有与会话相关联的短暂znode都将丢失,因此应用程序需要 在重新连接到ZooKeeper之前重建它的状态。


锁服务

分布式锁在一组进程之间提供了一种互斥机制。在任何时刻,只有一个进程可以持 有锁。分布式锁可以用于在大型分布式系统中实现领导者选举,在任何时间点,持 有锁的那个进程就是系统的领导者。

为了使用ZooKeeper来实现分布式锁服务,我们使用顺序znode来为那些竞争锁的 进程强制排序。思路很简单:首先指定一个作为锁的znode,通常用它来描述被锁 定的实体,称为/leader,然后希望获得锁的客户端创建一些短暂顺序znode,作为 锁znode的子节点。在任何时间点,顺序号最小的客户端将持有锁。例如,有两个 客户端差不多同时创建znode, 分别为/leader/lock-1 fQ/leader/lock-2,那么创建 /leader/lock-1的客户端将会持有锁,因为它的znode顺序号最小。ZooKeeper服务是顺序的仲裁者,因为它负责分配顺序号。

羊群效应

虽然这个算法是正确的,但还是存在一些问题。第一个问题是这种实现会受到“羊 群效应”(herd effect)的影响。考虑有成百上千客户端的情况,所有的客户端都在 尝试获得锁,每个客户端都会在锁znode上设置一个观察,用于捕捉子节点的变 化。毎次锁被释放或另外一个进程开始申请获取锁的时候,观察都会被触发并且每 个客户端都会收到一个通知。“羊群效应”就是指大量客户端收到同一事件的通 知,但实际上只有很少一部分需要处理这一事件。在这种情况下,只有一个客户端 会成功地获取锁,但是维护过程及向所有客户端发送观察事件会产生峰值流量,这 会对ZooKeeper服务器造成压力。

为了避免出现羊群效应,我们需要优化通知的条件。关键在于只有在前一个顺序号 的子节点消失时才需要通知下一个客户端,而不是删除(或创建)任何子节点时都需 要通知。在我们的例子中,如果客户端创建了 znode/leader/lock-1、/leader/lock-2 、/leader/hock-3,那么只有当/leader/lock-2消失时才需要通知/leader/lock-3对应的 客户端;/leader/hock-1消失或有新的znode/leader/lock-4加入时,不需要通知该客 户端。

可恢复的异常

这个申请锁的算法目前还存在另一个问题,就是不能处理因连接丢失而导致的 create操作失败。如前所述,在这种情况下,我们不知道操作是成功还是失败。由 于创建一个顺序2肋七是非幂等操作,所以我们不能简单地重试,因为如果第一次创建已经成功,重试会使我们多出一个永远删不掉的孤儿znode(至少到客户端会话 结束前)。不幸的结果是将会出现死锁。

问题在于,在重新连接之后客户端不能够判断它是否已经创建过子节点。解决方案 是在znode的名称中嵌入一个ID,如果客户端出现连接丢失的情况,重新连接之 后它便可以对锁节点的所有子节点进行检查,看看是否有子节点的名称中包含其 ID。如果有一个子节点的名称包含其ID,它便知道创建操作已经成功,不需要再 创建子节点。如果没有子节点的名称中包含其ID,则客户端可以安全地创建一个 新的顺序子节点。

客户端会话的ID是一个长整数,并且在ZooKeeper服务中是唯一的,因此非常适 合在连接丢失后用于识别客户端。可以通过调用Java ZooKeeper类的getSessionId() 方法来获得会话的ID。

在创建短暂顺序znode时应当采用lock-<sessionId>这样的命名方式,ZooKeeper 在其尾部添加顺序号之后,znode的名称会形如lock-<sessionId>-<sequenceNumber>。 由于顺序号对于父节点来说是唯一的,但对于子节点名并不唯一,因此采用这样的 命名方式可以让子节点在保持创建顺序的同时能够确定自己的创建者。

不可恢复的异常

如果一个客户端的ZooKeeper会话过期,那么它所创建的短暂znode将会被删除, 已持有的锁会被释放,或是放弃了申请锁的位置。使用锁的应用程序应当意识到它 已经不再持有锁,应当清理它的状态,然后通过创建并尝试申请一个新的锁对象来 重新启动。注意,这个过程是由应用程序控制的,而不是锁,因为锁是不能预知应 用程序需要如何清理自己的状态。

实现

正确地实现一个分布式锁是一件棘手的事,因为很难对所有类型的故障都进行正确 的解释处理。ZooKeeper带有一个Java语言编写的生产级别的锁实现,名为 WriteLock,客户端可以很方便地使用它。


更多分布式数据结构和协议

使用ZooKeeper可以实现很多不同的分布式数据结构和协议,例如“屏障” (barrier)、队列和两阶段提交协议。有趣的是它们都是同步协议,即使我们使用异 步ZooKeeper基本操作(如通知)来实现它们。

BookKeeper

BookKeeper是一个具有高可用性和可靠性的日志服务。它可以用来提供预写式日志(write-ahead logging),这是一项在存储系统中用于保证数据完整性的常用技术。 在一个使用预写式日志的系统中,每一个写操作在被应用前都先要写入事务日志。 使用这个技术,我们不必在每个写操作之后都将数据写到永久存储器上,因为即使 出现系统故障,也可以通过重新执行事务日志中尚未应用的写操作来恢复系统的最 后状态。

BookKeeper客户端所创建的日志被称为ledger,每一个添加到ledger的记录被称 为ledger entry,每个ledger entry就是一个简单的字节数组。ledger由复制ledger 数据的bookie服务器进行管理。注意,ledger数据不存储在ZooKeeper中,只有 元数据保存在ZooKeeper中。

传统上,为了让使用预写式日志的系统更加稳定,必须解决保存事务日志的节点故 障问题。通常通过某种方式复制事务日志来解决这个问题。例如,Hadmjp HDFS 中的namenode会将它的编辑日志写到多个磁盘上,每个磁盘都是一个典型的NFS 装入盘。然而,主节点出现故障时,还是需要手动完成故障恢复。通过提供具有髙 可用性的日志服务,BookKeeper承诺提供透明的故障恢复,因为它可以容忍 Bookie服务器的故障。

关注微信获取最新动态