Distributed Coordination With ZooKeeper Part 2: Test Drive

Posted on June 27, 2013 by Scott Leberknight

This is the second in a series of blogs that introduce Apache ZooKeeper. In the first blog, you got an introduction to ZooKeeper and its core concepts. In this blog, you'll take a brief test drive of ZooKeeper using its command-line shell, which is a fast and convenient way to get up and running with ZooKeeper immediately.

To get an idea of some of the basic building blocks in Apache ZooKeeper, let's take a test drive. ZooKeeper comes with a command-line shell that you can use to connect to and interact with the service. The following listing shows connecting to the shell, listing the znodes at the root level, and creating a znode named /sample-group, which will serve as the parent of some other znodes we'll create in a moment. All paths in ZooKeeper must be absolute and begin with a /. The first argument to the create command is the path, while the second is the data associated with the znode. Note also that when a connection is established, the default watcher receives the SyncConnected event, which you can see in the listing below.

$ ./zkCli.sh
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] create /sample-group a-sample-group
Created /sample-group
[zk: localhost:2181(CONNECTED) 2] ls /
[sample-group, zookeeper]
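
For reference, here is a minimal, hypothetical sketch of the same steps using the ZooKeeper Java API (we'll cover the Java API properly in a later post). It assumes a standalone server on localhost:2181 and the ZooKeeper client library on the classpath; the class name and session timeout are just placeholders.

import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class CreateGroup {

    public static void main(String[] args) throws Exception {
        // Wait for the default watcher to receive SyncConnected, just as the
        // shell prints the WATCHER:: event above.
        CountDownLatch connected = new CountDownLatch(1);
        ZooKeeper zk = new ZooKeeper("localhost:2181", 5000, event -> {
            if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
                connected.countDown();
            }
        });
        connected.await();

        // Equivalent of: create /sample-group a-sample-group
        String path = zk.create("/sample-group", "a-sample-group".getBytes(),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
        System.out.println("Created " + path);

        // Equivalent of: ls /
        List<String> children = zk.getChildren("/", false);
        System.out.println("Children of /: " + children);

        zk.close();
    }
}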

At this point we want to create some child znodes under /sample-group. ZooKeeper znodes can be either persistent or ephemeral. Persistent znodes are permanent: once created, they stick around until they are explicitly deleted. Ephemeral znodes, on the other hand, exist only as long as the client that created them is alive; once the client goes away for any reason, all ephemeral znodes it created are automatically destroyed. As you might imagine, if we want to build a group membership service for a distributed system, each client (which is a group member) should indicate its membership via an ephemeral znode, so that if it dies, the znode representing its membership is destroyed, indicating the client is no longer a member of the group. When we created the group, we created a persistent znode; to create an ephemeral znode we use the -e option.

In addition, maybe we'd like to know the order in which clients joined our group. ZooKeeper znodes can be automatically and uniquely ordered by their parent. In the shell we use -s to indicate we want to create each child as a sequential znode. Note also that we named the child znodes /sample-group/child- in each case. When creating sequential znodes, it is typical to end the name with a dash, to which a unique, monotonically increasing integer is automatically appended.

[zk: localhost:2181(CONNECTED) 3] create -s -e /sample-group/child- data-1
Created /sample-group/child-0000000000
[zk: localhost:2181(CONNECTED) 4] create -s -e /sample-group/child- data-2
Created /sample-group/child-0000000001
[zk: localhost:2181(CONNECTED) 5] create -s -e /sample-group/child- data-3
Created /sample-group/child-0000000002
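
In the Java API, the -s -e combination maps to CreateMode.EPHEMERAL_SEQUENTIAL. Continuing the sketch from above (reusing the same zk handle, so this is a fragment rather than a complete program), the equivalent might look like this:

// Each call creates a child such as /sample-group/child-0000000000; the znode
// is removed automatically when this client's session ends.
for (String data : new String[] { "data-1", "data-2", "data-3" }) {
    String child = zk.create("/sample-group/child-", data.getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println("Created " + child);
}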

Now let's set a watch on the /sample-group znode in order to receive change notifications whenever a child znode is added or removed. Setting the watch lets us monitor the group for changes and react accordingly. For example, if we are building a distributed search engine and a server in the search cluster dies, we need to know about that event and redistribute the data held by the (now dead) server across the remaining servers, assuming the data is stored redundantly as in Hadoop. This is exactly what the Apache Blur distributed search engine does to ensure data is not lost and that the cluster continues operating when one or more servers are lost. In ZooKeeper you set watches on read operations, for example when listing a znode's children or getting its data. We'll list the children under /sample-group and set a watch, indicated by using true as the second argument.

[zk: localhost:2181(CONNECTED) 6] ls /sample-group true
[child-0000000001, child-0000000002, child-0000000000]

Now if we create another child znode, the watch event will fire and notify us that a NodeChildrenChanged event occurred.

[zk: localhost:2181(CONNECTED) 7] create -s -e /sample-group/child- data-4

WATCHER::

WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sample-group
Created /sample-group/child-0000000003

The event does not tell us what actually changed, however. To get the updated list of children we need to list the contents of /sample-group again. In addition, watches are one-time triggers: clients must re-register the watch to continue receiving change notifications. So if we now create another child znode, no watch will fire.

[zk: localhost:2181(CONNECTED) 8] create -s -e /sample-group/child- data-5
Created /sample-group/child-0000000004
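
In the Java API, this pattern typically shows up as a Watcher that re-registers itself each time it fires. The following is a rough sketch, again reusing the zk handle from the earlier sketch (it additionally needs the org.apache.zookeeper.KeeperException and WatchedEvent imports):

Watcher membershipWatcher = new Watcher() {
    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeChildrenChanged) {
            try {
                // Re-read the children and re-register this watcher in one call.
                List<String> updated = zk.getChildren("/sample-group", this);
                System.out.println("Group members now: " + updated);
            } catch (KeeperException | InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
};
// Initial read, with the watch set (equivalent of: ls /sample-group true).
List<String> members = zk.getChildren("/sample-group", membershipWatcher);
System.out.println("Group members: " + members);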

To finish off our test drive, let's delete our test group.

[zk: localhost:2181(CONNECTED) 9] delete /sample-group
Node not empty: /sample-group

Oops. ZooKeeper won't allow znodes to be deleted if they have children. In addition, updates (including deletes) are conditional on a specific version; this is a form of optimistic locking that ensures a client's update succeeds only if it passes the current version of the data. Otherwise the update fails with a BadVersionException. You can short-circuit the optimistic versioning behavior by passing -1 as the version, which tells ZooKeeper to perform the update unconditionally. So in order to delete our group, we first delete all the child znodes and then delete the group znode, all unconditionally.

[zk: localhost:2181(CONNECTED) 10] delete /sample-group/child-0000000000 -1
[zk: localhost:2181(CONNECTED) 11] delete /sample-group/child-0000000001 -1
[zk: localhost:2181(CONNECTED) 12] delete /sample-group/child-0000000002 -1
[zk: localhost:2181(CONNECTED) 13] delete /sample-group/child-0000000003 -1
[zk: localhost:2181(CONNECTED) 14] delete /sample-group/child-0000000004 -1
[zk: localhost:2181(CONNECTED) 15] delete /sample-group -1                 
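
In the Java API the version is an explicit argument to delete. A rough sketch of the same unconditional cleanup, plus the conditional variant that uses the version from a Stat, might look like the following (it continues the earlier sketch; /app-config is a purely illustrative path and needs the org.apache.zookeeper.data.Stat import):

// Unconditional deletes: -1 means "skip the version check".
for (String child : zk.getChildren("/sample-group", false)) {
    zk.delete("/sample-group/" + child, -1);
}
zk.delete("/sample-group", -1);

// Conditional delete: read the current version, then delete only that version.
// If another client updated the znode in between, this throws BadVersionException.
// (Assumes /app-config exists; exists() returns null otherwise.)
Stat stat = zk.exists("/app-config", false);
zk.delete("/app-config", stat.getVersion());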

In addition to the shell, ZooKeeper provides a set of commands referred to as the "four-letter words". You issue them via telnet or nc (netcat). For example, let's ask ZooKeeper how it's feeling.

$ echo "ruok" | nc localhost 2181
imok
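
You don't actually need nc; anything that can open a TCP socket to the client port works. As a small illustrative sketch (class name is just a placeholder), the same check from Java might look like this:

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

public class FourLetterWord {

    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket("localhost", 2181)) {
            OutputStream out = socket.getOutputStream();
            out.write("ruok".getBytes(StandardCharsets.UTF_8));
            out.flush();

            BufferedReader in = new BufferedReader(
                    new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            // The server answers "imok" and then closes the connection.
            System.out.println(in.readLine());
        }
    }
}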

You can also use the stat command to get basic statistics on ZooKeeper.

$ echo "stat" | nc localhost 2181
Zookeeper version: 3.4.5-1392090, built on 09/30/2012 17:52 GMT
Clients:
 /0:0:0:0:0:0:0:1%0:63888[0](queued=0,recved=1,sent=0)

Latency min/avg/max: 0/0/157
Received: 338
Sent: 337
Connections: 1
Outstanding: 0
Zxid: 0xb
Mode: standalone
Node count: 17

In this test drive, we've seen some basic but important aspects of ZooKeeper. We created a persistent znode and sequential ephemeral child znodes, set a watch and received a change notification when a znode's children changed, and deleted znodes. We also saw how znodes can have associated data. When building real systems you won't be using the command-line shell to implement behavior, however, so in the next post we'll translate this simple group membership example into Java code.

Conclusion to Part 2

In this second part of the ZooKeeper series of blogs, you took a test drive using the command-line shell available in ZooKeeper. You created both persistent and ephemeral znodes. You created the ephemeral znodes as children of the persistent znode, and made them sequential as well so that ZooKeeper maintains a monotonically increasing, unique order. Finally you saw how to delete znodes and use a few of the "four letter words" to check ZooKeeper's status.

In the next blog, we'll recreate the group example you've just seen using the ZooKeeper Java API.

Distributed Coordination With ZooKeeper Part 1: Introduction

Posted on June 25, 2013 by Scott Leberknight

This is the first in a series of blogs that introduce Apache ZooKeeper. This blog provides an introduction to ZooKeeper and its core concepts and use cases. In later blogs you will test drive ZooKeeper, see some examples of the Java API, learn about its architecture, build a distributed data structure which can be used across independent processes and machines, and finally get a brief introduction to a higher-level API on top of ZooKeeper.

Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. This could be a distributed search engine, a distributed build system, or even something like Hadoop, which has both a distributed file system and a Map/Reduce data processing framework that operates on the data in the file system. How would you determine which servers are alive and operating at any given moment in time? Or, how would you determine which servers are available to process a build in a distributed build system? Or, for a distributed search system, how would you know which servers are available to hold data and handle search requests? Most importantly, how would you do these things reliably in the face of the difficulties of distributed computing, such as network failures, bandwidth limitations, variable latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers?

These and similar questions are the focus of Apache ZooKeeper, which is a fast, highly available, fault tolerant, distributed coordination service. Using ZooKeeper you can build reliable, distributed data structures for group membership, leader election, coordinated workflow, and configuration services, as well as generalized distributed data structures like locks, queues, barriers, and latches.

Many well-known and successful projects already rely on ZooKeeper. Just a few of them include HBase, Hadoop 2.0, Solr Cloud, Neo4J, Apache Blur (incubating), and Accumulo.

Core Concepts

ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between clients and provides an eventually consistent view of its znodes, which are like files and directories in a traditional file system. It provides basic operations such as creating, deleting, and checking existence of znodes. It provides an event-driven model in which clients can watch for changes to specific znodes, for example if a new child is added to an existing znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an ensemble, with each server holding an in-memory copy of the distributed file system to service client read requests. Each server also holds a persistent copy on disk.

One of the servers is elected as the leader, and all other servers are followers. The leader is responsible for all writes and for broadcasting changes to the followers. Assuming a majority of the ensemble commits a change successfully, the write succeeds and the data is durable even if the leader subsequently fails. This means ZooKeeper is an eventually consistent system: the followers may lag the leader by some small amount of time, so clients might not always see the most up-to-date information. Importantly, the leader is not a master as in a master/slave architecture and thus is not a single point of failure; rather, if the leader dies, the remaining followers hold an election for a new leader, and the new leader takes over where the old one left off.

Each client connects to ZooKeeper by passing in the list of servers in the ensemble; the client tries the servers in random order until a connection is established. Once connected, ZooKeeper creates a session with a client-specified timeout period. The ZooKeeper client automatically sends periodic heartbeats to keep the session alive when no operations are being performed, and it also handles failover automatically: if the server a client is connected to fails, the client detects this and reconnects to a different server in the ensemble. The nice thing is that the same client session is retained across this failover event. However, during failover it is possible for client operations to fail, and, as with almost all ZooKeeper operations, client code must be vigilant about detecting and handling errors.
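
As a rough illustration (the hostnames and timeout below are placeholders, and this fragment assumes the org.apache.zookeeper.ZooKeeper import and an enclosing method that can throw its checked exceptions), a client typically passes the whole ensemble and a requested session timeout when constructing the handle:

// Pass the whole ensemble; the client picks a server and fails over on its own.
ZooKeeper zk = new ZooKeeper(
        "zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181",
        15000,  // requested session timeout in milliseconds
        event -> System.out.println("Connection event: " + event.getState()));

// The server may negotiate a different timeout than the one requested.
System.out.println("Negotiated session timeout: " + zk.getSessionTimeout() + " ms");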

Partial Failure

One of the fallacies of distributed computing is that the network is reliable. Having worked for the past few years on a project with multiple Hadoop, Apache Blur, and ZooKeeper clusters comprising hundreds of servers, I can say from experience that the network is not reliable. Simply put, things break, and you cannot assume the network is 100% reliable all the time. When designing distributed systems, you must keep this in mind and handle situations you ordinarily would not even consider when building software for a single server. For example, suppose a client sends an update to a server, but the network connection is lost briefly before the response is received. You need to ask several questions in this case. Did the message get through to the server? If it did, did the operation actually complete successfully? Is it safe to retry an operation when you don't know whether it even reached the server, or whether it failed there; in other words, is the operation idempotent? You need to consider questions like these when building distributed systems. ZooKeeper cannot prevent network problems or partial failures, but once you are aware of the kinds of problems that can arise, you are much better prepared to deal with them when (not if) they occur. ZooKeeper provides certain guarantees regarding data consistency and atomicity that can aid you when building systems, as you will see later.
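
As a small, hypothetical illustration of the idempotency question (the /config/flag path is made up, and the fragment assumes a zk handle and an enclosing method that declares throws KeeperException, InterruptedException), consider a create that is blindly retried after a connection loss:

try {
    // Hypothetical znode; its parent path must already exist.
    zk.create("/config/flag", "on".getBytes(),
            ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException.NodeExistsException e) {
    // The znode already exists -- possibly created by our own earlier request
    // whose response was lost. Treating this as success makes the retry safe.
} catch (KeeperException.ConnectionLossException e) {
    // We don't know whether the request reached the server. Retrying is safe
    // here only because of the NodeExistsException handling above.
}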

Conclusion to Part 1

In this blog we've learned that ZooKeeper is a distributed coordination service that facilitates loose coupling between distributed components. It is implemented as a distributed, hierarchical file system and you can use it to build distributed data structures such as locks, queues, and so on. In the next blog, we'll take a test drive of ZooKeeper using its command line shell.

Hadoop Presentation at NOVA/DC Java Users Group

Posted on May 09, 2011 by Scott Leberknight

Last Thursday (on Cinco de Mayo) I gave a presentation on Hadoop and Hive at the Nova/DC Java Users Group. As several people asked about getting the slides, I've shared them here on Slideshare. I also posted the presentation sample code on Github at basic-hadoop-examples.