This is the third in a series of blogs that introduce Apache ZooKeeper. In the second blog, you took a test drive of ZooKeeper using its command-line shell. In this blog, we'll re-implement the group membership example using the ZooKeeper Java API.

Apache ZooKeeper is implemented in Java, and its native API is also Java. ZooKeeper also provides a C language API, and the distribution provides contrib modules for Perl, Python, and RESTful clients. The ZooKeeper APIs come in two flavors: synchronous and asynchronous. Which one you use depends on the situation. For example, you might choose the asynchronous Java API if you are implementing a Java application that processes a large number of child znodes independently of one another; in that case you can make good use of the asynchronous API to launch all the independent tasks in parallel. On the other hand, if you are implementing simple tasks that perform sequential operations in ZooKeeper, the synchronous API is easier to use and is likely a better fit.
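To make the trade-off concrete, here is a stdlib-only sketch of the asynchronous style: every independent request is launched without waiting for the previous one, and a CountDownLatch collects the callbacks. The getDataAsync method and its fake "data-for-..." payload are hypothetical stand-ins; the real ZooKeeper asynchronous API uses its own callback interfaces, which this sketch does not reproduce.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.BiConsumer;

// Hypothetical callback-style client used only to illustrate the async pattern.
class AsyncSketch {
  // Daemon threads so a forgotten shutdown() cannot keep the JVM alive.
  private final ExecutorService io = Executors.newFixedThreadPool(4, r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);
    return t;
  });

  // Returns immediately; the callback runs later on a pool thread.
  void getDataAsync(String path, BiConsumer<String, String> callback) {
    io.execute(() -> callback.accept(path, "data-for-" + path));
  }

  // Launch every independent request first, then wait for all callbacks.
  Map<String, String> fetchAll(List<String> paths) {
    Map<String, String> results = new ConcurrentHashMap<>();
    CountDownLatch done = new CountDownLatch(paths.size());
    for (String path : paths) {
      getDataAsync(path, (p, data) -> {
        results.put(p, data);
        done.countDown();  // one callback finished
      });
    }
    try {
      done.await();  // block until every callback has run
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException("interrupted while waiting", e);
    }
    return results;
  }

  void shutdown() { io.shutdown(); }

  public static void main(String[] args) {
    AsyncSketch client = new AsyncSketch();
    Map<String, String> all = client.fetchAll(
        Arrays.asList("/group/a-0000000000", "/group/b-0000000001"));
    System.out.println(all.size());  // 2
    client.shutdown();
  }
}
```

The synchronous equivalent would simply call a blocking fetch in a loop, which is easier to read but waits for each round trip in turn.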

For our group membership example, we'll use the synchronous Java API. The first thing we need to do is connect to ZooKeeper and get an instance of ZooKeeper, which is the main client API through which you perform operations like creating znodes, setting data on znodes, listing znodes, and so on. The ZooKeeper constructor launches a separate thread to connect and returns immediately. As a result, you need to watch for the SyncConnected event, which indicates that the connection has been established. Listing 1 shows code to connect to ZooKeeper, in which we use a CountDownLatch to block until we've received the connected event. (Sample code for this blog is available on GitHub at

Listing 1 - Connecting to ZooKeeper

public ZooKeeper connect(String hosts, int sessionTimeout)
        throws IOException, InterruptedException {
  final CountDownLatch connectedSignal = new CountDownLatch(1);
  ZooKeeper zk = new ZooKeeper(hosts, sessionTimeout, new Watcher() {
    public void process(WatchedEvent event) {
      // Release the latch once the session reaches SyncConnected
      if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
        connectedSignal.countDown();
      }
    }
  });
  connectedSignal.await();  // block until the connected event arrives
  return zk;
}
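Because Listing 1 needs a running ZooKeeper server, here is a stdlib-only sketch of the same wait-for-connection pattern. FakeClient is a hypothetical stand-in for the ZooKeeper constructor: it returns immediately and reports the connection from a background thread. As a design choice not found in Listing 1, the sketch bounds the wait with await(timeout, unit), so a server that never answers cannot block the caller indefinitely.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical stand-in for the ZooKeeper constructor: it returns at once and
// reports the connection state later from a background thread.
class FakeClient {
  FakeClient(long connectDelayMillis, Consumer<String> watcher) {
    Thread handshake = new Thread(() -> {
      try {
        Thread.sleep(connectDelayMillis);  // simulate the network handshake
        watcher.accept("SyncConnected");   // deliver the "connected" event
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    handshake.setDaemon(true);  // don't keep the JVM alive
    handshake.start();
  }
}

class ConnectSketch {
  // True if the SyncConnected event arrived within timeoutMillis.
  static boolean connect(long connectDelayMillis, long timeoutMillis) {
    CountDownLatch connected = new CountDownLatch(1);
    new FakeClient(connectDelayMillis, state -> {
      if ("SyncConnected".equals(state)) {
        connected.countDown();  // release the waiting caller
      }
    });
    try {
      // Bounded wait: await returns false if the latch did not reach zero in time
      return connected.await(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(connect(50, 1000));   // true: connects within the timeout
    System.out.println(connect(2000, 100));  // false: gives up after 100 ms
  }
}
```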

The next thing we need to do is create a znode for the group. As in the test drive, this znode should be persistent, so that it hangs around regardless of whether any clients are connected or not. Listing 2 shows creating a group znode.

Listing 2 - Creating the group znode

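public void createGroup(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;  // ZooKeeper paths must be absolute
  zk.create(path,
            null /* data */,
            ZooDefs.Ids.OPEN_ACL_UNSAFE,
            CreateMode.PERSISTENT);
}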

Note in Listing 2 that we prepend a leading slash to the group name, since ZooKeeper requires all paths to be absolute. The create operation takes four arguments: the path; an optional byte[] of data; a list of ACLs (access control lists) that control who can access the znode; and finally the type of znode, in this case persistent. Creating the group member znodes is almost identical to creating the group znode, except that we need to create an ephemeral, sequential znode. Because ZooKeeper appends a sequence number to the name we pass in, create returns the path of the znode it actually created. Let's also say that we need to store some information about each member, so we'll set data on the member znodes. This is shown in Listing 3.

Listing 3 - Creating group member znodes with data

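public String joinGroup(String groupName, String memberName, byte[] data)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName + "/" + memberName + "-";
  String createdPath = zk.create(path,
                                 data,
                                 ZooDefs.Ids.OPEN_ACL_UNSAFE,
                                 CreateMode.EPHEMERAL_SEQUENTIAL);
  return createdPath;  // includes the sequence number ZooKeeper appended
}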
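Listings 2 and 3 build paths by hand by prepending "/", so it can help to check a path before calling create. The validator below is a hypothetical, simplified subset of ZooKeeper's path rules (absolute, no trailing slash, no empty or relative node names); the real client enforces more, such as rejecting certain characters, so treat it as an illustration only.

```java
// Hypothetical, simplified subset of ZooKeeper's path rules; the real client
// checks more (for example, certain disallowed characters).
class PathRules {
  static boolean isValid(String path) {
    if (path == null || !path.startsWith("/")) return false;  // must be absolute
    if (path.equals("/")) return true;                        // the root znode
    if (path.endsWith("/")) return false;                     // no trailing slash
    // split with limit -1 keeps empty strings, so "//" shows up as an empty name
    for (String name : path.substring(1).split("/", -1)) {
      if (name.isEmpty()) return false;                         // no "//"
      if (name.equals(".") || name.equals("..")) return false;  // no relative names
    }
    return true;
  }

  public static void main(String[] args) {
    System.out.println(isValid("/sample-group"));         // true
    System.out.println(isValid("sample-group"));          // false: not absolute
    System.out.println(isValid("/sample-group/alice-"));  // true: sequential prefix
    System.out.println(isValid("/a//b"));                 // false: empty name
  }
}
```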

Now that we can create the group and allow members to join it, it would be nice to have some way to monitor the group membership. To do this, we list the children of the group znode and set a watch on it in the same call; whenever the watch triggers an event, we query ZooKeeper for the group's (updated) members, as shown in Listing 4. This process continues in an infinite loop, hence the class name ListGroupForever.

Listing 4 - Listing a group's members indefinitely

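import java.util.Collections;
import java.util.List;
import java.util.concurrent.Semaphore;
import org.apache.zookeeper.*;

public class ListGroupForever {
  private ZooKeeper zooKeeper;
  private Semaphore semaphore = new Semaphore(1);

  public ListGroupForever(ZooKeeper zooKeeper) {
    this.zooKeeper = zooKeeper;
  }

  public static void main(String[] args) throws Exception {
    ZooKeeper zk = new ConnectionHelper().connect(args[0]);
    new ListGroupForever(zk).listForever(args[1]);
  }

  public void listForever(String groupName)
          throws KeeperException, InterruptedException {
    semaphore.acquire();        // take the only permit up front
    while (true) {
      list(groupName);          // print members and register a fresh watch
      semaphore.acquire();      // block until the watcher releases a permit
    }
  }

  private void list(String groupName)
          throws KeeperException, InterruptedException {
    String path = "/" + groupName;
    List<String> children = zooKeeper.getChildren(path, new Watcher() {
      public void process(WatchedEvent event) {
        if (event.getType() == Event.EventType.NodeChildrenChanged) {
          semaphore.release();  // wake listForever; this watch is now used up
        }
      }
    });
    if (children.isEmpty()) {
      System.out.printf("No members in group %s\n", groupName);
      return;
    }
    Collections.sort(children);
    System.out.println(children);
  }
}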

The ListGroupForever class in Listing 4 has some interesting characteristics. The listForever method loops forever and uses a semaphore to block until the group znode's children change. The list method calls getChildren to retrieve the child znodes from ZooKeeper and, critically, sets a Watcher to watch for changes of type NodeChildrenChanged. When a NodeChildrenChanged event occurs, the watcher releases the semaphore, which lets listForever re-acquire it and then retrieve and display the updated list of group members. Because a watch fires only once, each call to list registers a fresh watcher. This process continues until ListGroupForever is terminated.

To round out the example, we'll create a method to delete the group. As shown in the test drive, ZooKeeper doesn't permit znodes that have children to be deleted, so we first need to delete all the children, and then delete the group (parent) znode. This is shown in Listing 5.

Listing 5 - Deleting a group

public void delete(String groupName)
        throws KeeperException, InterruptedException {
  String path = "/" + groupName;
  try {
    // Children must go first: ZooKeeper refuses to delete a non-empty znode
    List<String> children = zk.getChildren(path, false);
    for (String child : children) {
      zk.delete(path + "/" + child, -1);  // -1: delete regardless of version
    }
    zk.delete(path, -1);
  } catch (KeeperException.NoNodeException e) {
    System.out.printf("Group %s does not exist\n", groupName);
  }
}

When deleting a group, we pass -1 to the delete method to delete the znodes unconditionally. We could instead have passed a version number: if it matches the znode's current version, the znode is deleted; otherwise we receive an optimistic locking violation in the form of a BadVersionException.

Conclusion to Part 3

In this third blog on ZooKeeper, we implemented a group membership example using the Java API. You saw how to connect to ZooKeeper; how to create persistent, ephemeral, and sequential znodes; how to list znodes and set watches to receive events; and finally how to delete znodes.

In the next blog, we'll back off from the code level and get an overview of ZooKeeper's architecture.


Distributed Coordination With ZooKeeper Part 2: Test Drive

Posted on June 27, 2013 by Scott Leberknight

This is the second in a series of blogs that introduce Apache ZooKeeper. In the first blog, you got an introduction to ZooKeeper and its core concepts. In this blog, you'll take a brief test drive of ZooKeeper using its command line shell. This is a really fast and convenient way to get up and running with ZooKeeper immediately.

To get an idea of some of the basic building blocks in Apache ZooKeeper, let's take a test drive. ZooKeeper comes with a command-line shell that you can use to connect to and interact with the service. The following listing shows connecting with the shell, listing the znodes at the root level, and creating a znode named /sample-group, which will serve as a parent znode for some other znodes that we'll create in a moment. All paths in ZooKeeper must be absolute and begin with a /. The first argument to the create command is the path, while the second is the data associated with the znode. Note also that when a connection is established, the default watcher receives the SyncConnected event, which you see in the listing below.

$ ./
Connecting to localhost:2181
Welcome to ZooKeeper!
JLine support is enabled


WatchedEvent state:SyncConnected type:None path:null
[zk: localhost:2181(CONNECTED) 0] ls /
[zookeeper]
[zk: localhost:2181(CONNECTED) 1] create /sample-group a-sample-group
Created /sample-group
[zk: localhost:2181(CONNECTED) 2] ls /
[sample-group, zookeeper]

At this point we want to create some child znodes under /sample-group. ZooKeeper znodes can be either persistent or ephemeral. Persistent znodes are permanent: once created, they stick around until they are explicitly deleted. Ephemeral znodes, on the other hand, exist only as long as the session of the client that created them is alive; once that session ends for any reason, all ephemeral znodes it created are automatically deleted. As you might imagine, if we want to build a group membership service for a distributed system, each client (which is a group member) should indicate its status via an ephemeral znode, so that if the client dies, the znode representing its membership is destroyed, indicating that the client is no longer a member of the group. When we created the group, we created a persistent znode. To create an ephemeral znode, we use the -e option.

In addition, maybe we'd like to know the order in which clients joined our group. ZooKeeper can automatically append a unique, increasing sequence number, maintained by the parent znode, to a child znode's name. In the shell, we use the -s option to create the child znode as a sequential znode. Note also that we named each child /sample-group/child-. When creating sequential znodes, it is typical to end the name with a dash, to which ZooKeeper appends a unique, monotonically increasing integer.

[zk: localhost:2181(CONNECTED) 3] create -s -e /sample-group/child- data-1
Created /sample-group/child-0000000000
[zk: localhost:2181(CONNECTED) 4] create -s -e /sample-group/child- data-2
Created /sample-group/child-0000000001
[zk: localhost:2181(CONNECTED) 5] create -s -e /sample-group/child- data-3
Created /sample-group/child-0000000002
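The two options used above can be mimicked in a few lines of plain Java. The GroupModel class below is a hypothetical in-memory model, not the ZooKeeper API: sequential names get a zero-padded, 10-digit suffix like the ones in the listing, and closing a session removes every ephemeral child that session created. It uses a simple counter for the suffix; the real server derives the number from the parent's state, so you should not assume the numbers are contiguous.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Hypothetical in-memory model of one parent znode (NOT the ZooKeeper API).
// It mimics two behaviors: sequential names and session-bound ephemeral nodes.
class GroupModel {
  private final List<String> persistent = new ArrayList<>();
  private final Map<String, Long> ephemeralOwner = new LinkedHashMap<>(); // name -> session
  private int counter = 0;  // simplified per-parent sequence counter

  String create(String prefix, long sessionId, boolean ephemeral, boolean sequential) {
    // Sequential names get a zero-padded, 10-digit suffix, e.g. child-0000000000
    String name = sequential
        ? prefix + String.format(Locale.ROOT, "%010d", counter++)
        : prefix;
    if (ephemeral) {
      ephemeralOwner.put(name, sessionId);
    } else {
      persistent.add(name);
    }
    return name;
  }

  // Ending a session removes every ephemeral node that session created.
  void closeSession(long sessionId) {
    ephemeralOwner.values().removeIf(owner -> owner == sessionId);
  }

  List<String> children() {
    List<String> all = new ArrayList<>(persistent);
    all.addAll(ephemeralOwner.keySet());
    return all;
  }

  public static void main(String[] args) {
    GroupModel group = new GroupModel();
    group.create("child-", 1L, true, true);  // child-0000000000, session 1
    group.create("child-", 2L, true, true);  // child-0000000001, session 2
    group.create("child-", 1L, true, true);  // child-0000000002, session 1
    group.closeSession(1L);                  // session 1 goes away
    System.out.println(group.children());    // [child-0000000001]
  }
}
```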

Now let's set a watch on the /sample-group znode in order to receive change notifications whenever a child znode is added or removed. Setting the watch lets us monitor the group for changes and react accordingly. For example, if we are building a distributed search engine and a server in the search cluster dies, we need to know about that event and redistribute the data held by the (now dead) server across the remaining servers, assuming the data is stored redundantly, as it is in Hadoop. This is exactly what the Apache Blur distributed search engine does to ensure that data is not lost and that the cluster keeps operating when one or more servers are lost. In ZooKeeper you set watches on read operations, for example when listing a znode's children or getting its data. We'll list the children under /sample-group and set a watch, indicated by passing true as the second argument.

[zk: localhost:2181(CONNECTED) 6] ls /sample-group true
[child-0000000001, child-0000000002, child-0000000000]
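Notice that ls returned the children out of order. ZooKeeper does not promise any particular order for a znode's children, so a client that cares about join order has to sort them itself. The hypothetical helper below sorts by the numeric suffix rather than by the whole name, which keeps join order correct even when members use different name prefixes; it assumes every name ends with a dash followed by the sequence number.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical helper: orders sequential znode names by their numeric suffix.
class SequenceOrder {
  // Assumes names look like "<prefix>-<digits>", e.g. "child-0000000002".
  static long sequenceOf(String name) {
    return Long.parseLong(name.substring(name.lastIndexOf('-') + 1));
  }

  static List<String> inJoinOrder(List<String> children) {
    List<String> sorted = new ArrayList<>(children);  // leave the input untouched
    sorted.sort(Comparator.comparingLong(SequenceOrder::sequenceOf));
    return sorted;
  }

  public static void main(String[] args) {
    List<String> fromLs = Arrays.asList(
        "child-0000000001", "child-0000000002", "child-0000000000");
    System.out.println(inJoinOrder(fromLs));
    // [child-0000000000, child-0000000001, child-0000000002]
  }
}
```

Sorting the full names alphabetically would also work here, but only because every child shares the same "child-" prefix.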

Now if we create another child znode, the watch event will fire and notify us that a NodeChildrenChanged event occurred.

[zk: localhost:2181(CONNECTED) 7] create -s -e /sample-group/child- data-4


WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/sample-group
Created /sample-group/child-0000000003

The event does not tell us what actually changed, however. To get the updated list of children, we need to list the contents of /sample-group again. In addition, watches are one-time triggers, and clients must re-register a watch to continue receiving change notifications. So if we now create another child znode, no watch event fires.
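The one-time behavior is easy to model. WatchedParent below is a hypothetical in-memory model, not the ZooKeeper API: getChildren optionally arms a watch, and the next change fires every armed watch once and then forgets it. The watches are copied before being cleared, so a watcher that re-arms itself from inside its callback is kept for the following change.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of one-time child watches (NOT the ZooKeeper API).
class WatchedParent {
  private final List<String> children = new ArrayList<>();
  private final List<Runnable> pendingWatches = new ArrayList<>();

  // Like "ls /sample-group true": returns a snapshot and optionally arms a watch.
  List<String> getChildren(Runnable watch) {
    if (watch != null) pendingWatches.add(watch);
    return new ArrayList<>(children);
  }

  void addChild(String name) {
    children.add(name);
    // Each armed watch fires once and is then discarded. The callback gets no
    // details, so a watcher must call getChildren again to see what changed.
    List<Runnable> toFire = new ArrayList<>(pendingWatches);
    pendingWatches.clear();
    toFire.forEach(Runnable::run);
  }

  public static void main(String[] args) {
    WatchedParent group = new WatchedParent();
    int[] events = {0};
    group.getChildren(() -> events[0]++);  // arm a watch
    group.addChild("child-0000000003");    // fires: events = 1
    group.addChild("child-0000000004");    // no watch armed: still 1
    System.out.println(events[0]);         // 1
  }
}
```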

[zk: localhost:2181(CONNECTED) 8] create -s -e /sample-group/child- data-5
Created /sample-group/child-0000000004

To finish off our test drive, let's delete our test group.

[zk: localhost:2181(CONNECTED) 9] delete /sample-group
Node not empty: /sample-group

Oops. ZooKeeper won't allow znodes to be deleted if they have children. In addition, updates, including deletes, are conditional upon a specific version, a form of optimistic locking that ensures a client's update succeeds only if the client passes the current version of the data. Otherwise the update fails with a BadVersionException. You can short-circuit the optimistic versioning behavior by passing -1 as the version, which tells ZooKeeper to perform the update unconditionally. So to delete our group, we first delete all the child znodes and then delete the group znode, all unconditionally.
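The version check described above can be sketched as a small in-memory model. VersionedNode and BadVersionError are hypothetical names, not ZooKeeper classes (BadVersionError stands in for the real BadVersionException). The sketch assumes what the text describes: every successful write bumps the version, a stale expected version is rejected, and -1 skips the check.

```java
// Hypothetical in-memory model of ZooKeeper-style optimistic versioning on a
// single znode. BadVersionError stands in for the real BadVersionException.
class BadVersionError extends RuntimeException {
  BadVersionError(int expected, int actual) {
    super("expected version " + expected + " but znode is at " + actual);
  }
}

class VersionedNode {
  private String data;
  private int version = 0;  // a newly created znode starts at version 0
  private boolean deleted = false;

  VersionedNode(String data) { this.data = data; }

  String data() { return data; }
  int version() { return version; }
  boolean isDeleted() { return deleted; }

  // -1 skips the check, which is what makes an update unconditional.
  private void checkVersion(int expectedVersion) {
    if (expectedVersion != -1 && expectedVersion != version) {
      throw new BadVersionError(expectedVersion, version);
    }
  }

  void setData(String newData, int expectedVersion) {
    checkVersion(expectedVersion);
    data = newData;
    version++;  // every successful write bumps the version
  }

  void delete(int expectedVersion) {
    checkVersion(expectedVersion);
    deleted = true;
  }

  public static void main(String[] args) {
    VersionedNode node = new VersionedNode("a-sample-group");
    int seen = node.version();            // client A reads version 0
    node.setData("updated by B", -1);     // client B writes unconditionally
    try {
      node.setData("updated by A", seen); // A's stale version is rejected
    } catch (BadVersionError e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
```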

[zk: localhost:2181(CONNECTED) 10] delete /sample-group/child-0000000000 -1
[zk: localhost:2181(CONNECTED) 11] delete /sample-group/child-0000000001 -1
[zk: localhost:2181(CONNECTED) 12] delete /sample-group/child-0000000002 -1
[zk: localhost:2181(CONNECTED) 13] delete /sample-group/child-0000000003 -1
[zk: localhost:2181(CONNECTED) 14] delete /sample-group/child-0000000004 -1
[zk: localhost:2181(CONNECTED) 15] delete /sample-group -1

In addition to the shell, ZooKeeper also provides commands referred to as the "four letter words". You issue the commands via telnet or nc (netcat). For example, let's ask ZooKeeper how it's feeling.

$ echo "ruok" | nc localhost 2181
imok

You can also use the stat command to get basic statistics on ZooKeeper.

$ echo "stat" | nc localhost 2181
Zookeeper version: 3.4.5-1392090, built on 09/30/2012 17:52 GMT

Latency min/avg/max: 0/0/157
Received: 338
Sent: 337
Connections: 1
Outstanding: 0
Zxid: 0xb
Mode: standalone
Node count: 17
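If you script health checks around stat, you usually want individual fields rather than raw text. The sketch below is a hypothetical parser based only on the sample output above: it keeps lines of the form Key: value that start in the first column and ignores everything else. Other ZooKeeper versions may format the response differently, so check the output of your own server before relying on it.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: collects the "Key: value" lines of a stat response.
class StatParser {
  static Map<String, String> parse(String statOutput) {
    Map<String, String> fields = new LinkedHashMap<>();
    for (String line : statOutput.split("\\R")) {  // \R matches any line break
      // Skip blank lines and indented lines
      if (line.isEmpty() || Character.isWhitespace(line.charAt(0))) continue;
      int colon = line.indexOf(':');
      if (colon <= 0) continue;  // not a "Key: value" line
      fields.put(line.substring(0, colon).trim(), line.substring(colon + 1).trim());
    }
    return fields;
  }

  public static void main(String[] args) {
    String sample = "Zookeeper version: 3.4.5-1392090, built on 09/30/2012 17:52 GMT\n"
        + "Latency min/avg/max: 0/0/157\n"
        + "Mode: standalone\n"
        + "Node count: 17\n";
    Map<String, String> stats = parse(sample);
    System.out.println(stats.get("Mode") + " " + stats.get("Node count"));  // standalone 17
  }
}
```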

In this test drive, we've seen some basic but important aspects of ZooKeeper. We created persistent and sequential ephemeral znodes, set a watch and received a change notification event when a znode's children changed, and deleted znodes. We also saw how znodes can have associated data. When building real systems you obviously won't be using the command line shell to implement behavior, however, so let's translate this simple group membership example into Java code.

Conclusion to Part 2

In this second part of the ZooKeeper series of blogs, you took a test drive using the command-line shell available in ZooKeeper. You created both persistent and ephemeral znodes. You created the ephemeral znodes as children of the persistent znode, and made them sequential as well so that ZooKeeper maintains a monotonically increasing, unique order. Finally you saw how to delete znodes and use a few of the "four letter words" to check ZooKeeper's status.

In the next blog, we'll recreate the group example you've just seen using the ZooKeeper Java API.


Distributed Coordination With ZooKeeper Part 1: Introduction

Posted on June 25, 2013 by Scott Leberknight

This is the first in a series of blogs that introduce Apache ZooKeeper. This blog provides an introduction to ZooKeeper and its core concepts and use cases. In later blogs you will test drive ZooKeeper, see some examples of the Java API, learn about its architecture, build a distributed data structure which can be used across independent processes and machines, and finally get a brief introduction to a higher-level API on top of ZooKeeper.

Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. This could be a distributed search engine, a distributed build system, or even something like Hadoop, which has both a distributed file system and a Map/Reduce data processing framework that operates on the data in the file system. How would you determine which servers are alive and operating at any given moment in time? Or, how would you determine which servers are available to process a build in a distributed build system? Or for a distributed search system, how would you know which servers are available to hold data and handle search requests? Most importantly, how would you do these things reliably in the face of the difficulties of distributed computing, such as network failures, bandwidth limitations, variable latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers?

These and similar questions are the focus of Apache ZooKeeper, which is a fast, highly available, fault-tolerant, distributed coordination service. Using ZooKeeper, you can build reliable, distributed data structures for group membership, leader election, coordinated workflow, and configuration services, as well as generalized distributed data structures like locks, queues, barriers, and latches.

Many well-known and successful projects already rely on ZooKeeper, including HBase, Hadoop 2.0, SolrCloud, Neo4j, Apache Blur (incubating), and Accumulo.

Core Concepts

ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between clients and provides an eventually consistent view of its znodes, which are like files and directories in a traditional file system. It provides basic operations such as creating, deleting, and checking the existence of znodes. It provides an event-driven model in which clients can watch for changes to specific znodes, for example when a new child is added to an existing znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an ensemble, with each server holding an in-memory copy of the distributed file system to service client read requests. Each server also holds a persistent copy on disk.

One of the servers is elected as the leader, and all other servers are followers. The leader is responsible for all writes and for broadcasting changes to followers. Once a majority of the servers in the ensemble (a quorum) have accepted a change, the write succeeds, and the data is then durable even if the leader subsequently fails. This means ZooKeeper is an eventually consistent system, because the followers may lag the leader by some small amount of time; hence clients might not always see the most up-to-date information. Importantly, the leader is not a master as in a master/slave architecture and thus is not a single point of failure; rather, if the leader dies, the remaining followers hold an election for a new leader, and the new leader takes over where the old one left off.
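The majority rule above is what lets an ensemble keep working through failures, and a little arithmetic shows why ensembles are usually sized with an odd number of servers: going from three servers to four raises the quorum from two to three without tolerating any additional failure. A minimal sketch of that arithmetic:

```java
// Majority-quorum arithmetic for an ensemble of n voting servers.
class Quorum {
  // Smallest group that is a strict majority of the ensemble.
  static int quorumSize(int ensembleSize) {
    return ensembleSize / 2 + 1;
  }

  // Servers that can fail while a majority can still be formed.
  static int toleratedFailures(int ensembleSize) {
    return ensembleSize - quorumSize(ensembleSize);
  }

  public static void main(String[] args) {
    for (int n = 1; n <= 6; n++) {
      System.out.printf("ensemble=%d quorum=%d tolerates=%d%n",
          n, quorumSize(n), toleratedFailures(n));
    }
  }
}
```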

Each client connects to ZooKeeper by passing in the list of servers in the ensemble. The client tries servers from that list in random order until a connection is established. Once connected, ZooKeeper creates a session with the client-specified timeout period. The ZooKeeper client automatically sends periodic heartbeats to keep the session alive when no operations are performed for a while, and it automatically handles failover: if the server a client is connected to fails, the client detects this and tries to reconnect to a different server in the ensemble. The nice thing is that the same client session is retained during failover, as long as the client reconnects before the session times out. However, client operations can fail during failover, so, as with almost all ZooKeeper operations, client code must be vigilant, detect errors, and deal with them as necessary.
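The server-selection step described above can be sketched without a real server. In the hypothetical code below, a connect string in the host:port,host:port form is split into servers, shuffled, and tried in turn until one accepts; tryConnect is a stand-in for a real connection attempt. This is an illustration of the behavior, not the real client's code, and it does not handle extras such as a chroot suffix.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.function.Predicate;

// Hypothetical sketch of the behavior described above, not the real client's code.
class ServerPicker {
  // "zk1:2181,zk2:2181,zk3:2181" -> [zk1:2181, zk2:2181, zk3:2181]
  static List<String> parse(String connectString) {
    List<String> servers = new ArrayList<>();
    for (String server : connectString.split(",")) {
      if (!server.trim().isEmpty()) servers.add(server.trim());
    }
    return servers;
  }

  // Try servers in random order; tryConnect stands in for a real connection attempt.
  static Optional<String> pick(String connectString, Predicate<String> tryConnect, Random random) {
    List<String> servers = parse(connectString);
    Collections.shuffle(servers, random);  // spreads clients across the ensemble
    for (String server : servers) {
      if (tryConnect.test(server)) return Optional.of(server);
    }
    return Optional.empty();  // every server refused: the caller should retry later
  }

  public static void main(String[] args) {
    String hosts = "zk1:2181,zk2:2181,zk3:2181";
    // Pretend zk1 is down; any other server is acceptable.
    Optional<String> chosen = pick(hosts, s -> !s.startsWith("zk1"), new Random());
    System.out.println(chosen.orElse("no server available"));
  }
}
```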

Partial Failure

One of the fallacies of distributed computing is that the network is reliable. Having worked on a project for the past few years with multiple Hadoop, Apache Blur, and ZooKeeper clusters including hundreds of servers, I can say from experience that the network is not reliable. Simply put, things break, and you cannot assume the network is 100% reliable all the time. When designing distributed systems, you must keep this in mind and handle things you ordinarily would not even consider when building software for a single server. For example, assume a client sends an update to a server, but before the response is received the network connection is lost for a brief period. You need to ask several questions in this case. Did the message get through to the server? If it did, did the operation actually complete successfully? Is it safe to retry an operation when you don't know whether it reached the server, or whether it failed there? In other words, is the operation idempotent? You need to consider questions like these when building distributed systems. ZooKeeper cannot prevent network problems or partial failures, but once you are aware of the kinds of problems that can arise, you are much better prepared to deal with them when (not if) they occur. ZooKeeper does provide certain guarantees regarding data consistency and atomicity that can help you when building systems, as you will see later.
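A tiny simulation makes the ambiguity concrete. Nothing below is ZooKeeper code: FlakyNode is a hypothetical stand-in for a znode with a version check like ZooKeeper's optimistic versioning, and dropNextReply models a connection that is lost after the write is applied but before the reply arrives. A naive retry with the same expected version is then rejected even though the first attempt succeeded, so the client has to re-read the state to find out what really happened.

```java
// Hypothetical simulation, not ZooKeeper code: a version-checked write is
// applied on the "server", but the reply is lost before the client sees it.
class FlakyNode {
  String data = "v0";
  int version = 0;
  boolean dropNextReply = true;

  void setData(String newData, int expectedVersion) {
    if (expectedVersion != version) {
      throw new IllegalStateException("BadVersion: expected " + expectedVersion
          + " but node is at " + version);
    }
    data = newData;
    version++;
    if (dropNextReply) {  // the write already happened...
      dropNextReply = false;
      throw new RuntimeException("connection lost");  // ...but the client only sees an error
    }
  }

  public static void main(String[] args) {
    FlakyNode node = new FlakyNode();
    try {
      node.setData("v1", 0);
    } catch (IllegalStateException e) {
      System.out.println("conflict: " + e.getMessage());
    } catch (RuntimeException e) {
      System.out.println("unknown outcome: " + e.getMessage());
      try {
        node.setData("v1", 0);  // naive retry with the same expected version
      } catch (IllegalStateException retryError) {
        // The retry "fails" even though the first attempt succeeded.
        System.out.println("retry: " + retryError.getMessage());
      }
      // Re-reading shows what actually happened.
      System.out.println("current data=" + node.data + ", version=" + node.version);
    }
  }
}
```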

Conclusion to Part 1

In this blog we've learned that ZooKeeper is a distributed coordination service that facilitates loose coupling between distributed components. It is implemented as a distributed, hierarchical file system and you can use it to build distributed data structures such as locks, queues, and so on. In the next blog, we'll take a test drive of ZooKeeper using its command line shell.