Distributed Coordination With ZooKeeper Part 1: Introduction

Posted on June 25, 2013 by Scott Leberknight

This is the first in a series of blogs that introduce Apache ZooKeeper. This blog provides an introduction to ZooKeeper and its core concepts and use cases. In later blogs you will test drive ZooKeeper, see some examples of the Java API, learn about its architecture, build a distributed data structure which can be used across independent processes and machines, and finally get a brief introduction to a higher-level API on top of ZooKeeper.

Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. This could be a distributed search engine, a distributed build system, or even something like Hadoop which has both a distributed file system and a Map/Reduce data processing framework that operates on the data in the file system. How would you determine which servers are alive and operating at any given moment in time? Or, how would you determine which servers are available to process a build in a distributed build system? Or for a distributed search system how would you know which servers are available to hold data and handle search requests? Most importantly, how would you do these things reliably in the face of the difficulties of distributed computing such as network failures, bandwidth limitations, variable latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers?

These and similar questions are the focus of Apache ZooKeeper, which is a fast, highly available, fault-tolerant distributed coordination service. Using ZooKeeper you can build reliable, distributed data structures for group membership, leader election, coordinated workflow, and configuration services, as well as generalized distributed data structures like locks, queues, barriers, and latches.

Many well-known and successful projects already rely on ZooKeeper. Just a few of them include HBase, Hadoop 2.0, Solr Cloud, Neo4J, Apache Blur (incubating), and Accumulo.

Core Concepts

ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between clients and provides an eventually consistent view of its znodes, which are like files and directories in a traditional file system. It provides basic operations such as creating, deleting, and checking existence of znodes. It provides an event-driven model in which clients can watch for changes to specific znodes, for example if a new child is added to an existing znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an ensemble, with each server holding an in-memory copy of the distributed file system to service client read requests. Each server also holds a persistent copy on disk.
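
To make this concrete, here is a minimal sketch of the basic znode operations using the ZooKeeper Java API. It assumes an already-connected ZooKeeper handle (establishing the connection is sketched in a moment), and the /demo path and its payload are made-up values for illustration:

import java.util.List;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

// A minimal sketch of basic znode operations; the /demo path and its
// payload are made up, and exceptions are simply propagated.
void znodeBasics(ZooKeeper zk) throws KeeperException, InterruptedException {
  // Create a persistent znode holding a few bytes of data.
  String path = zk.create("/demo", "hello".getBytes(),
      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

  // Check existence; passing true registers a one-time watch, so the
  // Watcher supplied at connection time is notified if /demo changes.
  Stat stat = zk.exists(path, true);

  // List children, again registering a watch for child adds/removes.
  List<String> children = zk.getChildren(path, true);

  // Delete the znode; version -1 means "any version" (skip the check).
  zk.delete(path, -1);
}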

One of the servers is elected as the leader, and all other servers are followers. The leader is responsible for all writes and for broadcasting changes to the followers. Once a majority of the servers in the ensemble commit a change, the write succeeds and the data is durable even if the leader subsequently fails; for example, in a five-server ensemble a write succeeds once three servers have committed it, and the ensemble can tolerate two failed servers. This means ZooKeeper is an eventually consistent system, because the followers may lag the leader by some small amount of time, so clients might not always see the most up-to-date information. Importantly, the leader is not a master as in a master/slave architecture and thus is not a single point of failure; rather, if the leader dies, the remaining followers hold an election for a new leader, and the new leader takes over where the old one left off.

Each client connects to ZooKeeper, passing in the list of servers in the ensemble, and tries servers from that list at random until a connection is established. Once connected, ZooKeeper creates a session with the client-specified timeout period. The ZooKeeper client automatically sends periodic heartbeats to keep the session alive if no operations are performed for a while, and it handles failover automatically: if the server a client is connected to fails, the client detects this and reconnects to a different server in the ensemble. The nice thing is that the same client session is retained across this failover; however, during failover it is possible for client operations to fail, so, as with almost all ZooKeeper operations, client code must be vigilant about detecting errors and dealing with them as necessary.
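
As a rough sketch (exception handling kept minimal, and the five-second session timeout is just an example value), establishing a session and blocking until it is actually connected might look like this:

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// A rough sketch of establishing a session and waiting until it is
// actually connected; the 5000 ms session timeout is just an example.
ZooKeeper connect(String hosts) throws IOException, InterruptedException {
  final CountDownLatch connected = new CountDownLatch(1);
  ZooKeeper zk = new ZooKeeper(hosts, 5000, new Watcher() {
    public void process(WatchedEvent event) {
      // This Watcher also receives events for any watches registered
      // later on individual znodes.
      if (event.getState() == Event.KeeperState.SyncConnected) {
        connected.countDown();
      }
    }
  });
  connected.await();  // block until the session is established
  return zk;
}

You would call this with the ensemble's connect string, for example connect("host1:2181,host2:2181,host3:2181"), where the host names are of course placeholders.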

Partial Failure

One of the fallacies of distributed computing is that the network is reliable. Having worked for the past few years on a project with multiple Hadoop, Apache Blur, and ZooKeeper clusters comprising hundreds of servers, I can definitely say from experience that the network is not reliable. Simply put, things break, and you cannot assume the network is 100% reliable all the time. When designing distributed systems, you must keep this in mind and handle situations you ordinarily would not even consider when building software for a single server. For example, suppose a client sends an update to a server, but the network connection is lost briefly before the response is received. You need to ask several questions in this case. Did the message get through to the server? If it did, did the operation actually complete successfully? Is it safe to retry an operation when you don't even know whether it reached the server? In other words, is the operation idempotent? You need to consider questions like these when building distributed systems. ZooKeeper cannot fix network problems or prevent partial failures, but once you are aware of the kinds of problems that can arise, you are much better prepared to deal with them when (not if) they occur. ZooKeeper provides certain guarantees regarding data consistency and atomicity that can aid you when building systems, as you will see later.
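
For example, setting a znode to a known value is idempotent and can safely be retried after a connection loss, whereas blindly retrying a non-idempotent operation, such as creating a sequential znode, could leave duplicates behind. Here is a hedged sketch of a simple retry loop; the path, value, and retry limit are all made up:

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;

// A sketch of retrying an idempotent operation after connection loss.
// The path, value, and retry limit are made up; real code would also
// back off between attempts and handle other KeeperException types.
void setRefreshInterval(ZooKeeper zk) throws KeeperException, InterruptedException {
  int attempts = 0;
  while (true) {
    try {
      // Writing a fixed value is idempotent, so retrying after an
      // ambiguous failure cannot corrupt anything.
      zk.setData("/config/refresh-interval", "60".getBytes(), -1);
      return;  // success
    } catch (KeeperException.ConnectionLossException e) {
      if (++attempts >= 3) {
        throw e;  // give up and let the caller decide what to do
      }
    }
  }
}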

Conclusion to Part 1

In this blog we've learned that ZooKeeper is a distributed coordination service that facilitates loose coupling between distributed components. It is implemented as a distributed, hierarchical file system and you can use it to build distributed data structures such as locks, queues, and so on. In the next blog, we'll take a test drive of ZooKeeper using its command line shell.

Hadoop Presentation at NOVA/DC Java Users Group

Posted on May 09, 2011 by Scott Leberknight

Last Thursday (on Cinco de Mayo) I gave a presentation on Hadoop and Hive at the NOVA/DC Java Users Group. As several people asked about getting the slides, I've shared them here on Slideshare. I also posted the presentation sample code on Github at basic-hadoop-examples.

What's in JDK 7 Lightning Talk Slides

Posted on April 16, 2011 by Scott Leberknight

Yesterday at the Near Infinity 2011 Spring Conference I gave a talk on CoffeeScript (see here) and a very short lightning talk on what exactly is in JDK 7. You can find the slides for the JDK 7 talk here if you're interested.

CoffeeScript Slides

Posted on April 15, 2011 by Scott Leberknight

Today is the Near Infinity Spring Conference. We have one conference in the fall and one in the spring for all our developers as well as invited guests. Today I gave a presentation on CoffeeScript and shared the slides here.

Introducing RJava

Posted on March 31, 2011 by Scott Leberknight

You’ve no doubt heard about JRuby, which lets you run Ruby code on the JVM. This is nice, but wouldn’t it be nicer if you could write Java code on a Ruby VM? This would let you take advantage of the power of Ruby 1.9’s new YARV (Yet Another Ruby VM) interpreter while letting you write code in a statically-typed language. Without further ado, I’d like to introduce RJava, which does just that!

RJava lets you write code in Java and run it on a Ruby VM! And you still get the full benefit of the Java compiler to ensure your code is 100% correct. Of course with Java you also get checked exceptions and proper interfaces and abstract classes to ensure compliance with your design. You no longer need to worry about whether an object responds to a random message, because the Java compiler will enforce that it does.

You get all this and more on top of the power and flexibility of a Ruby VM. And because Java does not support closures, you are assured that everything is properly designed, since you'll be able to define interfaces and then implement anonymous inner classes just like you're used to doing! Even when JDK 8 arrives sometime in the future with lambdas, you can rest assured that they will be statically typed.

As a first example, let’s see how you could filter a collection in RJava to find only the even numbers from one to ten. In Ruby you’d probably write something like this:

evens = (1..10).find_all { |n| n % 2 == 0 }

With RJava, you’d write this:

List<Integer> evens = new ArrayList<Integer>();
for (int i = 1; i <= 10; i++) {
  if (i % 2 == 0) {
    evens.add(i);
  }
}

This example shows the benefits of declaring variables with specific types, how you can use interfaces (e.g. List in the example) when declaring variables, and how you get the benefits of Java generics to ensure your collections are always type-safe. Without any doubt you know that "evens" is a List containing Integers and that "i" is an int, so you can sleep soundly knowing your code is correct. You can also see Java's powerful "for" loop at work here, to easily traverse from 1 to 10, inclusive. Finally, you saw how to effectively use Java's braces to organize code to clearly show blocks, and semi-colons ensure you always know where lines terminate.

I’ve just released RJava on GitHub, so go check it out. Please download RJava today and give it a try and let me know what you think!

Database-Backed Refreshable Beans with Groovy and Spring 3

Posted on October 30, 2010 by Scott Leberknight

In 2009 I published a two-part series of articles on IBM developerWorks entitled Groovier Spring. The articles showed how Spring supports implementing beans in Groovy whose behavior can be changed at runtime via the "refreshable beans" feature. This feature detects when a Spring bean backed by a Groovy script has changed, recompiles the script, and replaces the old bean with the new one. It is quite powerful in certain scenarios, for example PDF generation, mail or any other kind of template generation, and runtime-modifiable business rules. One specific use case I showed was PDF generation where the Groovy scripts reside in a database, allowing you to change how PDFs are generated simply by updating the scripts in your database.

In order to load Groovy scripts from a database, I showed how to implement custom ScriptFactoryPostProcessor and ScriptSource classes. The CustomScriptFactoryPostProcessor extends the default Spring ScriptFactoryPostProcessor and overrides the convertToScriptSource method to recognize a database-backed script; for example, you could specify a script source of database:com/nearinfinity/demo/GroovyPdfGenerator.groovy. There is also a DatabaseScriptSource class that implements the ScriptSource interface and knows how to load Groovy scripts from a database.
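
The articles walk through the full implementations; as a rough sketch of the idea (the groovy_scripts table and its columns are made-up names here, and modification tracking is reduced to a timestamp check), DatabaseScriptSource might look something like this:

import javax.sql.DataSource;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.scripting.ScriptSource;

// Rough sketch only; the groovy_scripts table and its script_path,
// script_text, and last_modified columns are made-up names.
public class DatabaseScriptSource implements ScriptSource {

  private final JdbcTemplate jdbcTemplate;
  private final String scriptPath;
  private long lastKnownModified = -1;

  public DatabaseScriptSource(DataSource dataSource, String scriptPath) {
    this.jdbcTemplate = new JdbcTemplate(dataSource);
    this.scriptPath = scriptPath;
  }

  public String getScriptAsString() {
    lastKnownModified = lastModifiedInDatabase();
    return (String) jdbcTemplate.queryForObject(
        "select script_text from groovy_scripts where script_path = ?",
        new Object[] { scriptPath }, String.class);
  }

  public boolean isModified() {
    return lastModifiedInDatabase() > lastKnownModified;
  }

  public String suggestedClassName() {
    // e.g. com/nearinfinity/demo/GroovyPdfGenerator.groovy -> GroovyPdfGenerator
    String fileName = scriptPath.substring(scriptPath.lastIndexOf('/') + 1);
    return fileName.replace(".groovy", "");
  }

  private long lastModifiedInDatabase() {
    return jdbcTemplate.queryForLong(
        "select last_modified from groovy_scripts where script_path = ?",
        new Object[] { scriptPath });
  }
}

Roughly speaking, Spring periodically calls isModified(), and only when it returns true does it re-fetch and recompile the script via getScriptAsString().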

In order to put these pieces together, you need to do a bit of configuration. In the articles I used Spring 2.5.x which was current at the time in early 2009. The configuration looked like this:

<bean id="dataSource"
  class="org.springframework.jdbc.datasource.DriverManagerDataSource">
    <!-- set data source props, e.g. driverClassName, url, username, password... -->
</bean>

<bean id="scriptFactoryPostProcessor"
  class="com.nearinfinity.spring.scripting.support.CustomScriptFactoryPostProcessor">
    <property name="dataSource" ref="dataSource"/>
</bean>

<lang:groovy id="pdfGenerator"
  script-source="database:com/nearinfinity/demo/DemoGroovyPdfGenerator.groovy">
    <lang:property name="companyName" value="Database Groovy Bookstore"/>
</lang:groovy>

In Spring 2.5.x this works because the <lang:groovy> tag looks for a Spring bean with id "scriptFactoryPostProcessor" and uses it if it exists; if not, it creates one. In the above configuration we created our own "scriptFactoryPostProcessor" bean for the <lang:groovy> tags to use. So all's well...until you move to Spring 3.x, at which point the above configuration no longer works. This was pointed out to me by João from Brazil, who tried the sample code from the articles with Spring 3.x and found it did not work. After trying a bunch of things, we eventually determined that in Spring 3.x the <lang:groovy> tag looks for a ScriptFactoryPostProcessor bean whose id is "org.springframework.scripting.config.scriptFactoryPostProcessor", not just "scriptFactoryPostProcessor". Once you figure this out, it is easy to change the above configuration to:

<bean id="org.springframework.scripting.config.scriptFactoryPostProcessor"
  class="com.nearinfinity.spring.scripting.support.CustomScriptFactoryPostProcessor">
    <property name="dataSource" ref="dataSource"/>
</bean>

<lang:groovy id="pdfGenerator"
  script-source="database:com/nearinfinity/demo/DemoGroovyPdfGenerator.groovy">
    <lang:property name="companyName" value="Database Groovy Bookstore"/>
</lang:groovy>

Then everything works as expected: the Groovy scripts can reside in your database and are automatically reloaded when you change them. So if you download the article sample code as-is, it will work since the bundled Spring version is 2.5.4, but if you update to Spring 3.x you'll need to modify the configuration in applicationContext.xml for example #7 as shown above, changing the "scriptFactoryPostProcessor" bean id to "org.springframework.scripting.config.scriptFactoryPostProcessor". Note there is a scheduled JIRA issue, SPR-5106, that will make the ScriptFactoryPostProcessor mechanism pluggable, so you won't need to extend the default ScriptFactoryPostProcessor and replace the default bean. But until then, this hack continues to work pretty well.

Rack Lightning Talk

Posted on October 21, 2010 by Scott Leberknight

I gave a short lightning talk on Rack tonight at the NovaRUG. It's on Slideshare here. Rack is really cool because it makes creating modular functionality easy. For example, if you want exceptions mailed to you, you can use the Rack::MailExceptions middleware; or if you want responses compressed, you can add one line of code to a Rails app to use Rack::Deflater. Cool.

Missing the each_line method in FakeFS version 0.2.1? Add it!

Posted on May 06, 2010 by Scott Leberknight

Recently we have been using the excellent FakeFS (fake filesystem) gem in some specs to test code that reads and writes files on the filesystem. We are using the latest release of this gem, which is 0.2.1 as I write this. Some of the code under test uses the IO each_line method to iterate over lines in relatively largish files. But we quickly found that this is a problem, since in version 0.2.1 the FakeFS::File class does not extend StringIO, and so you don't get all its methods, such as each_line. (The version on master in GitHub as I write this does extend StringIO, but it has not yet been released as a formal version.) As an example, suppose we have the following code that prints out the size of each line in a file as stars (asterisks):

def lines_to_stars(file_path)
  File.open(file_path, 'r').each_line { |line| puts '*' * line.size }
end

Let's say we use FakeFS to create a fake file like this:

require 'fakefs/safe'
require 'stringio'

FakeFS.activate!

File.open('/tmp/foo.txt', 'w') do |f|
  f.write "The quick brown fox jumped over the lazy dog\n"
  f.write "The quick red fox jumped over the sleepy cat\n"
  f.write "Jack be nimble, Jack be quick, Jack jumped over the candle stick\n"
  f.write "Twinkle, twinkle little star, how I wonder what you are\n"
  f.write "The End."
end

So far, so good. But now if we call lines_to_stars we get an error:

NoMethodError: undefined method `each_line' for #<FakeFS::File:0x000001012c22b8>

Oops. No each_line. If you don't want to use an unreleased version of the gem, you can add each_line onto FakeFS::File using the following code:

module FakeFS
  class File
    def each_line
      File.readlines(self.path).each { |line| yield line }
    end
  end
end

Basically, all it does is define each_line so that it reads all the lines from a (fake) file on the (fake) filesystem and yields them one by one, so code under test that iterates over a file works as expected. Now calling lines_to_stars gives a nice pretty bar chart of the line sizes, represented by stars:

********************************************
********************************************
***************************************************************
*******************************************************
********

Since we're using RSpec, we added the above code that defines each_line to a file named fakefs.rb in the spec/support directory; spec_helper requires all supporting files in spec/support and its subdirectories, so all our specs automatically get the each_line behavior when using FakeFS.

Using Keynote Remote When There's No Wi-Fi Around

Posted on January 24, 2010 by Scott Leberknight

I just bought Keynote Remote for my iPhone. It is very simple, has the option to display presenter notes, and links to Keynote to start controlling presentations in seconds. What was a little more involved was that I wanted to use it regardless of whether there is a wireless network around. Since Keynote Remote requires a wi-fi connection, here's what I do to get around this little annoyance: set up an ad-hoc wireless network with the Mac, and have the iPhone connect to that network. Voila! When there's no wireless network available, I just use my handy-dandy Verizon Wireless USB760 Modem to connect to the Internet, and I can still control presentations with Keynote Remote.

Hibernate Performance Tuning Part 2 Article Published

Posted on December 21, 2009 by Scott Leberknight

I've just published the second article of a two-part series in the December 2009 NFJS Magazine on Hibernate Performance Tuning. Here's the abstract:

Tuning performance in Hibernate applications is all about reducing the number of database queries or eliminating them entirely using caching. In the first article in this two-part series, you saw how to tune object retrieval using eager fetching techniques to optimize queries and avoid lazy loads. In this second and final article, I'll show you how inheritance strategy affects performance, how to eliminate queries using the Hibernate second-level cache, and some simple but effective tools you can use to monitor and profile your applications.

If you are using Hibernate and want to know more about how inheritance affects performance, how to use the second-level cache, and some simple monitoring and profiling techniques, check it out and let me know what you think. Note that NFJS Magazine does require a subscription.