Building a Distributed Lock Revisited: Using Curator's InterProcessMutex

Posted on December 29, 2013 by Scott Leberknight

Last summer I wrote a series of blogs introducing Apache ZooKeeper, which is a distributed coordination service used in many open source projects like Hadoop, HBase, and Storm to manage clusters of machines. The fifth blog described how to use ZooKeeper to implement a distributed lock. In that blog I explained that the goals of a distributed lock are "to build a mutually exclusive lock between processes that could be running on different machines, possibly even on different networks or different data centers". I also mentioned that one significant benefit is that "clients know nothing about each other; they only know they need to use the lock to access some shared resource, and that they should not access it unless they own the lock." That blog described how to use the WriteLock "recipe" that ships with ZooKeeper in the contrib modules to build a synchronous BlockingWriteLock with simpler semantics: you simply call lock() to acquire the lock and unlock() to release it. Earlier in the series, in the Group Membership Example blog, we learned how to connect to ZooKeeper using a Watcher and a CountDownLatch to block until the SyncConnected event was received. None of that code was terribly complex, but it was fairly low-level, especially once you include the need to block until a connection event is received and the non-trivial implementation of the WriteLock recipe.
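As a quick refresher, that low-level connection code looked roughly like the following sketch (this is an illustration of the pattern, not a copy from that earlier post; the connect string and session timeout are placeholders): we pass a Watcher to the ZooKeeper constructor and block on a CountDownLatch until the SyncConnected event arrives.

// Rough sketch of the low-level connection pattern described above:
// block on a CountDownLatch until the Watcher sees SyncConnected.
// (The connect string and 5000 ms session timeout are placeholders.)
final CountDownLatch connectedSignal = new CountDownLatch(1);

ZooKeeper zooKeeper = new ZooKeeper("host-1:2181", 5000, new Watcher() {
  @Override
  public void process(WatchedEvent event) {
    if (event.getState() == Watcher.Event.KeeperState.SyncConnected) {
      connectedSignal.countDown();
    }
  }
});

connectedSignal.await();  // block until we are actually connected

Compare that with the two or three lines of Curator code we'll see below, which handle the same concern (plus retries) for us.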

In the wrap-up blog I mentioned the Curator project, originally open sourced by Netflix and later donated by them to Apache. The Curator wiki describes Curator as "a set of Java libraries that make using Apache ZooKeeper much easier". In this blog we'll see how to use Curator to implement a distributed lock, without needing to write any of our own wrapper code for obtaining a connection or for implementing the lock itself. In the distributed lock blog we saw how sequential ephemeral child nodes (e.g. child-lock-node-0000000000, child-lock-node-0000000001, child-lock-node-0000000002, etc.) are created under a persistent parent lock node. The client that created the child with the lowest sequence number holds the lock. We also saw several potential gotchas. First, how does a client know whether it successfully created a child node in the case of a partial failure, i.e. a (temporary) connection loss, and how does it know which child node it created, i.e. the child with which sequence number? I noted that a solution was to embed the ZooKeeper session ID in the child node name so that the client can easily identify the child node it created. Jordan Zimmerman (the creator of Curator) was kind enough to post a comment to that blog noting that using the session ID is "not ideal" because it "prevents the same ZK connection from being used in multiple threads for the same lock". He said, "It's much better to use a GUID. This is what Curator uses."
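To make that first gotcha concrete, here is a rough sketch of the GUID idea using the raw ZooKeeper API. This is only an illustration of the technique, not Curator's actual implementation; it assumes an already-connected ZooKeeper handle (zooKeeper), the parent lock node path (lockPath), and an arbitrary "-lock-" naming convention.

// Illustration only (not Curator's code): embed a GUID in the child node
// name so the client can recognize its own node after a partial failure.
String guid = UUID.randomUUID().toString();
String prefix = lockPath + "/" + guid + "-lock-";

// Creates an ephemeral sequential node, e.g. <guid>-lock-0000000003
zooKeeper.create(prefix, new byte[0],
    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);

// Even if the create() response was lost to a connection blip, we can find
// our own node again by listing the children and matching on the GUID prefix.
List<String> children = zooKeeper.getChildren(lockPath, false);
String ourNode = null;
for (String child : children) {
  if (child.startsWith(guid)) {
    ourNode = child;
    break;
  }
}

Because the GUID belongs to the lock attempt rather than to the session, the same ZooKeeper connection can be shared by multiple threads contending for the same lock, which is exactly the problem Jordan pointed out with the session ID approach.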

Second, we noted that distributed lock clients should watch only the immediately preceding child node rather than the parent node, in order to prevent a "herd effect" in which every client is notified of every single child node event, when in reality each client only needs to care about the child immediately preceding its own. Curator handles both of these cases and adds other goodies such as a retry policy for connecting to ZooKeeper. So without further comment, let's see how to use a distributed lock in Curator.

First, we'll need to get an instance of CuratorFramework - this is an interface that provides a higher-level API for working with ZooKeeper. It handles connection management (including automatic retries), offers a fluent-style API, and comes with a bunch of out-of-the-box recipes for things like distributed locks, queues, and leader election. We can obtain one using CuratorFrameworkFactory and a RetryPolicy of our choosing.

String hosts = "host-1:2181,host-2:2181,host-3:2181";
int baseSleepTimeMs = 1000;
int maxRetries = 3;

RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMs, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
client.start();

In the above code we first create a retry policy - in this case an ExponentialBackoffRetry using a base sleep time of 1000 milliseconds and up to 3 retries. Then we use CuratorFrameworkFactory.newClient() to obtain an instance of CuratorFramework. Finally we call start() (and note that we'll need to call close() when we're done with the client). Now that we have a client instance, we can use an implementation of InterProcessLock to create our distributed lock. The simplest is InterProcessMutex, a re-entrant mutual exclusion lock that works across JVMs by using ZooKeeper to hold the lock.

InterProcessLock lock = new InterProcessMutex(client, lockPath);
lock.acquire();
try {
  // do work while we hold the lock
} catch (Exception ex) {
  // handle exceptions as appropriate
} finally {
  lock.release();
}

The above code simply creates an InterProcessMutex for a specific lock path (lockPath), acquires the lock, does some work, and releases the lock. In this case acquire() will block until the lock becomes available. In many cases blocking indefinitely won't be a Good Thing, so Curator provides an overloaded version of acquire() that takes a maximum time to wait for the lock and returns true if the lock was obtained within that time and false otherwise.

InterProcessLock lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {
  try {
    // do work while we hold the lock
  } catch (Exception ex) {
    // handle exceptions as appropriate
  } finally {
    lock.release();
  }
} else {
  // we timed out waiting for lock, handle appropriately
}

The above code demonstrates the timeout version of acquire(). It is slightly more complex, since you need to check whether the lock was acquired or whether you timed out waiting for it. Regardless of which version of acquire() you use, you'll need to release() the lock in a finally block. The final piece is to remember to close the client when you're done with it:

client.close();
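In a longer-lived application, one simple way to make sure this actually happens is to pair start() and close() around the code that uses the client, along the lines of this sketch (hosts and retryPolicy are the same values we set up earlier):

CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
client.start();
try {
  // create locks, use other recipes, read and write znodes, etc.
} finally {
  client.close();  // always release the ZooKeeper connection
}

A single CuratorFramework instance is intended to be shared and reused, so typically you start it once at application startup and close it once at shutdown rather than per operation.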

And that's pretty much it for using Curator's InterProcessMutex to implement a distributed lock. All the complexity of connection management, partial failures, the "herd effect", automatic retries, and so on is handled by the higher-level Curator APIs. To paraphrase Stu Halloway, you should always understand at least one layer beneath the one you're working at - in this case you should have a decent understanding of how ZooKeeper works under the covers and of some of the potential issues in distributed computing. But having said that, go ahead and use Curator to work at a higher level of abstraction and gain the benefit of all the distributed computing experience at Netflix as well as at Yahoo (which created ZooKeeper). And last, Happy New Year 2014!


