Towards More Functional Java using Lambdas as Predicates

Posted on September 13, 2016 by Scott Leberknight

Previously I showed an example that transformed a map of query parameters into a SOLR search string. The pre-Java 8 code used a traditional for loop with a conditional and a StringBuilder to incrementally build a string. The Java 8 code streamed over the map entries, mapping (transforming) each entry to a string of the form "key:value" and finally used a Collector to join those query fragments together. This is a common pattern in functional-style code, in which a for loop transforms one collection of objects into a collection of different objects, optionally filters some of them out, and optionally reduces the collection to a single element. These are the common patterns of the functional style - map, filter, reduce, etc. You can almost always replace a for loop that does conditional filtering and reduction with a Java 8 stream using map, filter, and reduce (collect) operations.

But in addition to the stream API, Java 8 also introduced some nice new API methods that make certain things much simpler. For example, suppose we have the following method to remove all map entries for a given set of keys. In the example code, dataCache is a ConcurrentMap and deleteKeys is the set of keys we want to remove from that cache. Here is the original code I came across:

public void deleteFromCache(Set<String> deleteKeys) {
    Iterator<Map.Entry<String, Object>> iterator = dataCache.entrySet().iterator();
    while (iterator.hasNext()) {
        Map.Entry<String, Object> entry = iterator.next();
        if (deleteKeys.contains(entry.getKey())) {
            iterator.remove();
        }
    }
}

Now, you could argue there are better ways to do this, e.g. iterate the delete keys and remove each mapping using the Map#remove(Object key) method. For example:

public void deleteFromCache(Set<String> deleteKeys) {
    for (String deleteKey : deleteKeys) {
        dataCache.remove(deleteKey);
    }
}

The code using the for loop certainly seems cleaner than using the Iterator in this case, though both are functionally equivalent. Can we do better? Java 8 introduced the removeIf method as a default method, not in Map but instead in the Collection interface. This new method "removes all of the elements of this collection that satisfy the given predicate", to quote from the Javadocs. It accepts one argument, a Predicate, which is a functional interface introduced in Java 8 and which can therefore be implemented with a lambda expression. Let's first implement this as a regular old anonymous inner class, which you can always do even in Java 8. It looks like:

public void deleteFromCache(Set<String> deleteKeys) {
    dataCache.entrySet().removeIf(new Predicate<Map.Entry<String, Object>>() {
        @Override
        public boolean test(Map.Entry<String, Object> entry) {
            return deleteKeys.contains(entry.getKey());
        }
    });
}

As you can see, we first get the map's entry set via the entrySet method and call removeIf on it, supplying a Predicate that tests whether the set of deleteKeys contains the entry key. If this test returns true, the entry is removed. Since Predicate is annotated with @FunctionalInterface, instances of it can be created with lambda expressions, method references, or constructor references, according to the Javadoc. So let's take the first step and convert the anonymous inner class into a lambda expression:

public void deleteFromCache(Set<String> deleteKeys) {
    dataCache.entrySet().removeIf((Map.Entry<String, Object> entry) ->
        deleteKeys.contains(entry.getKey()));
}

In the above, we've replaced the anonymous class with a lambda expression that takes a single Map.Entry argument. But, Java 8 can infer the argument types of lambda expressions, so we can remove the explicit (and a bit noisy) type declarations, leaving us with the following cleaner code:

public void deleteFromCache(Set<String> deleteKeys) {
    dataCache.entrySet().removeIf(entry -> deleteKeys.contains(entry.getKey()));
}

This code is quite a bit nicer than the original code using an explicit Iterator. But how does it compare to the second example, which looped through the keys with a simple for loop and called remove on each one? The lines of code really aren't that different, so assuming they are functionally equivalent, perhaps it is just a style preference. The explicit for loop is a traditional imperative style, whereas removeIf has a more functional flavor to it. If you look at the implementation of removeIf in the Collection interface, it actually uses an Iterator under the covers, just as the first example in this post does.
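
For reference, the default implementation of removeIf in the Collection interface looks roughly like the following sketch (paraphrased from the JDK source rather than copied verbatim):

default boolean removeIf(Predicate<? super E> filter) {
    Objects.requireNonNull(filter);
    boolean removed = false;
    Iterator<E> each = iterator();
    while (each.hasNext()) {
        if (filter.test(each.next())) {
            each.remove();
            removed = true;
        }
    }
    return removed;
}

In other words, the iteration we wrote by hand in the first example now lives in a single place in the JDK, and our code supplies only the predicate.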

So practically there is no difference in functionality. But, removeIf could theoretically be implemented for certain types of collections to perform the operation in parallel, and perhaps only for collections over a certain size where it can be shown that parallelizing the operation has benefits. But this simple example is really more about separation of concerns, i.e. separating the logic of traversing the collection from the logic that determines whether or not an element is removed.

For example, if a code base needs to remove elements from collections in many different places, chances are good that it will end up with similar loop-traversal logic intertwined with removal logic in many different places. In contrast, using removeIf leads to only the removal logic appearing in those locations - and the removal logic is really your business logic. And if at some later point in time the traversal logic in the Java collections framework were improved somehow, e.g. parallelized for large collections, then all the locations using that method would automatically receive the same benefit, whereas code that combines the traversal and removal logic using an explicit Iterator or loop would not.

In this case, and many others, I'd argue that separation of concerns is a much better reason to prefer the functional style to the imperative style. Separation of concerns leads to better, cleaner code and easier code re-use precisely because those concerns can be implemented separately, and also tested separately, which results in not only cleaner production code but also cleaner test code. All of which leads to more maintainable code, which means new features and enhancements to existing code can be delivered faster and with less chance of breaking existing code. Until the next post in this ad-hoc series on Java 8 features and a functional style, happy coding!

This blog was originally published on the Fortitude Technologies blog here.

Towards more functional Java using Streams and Lambdas

Posted on August 23, 2016 by Scott Leberknight

In the last post I showed how the Java 7 try-with-resources feature reduces boilerplate code, but probably more importantly how it removes errors related to unclosed resources, thereby eliminating an entire class of errors. In this post, the first in an ad-hoc series on Java 8 features, I'll show how the stream API can not only reduce the lines of code, but also make the code more readable, maintainable, and less error-prone.

The following code is from a simple back-end service that lets us query metadata about messages flowing through various systems. It takes a map of key-value pairs and creates a Lucene query that can be submitted to SOLR to obtain results. It is primarily used by developers to verify behavior in a distributed system, and it does not support very sophisticated queries, since it only ANDs the key-value pairs together to form the query. For example, given a parameter map containing the (key, value) pairs (lastName, Smith) and (firstName, Bob), the method would generate the query "lastName:Smith AND firstName:Bob". As I said, not very sophisticated.

Here is the original code (where AND, COLON, and DEFAULT_QUERY are constants):

public String buildQueryString(Map<String, String> parameters) {
    int count = 0;
    StringBuilder query = new StringBuilder();

    for (Map.Entry<String, String> entry : parameters.entrySet()) {
        if (count > 0) {
            query.append(AND);
        }
        query.append(entry.getKey());
        query.append(COLON);
        query.append(entry.getValue());
        count++;
    }

    if (parameters.size() == 0) {
        query.append(DEFAULT_QUERY);
    }

    return query.toString();
}

The core business logic should be very simple, since we only need to iterate the parameter map, join the keys and values with a colon, and finally join them together. But the code above, while not terribly hard to understand, has a lot of noise. First off, it uses two mutable variables (count and query) that are modified within the for loop. The first thing in the loop is a conditional that is needed to determine whether we need to append the AND constant, as we only want to do that after the first key-value pair is added to the query. Next, joining the keys and values is done by concatenating them, one by one, to the StringBuilder holding the query. Finally the count must be incremented so that in subsequent loop iterations, we properly include the AND delimiter. After the loop there is another conditional which appends DEFAULT_QUERY if there are no parameters, and then we finally convert the StringBuilder to a String and return it.

Here is the buildQueryString method after refactoring it to use the Java 8 stream API:

public String buildQueryString(Map<String, String> parameters) {
    if (parameters.isEmpty()) {
        return DEFAULT_QUERY;
    }

    return parameters.entrySet().stream()
            .map(entry -> String.join(COLON, entry.getKey(), entry.getValue()))
            .collect(Collectors.joining(AND));
}

This code does the exact same thing, but in only 6 lines of code (counting the map and collect lines as separate even though technically they are part of the stream call chain) instead of 15. But just measuring lines of code isn't everything. The main difference here is the lack of mutable variables, no external iteration via explicit looping constructs, and no conditional statements other than the empty check which short circuits and returns DEFAULT_QUERY when there are no parameters. The code reads like a functional declaration of what we want to accomplish: stream over the parameters, convert each (key, value) to "key:value" and join them all together using the delimiter AND.

The specific Java 8 features we've used here start with the stream() method to convert the map entry set to a Java 8 java.util.stream.Stream. We then use the map operation on the stream, which applies a function (String.join) to each element (Map.Entry) in the stream. Finally, we use the collect method to reduce the elements using the joining collector into the resulting string that is the actual query we wanted to build. In the map method we've also made use of a lambda expression to specify exactly what transformation to perform on each map entry.
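
As a quick, hypothetical usage example (assuming COLON is ":" and AND is " AND " - the constant values aren't shown in the original code - and using a LinkedHashMap just to make the iteration order predictable):

Map<String, String> parameters = new LinkedHashMap<>();
parameters.put("lastName", "Smith");
parameters.put("firstName", "Bob");

String query = buildQueryString(parameters);
// query is "lastName:Smith AND firstName:Bob"

String defaultQuery = buildQueryString(new HashMap<>());
// no parameters, so defaultQuery is DEFAULT_QUERY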

By removing explicit iteration and mutable variables, the code is more readable, in that a developer seeing this code for the first time will have an easier and quicker time understanding what it does. Note that much of the how it does things has been removed: for example, the iteration is now implicit via the Stream, and the joining collector now does the work of inserting a delimiter between the elements. You're now declaring what you want to happen, instead of having to explicitly perform all the tedium yourself. This is more of a functional style than most Java developers are used to, and at first it can be a bit jarring, but the more you practice and get used to it, the more you'll probably like it, and you'll find yourself able to read and write this style of code much more quickly than traditional code with lots of loops and conditionals. Generally there is also less code than when using traditional looping and control structures, which is another benefit for maintenance. I won't go so far as to say Java 8 is a functional language like Clojure or Haskell - since it isn't - but code like this has a more functional flavor to it.

There is now a metric ton of content on the internet related to Java 8 streams, but in case this is all new to you, or you're just looking for a decent place to begin learning more in-depth, the API documentation for the java.util.stream package is a good place to start. Venkat Subramaniam's Functional Programming in Java is another good resource, and at less than 200 pages can be digested pretty quickly. And for more on lambda expressions, the Lambda Expressions tutorial in the official Java Tutorials is a decent place to begin. In the next post, we'll see another example where a simple Java 8 API addition combined with a lambda expression simplifies code, making it more readable and maintainable.

This blog was originally published on the Fortitude Technologies blog here.

Reduce Java boilerplate using try-with-resources

Posted on August 11, 2016 by Scott Leberknight

Java 8 has been out for a while, and Java 7 has been out even longer. But even so, many people still unfortunately are not taking advantage of some of the new features, many of which make reading and writing Java code much more pleasant. For example, Java 7 introduced some relatively simple things like strings in switch statements, underscores in numeric literals (e.g. 1_000_000 is easier to read and see the magnitude than just 1000000), and the try-with-resources statement. Java 8 went a lot further and introduced lambda expressions, the streams API, a new date/time API inspired by the Joda-Time library, Optional, and more.

In this blog and in a few subsequent posts, I will take a simple snippet of code from a real project, and show what the code looked like originally and what it looked like after refactoring it to be more readable and maintainable. To start, this blog will actually tackle the try-with-resources statement introduced in Java 7. Many people even in 2016 still seem not to be aware of this statement, which not only makes the code less verbose, but also eliminates an entire class of errors resulting from failure to close I/O or other resources.

Without further ado (whatever ado actually means), here is a method that was used to check port availability when starting up services.

public boolean isPortAvailable(final int port) {
    ServerSocket serverSocket = null;
    DatagramSocket dataSocket = null;

    try {
        serverSocket = new ServerSocket(port);
        serverSocket.setReuseAddress(true);
        dataSocket = new DatagramSocket(port);
        dataSocket.setReuseAddress(true);
        return true;
    } catch (IOException e) {
        return false;
    } finally {
        if (dataSocket != null) {
            dataSocket.close();
        }

        if (serverSocket != null) {
            try {
                serverSocket.close();
            } catch (IOException e) {
                // ignored
            }
        }
    }
}

The core logic for the above code is pretty simple: open a ServerSocket and a DatagramSocket and if both opened without throwing an exception, then the port is open. It's all the extra boilerplate code and exception handling that makes the code so lengthy and error-prone, because we need to make sure to close the sockets in the finally block, being careful to first check they are not null. For good measure, the ServerSocket#close method throws yet another IOException, which we simply ignore but are required to catch nonetheless. A lot of extra code which obscures the actual simple core of the code.

Here's the refactored version which makes use of the try-with-resources statement from Java 7.

public boolean isPortAvailable(final int port) {
    try (ServerSocket serverSocket = new ServerSocket(port); 
         DatagramSocket dataSocket = new DatagramSocket(port)) {
        serverSocket.setReuseAddress(true);
        dataSocket.setReuseAddress(true);
        return true;
    } catch (IOException e) {
        return false;
    }
}

As you can hopefully see, this code has the same core logic, but much less of the boilerplate. There is not only less code (7 lines instead of 22), but the code is much more readable since only the core logic remains. We are still catching the IOException that can be thrown by the ServerSocket and DatagramSocket constructors, but we no longer need to deal with the routine closing of those socket resources. The try-with-resources statement does that task for us, automatically closing any resources opened in the declaration statement that immediately follows the try keyword.

The one catch is that the declared resources must implement the AutoCloseable interface (the older Closeable interface now extends AutoCloseable, so anything Closeable works too). Since the Java APIs make extensive use of Closeable and AutoCloseable, most things you'll want to use can be handled via try-with-resources. Classes that don't implement AutoCloseable cannot be used directly in try-with-resources statements. For example, if you are unfortunate enough to still need to deal with XML, say via the old-school XMLStreamReader, then you are out of luck since it doesn't implement Closeable or AutoCloseable. I generally fix those types of things by creating a small wrapper/decorator class, e.g. CloseableXMLStreamReader, but sometimes it simply isn't worth the trouble unless you are using it in many different places.
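
A minimal sketch of that wrapper idea might look like the following (the class name and structure are illustrative, not taken from a real library; XMLStreamReader and XMLStreamException live in javax.xml.stream):

public class CloseableXMLStreamReader implements AutoCloseable {

    private final XMLStreamReader delegate;

    public CloseableXMLStreamReader(XMLStreamReader delegate) {
        this.delegate = delegate;
    }

    public XMLStreamReader getDelegate() {
        return delegate;
    }

    @Override
    public void close() {
        try {
            delegate.close();  // XMLStreamReader#close throws a checked XMLStreamException
        } catch (XMLStreamException e) {
            // ignored, or wrap in an unchecked exception if callers need to know
        }
    }
}

With a wrapper like this, the reader can participate in a try-with-resources statement just like any other AutoCloseable resource.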

For more information on try-with-resources, the Java tutorials on Oracle's website has a more in-depth article here. In subsequent posts, I'll show some before/after code that makes use of Java 8 features such as the stream API and lambda expressions.

This blog was originally published on the Fortitude Technologies blog here.

Slides for RESTful Web Services with Jersey presentation

Posted on June 10, 2014 by Scott Leberknight

While teaching a course on web development which included Ruby on Rails and Java segments, we used Jersey to expose a simple web service which the Rails application consumed. I put together a presentation on Jersey that I recently gave. Here are the slides:

RESTful Web Services with Jersey from Scott Leberknight

Slides for httpie presentation

Posted on June 09, 2014 by Scott Leberknight

I've used cURL for a long time but I can never seem to remember all the various flags and settings. Recently I came across httpie which is a simple command line tool for accessing HTTP resources. Here are the presentation slides:

httpie from Scott Leberknight

Building a Distributed Lock Revisited: Using Curator's InterProcessMutex

Posted on December 29, 2013 by Scott Leberknight

Last summer I wrote a series of blogs introducing Apache ZooKeeper, which is a distributed coordination service used in many open source projects like Hadoop, HBase, and Storm to manage clusters of machines. The fifth blog described how to use ZooKeeper to implement a distributed lock. In that blog I explained that the goals of a distributed lock are "to build a mutually exclusive lock between processes that could be running on different machines, possibly even on different networks or different data centers". I also mentioned that one significant benefit is that "clients know nothing about each other; they only know they need to use the lock to access some shared resource, and that they should not access it unless they own the lock." That blog described how to use the ZooKeeper WriteLock "recipe" that comes with ZooKeeper in the contrib modules to build a synchronous BlockingWriteLock with easier semantics, in which you simply call a lock() method to acquire the lock and call unlock() to release the lock. Earlier in the series, we learned how to connect to ZooKeeper in the Group Membership Example blog using a Watcher and a CountDownLatch to block until the SyncConnected event was received. All that code wasn't terribly complex, but it was fairly low-level, especially if you include the need to block until a connection event is received and the non-trivial implementation of the WriteLock recipe.

In the wrap-up blog I mentioned the Curator project, originally open sourced by Netflix and later donated by them to Apache. The Curator wiki describes Curator as "a set of Java libraries that make using Apache ZooKeeper much easier". In this blog we'll see how to use Curator to implement a distributed lock, without needing to write any of our own wrapper code for obtaining a connection or to implement the lock itself. In the distributed lock blog we saw how sequential ephemeral child nodes (e.g. child-lock-node-0000000000, child-lock-node-0000000001, child-lock-node-0000000002, etc.) are created under a persistent parent lock node. The client that created the child node with the lowest sequence number holds the lock. We saw several potential gotchas: first, how does a client know whether it successfully created a child node in the case of a partial failure, i.e. a (temporary) connection loss, and how does it know which child node it created, i.e. the child with which sequence number? I noted that a solution was to embed the ZooKeeper session ID in the child node such that the client can easily identify the child node it created. Jordan Zimmerman (the creator of Curator) was kind enough to post a comment to that blog noting that using the session ID is "not ideal" because it "prevents the same ZK connection from being used in multiple threads for the same lock". He said "It's much better to use a GUID. This is what Curator uses."

Second, we noted that distributed lock clients should watch only the immediately preceding child node rather than the parent node in order to prevent a "herd effect" in which every client is notified for every single child node event, when in reality each client need only care about the child immediately preceding it. Curator handles both these cases plus adds other goodies such as a retry policy for connecting to ZooKeeper. So without further comment, let's see how to use a distributed lock in Curator.

First, we'll need to get an instance of CuratorFramework - this is an interface that represents a higher level abstraction API for working with ZooKeeper. It provides automatic connection management including retry operations, a fluent-style API, as well as a bunch of recipes you can use out-of-the-box for distributed data structures like locks, queues, leader election, etc. We can use the CuratorFrameworkFactory and a RetryPolicy of our choosing to get one.

String hosts = "host-1:2181,host-2:2181,host-3:2181";
int baseSleepTimeMillis = 1000;
int maxRetries = 3;

RetryPolicy retryPolicy = new ExponentialBackoffRetry(baseSleepTimeMillis, maxRetries);
CuratorFramework client = CuratorFrameworkFactory.newClient(hosts, retryPolicy);
client.start();

In the above code we first create a retry policy - in this case an ExponentialBackoffRetry using a base sleep time of 1000 milliseconds and up to 3 retries. Then we can use the CuratorFrameworkFactory.newClient() to obtain an instance of CuratorFramework. Finally we need to call start() (note we'll need to call close() when we're done with the client). Now that we have a client instance, we can use an implementation of InterProcessLock to create our distributed lock. The simplest one is the InterProcessMutex which is a re-entrant mutual exclusion lock that works across JVMs, by using ZooKeeper to hold the lock.

InterProcessLock lock = new InterProcessMutex(client, lockPath);
lock.acquire();
try {
  // do work while we hold the lock
} catch (Exception ex) {
  // handle exceptions as appropriate
} finally {
  lock.release();
}

The above code simply creates a InterProcessMutex for a specific lock path (lockPath), acquires the lock, does some work, and releases the lock. In this case acquire() will block until the lock becomes available. In many cases blocking indefinitely won't be a Good Thing, and Curator provides an overloaded version of acquire() which requires a maximum time to wait for the lock and returns true if the lock is obtained within the time limit and false otherwise.

InterProcessLock lock = new InterProcessMutex(client, lockPath);
if (lock.acquire(waitTimeSeconds, TimeUnit.SECONDS)) {
  try {
    // do work while we hold the lock
  } catch (Exception ex) {
    // handle exceptions as appropriate
  } finally {
    lock.release();
  }
} else {
  // we timed out waiting for lock, handle appropriately
}

The above code demonstrates using the timeout version of acquire. The code is slightly more complex since you need to check whether the lock is acquired or whether we timed out waiting for the lock. Regardless of which version of acquire() you use, you'll need to release() the lock in a finally block. The final piece is to remember to close the client when you're done with it:

client.close();

And that's pretty much it for using Curator's InterProcessMutex to implement a distributed lock. All the complexity in handling connection management, partial failures, the "herd effect", automatic retries, and so on are handled by the higher level Curator APIs. To paraphrase Stu Halloway, you should always understand at least one layer beneath the one you're working at - in this case you should have a decent understanding of how ZooKeeper works under the covers and some of the potential issues of distributed computing. But having said that, go ahead and use Curator to work at a higher level of abstraction and gain the benefits of all the distributed computing experience at Netflix as well as Yahoo (which created ZooKeeper). And last, Happy New Year 2014!

Handling Big Data with HBase Part 6: Wrap-up

Posted on December 19, 2013 by Scott Leberknight

This is the sixth and final blog in an introduction to Apache HBase. In the fifth part, we learned the basics of schema design in HBase and several techniques you can use to make scanning and filtering data more efficient. We also saw two different basic design schemes ("wide" and "tall") for storing information about the same entity, and briefly touched upon more advanced topics like adding full-text search and secondary indexes. In this part, we'll wrap up by summarizing the main points and then listing the (many) things we didn't cover in this introduction to HBase series.

HBase is a distributed database providing inherent scalability, performance, and fault-tolerance across potentially massive clusters of commodity servers. It provides the means to store and efficiently scan large swaths of data. We've looked at the HBase shell for basic interaction, covered the high-level HBase architecture and looked at using the Java API to create, get, scan, and delete data. We also considered how to design tables and row keys for efficient data access.

One thing you certainly noticed when working with the HBase Java API is that it is much lower level than other data APIs you might be used to working with, for example JDBC or JPA. You get the basics of CRUD plus scanning data, and that's about it. In addition, you work directly with byte arrays which is about as low-level as it gets when you're trying to retrieve information from a datastore.

If you are considering whether to use HBase, you should really think hard about how large the data is, i.e. does your app need to be able to accommodate ever-growing volumes of data? If it does, then you need to think hard about what that data looks like and what the most likely data access patterns will be, as this will drive your schema design and data access patterns. For example, if you are designing a schema for a weather collection project, you will want to consider using a "tall" schema design such that the sensor readings for each sensor are split across rows as opposed to a "wide" design in which you keep adding columns to a column family in a single row. Unlike relational models in which you work hard to normalize data and then use SQL as a flexible way to join the data in various ways, with HBase you need to think much more up-front about the data access patterns, because retrieval by row key and table scans are the only two ways to access data. In other words, there is no joining across multiple HBase tables and projecting out the columns you need. When you retrieve data, you want to ask HBase for only the exact data you need.

Things We Didn't Cover

Now let's discuss a few things we didn't cover. First, coprocessors were a major addition to HBase in version 0.92, and were inspired by Google adding coprocessors to its Bigtable data store. You can, at a high level, think of coprocessors like triggers or stored procedures in relational databases. Basically you can have either trigger-like functionality via observers, or stored-procedure functionality via RPC endpoints. This allows many new things to be accomplished in an elegant fashion, for example maintaining secondary indexes via observing changes to data.

We showed basic API usage, but there is more advanced usage possible with the API. For example, you can batch data and provide much more advanced filtering behavior than a simple paging filter like we showed. There is also the concept of counters, which allows you to do atomic increments of numbers without requiring the client to perform explicit row locking. And if you're not really into Java, there are external APIs available via Thrift and REST gateways. There's also even a C/C++ client available and there are DSLs for Groovy, Jython, and Scala. These are all discussed on the HBase wiki.
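
As a small taste of counters, here is a hedged sketch using the HTable API (the table name, row key, and column names are made up for illustration, and conf is an HBase Configuration as in the earlier examples in this series):

HTable counters = new HTable(conf, "pageviews");
long newCount = counters.incrementColumnValue(
        Bytes.toBytes("page-/index.html"),  // row key
        Bytes.toBytes("stats"),             // column family
        Bytes.toBytes("views"),             // column qualifier
        1L);                                // amount to increment by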

Cluster setup and configuration was not covered at all, nor was performance tuning. Obviously these are hugely important topics and the references below are good starting places. With HBase you not only need to worry about tuning HBase configuration, but also tuning Hadoop (or more specifically, the HDFS file system). For these topics definitely start with the HBase Reference Guide and also check out HBase: The Definitive Guide by Lars George.

We also didn't cover how to use Map/Reduce with HBase. Essentially you can use Hadoop's Map/Reduce framework to access HBase tables and perform tasks like aggregation in Map/Reduce style.

Last there is security (which I suppose should be expected to come last for a developer, right?) in HBase. There are two types of security I'm referring to here: the first is access to HBase itself in order to create, read, update, and delete data, e.g. requiring Kerberos authentication to connect to HBase. The second type is ACL-based access restrictions. As of this writing, HBase lets you restrict access via ACLs at the table and column family level. However, HBase Cell Security describes how cell-level security features similar to those in Apache Accumulo are being added to HBase and are scheduled for release in version 0.98 (the current version as of this writing is 0.96).

Goodbye!

With this background, you can now consider whether HBase makes sense on future projects with Big Data and high scalability requirements. I hope you found this series of posts useful as an introduction to HBase.

References

Handling Big Data with HBase Part 5: Data Modeling (or, Life without SQL)

Posted on December 17, 2013 by Scott Leberknight

This is the fifth of a series of blogs introducing Apache HBase. In the fourth part, we saw the basics of using the Java API to interact with HBase to create tables, retrieve data by row key, and do table scans. This part will discuss how to design schemas in HBase.

HBase has nothing similar to a rich query capability like SQL from relational databases. Instead, it forgoes this capability and others like relationships, joins, etc. to instead focus on providing scalability with good performance and fault-tolerance. So when working with HBase you need to design the row keys and table structure in terms of rows and column families to match the data access patterns of your application. This is the complete opposite of what you do with relational databases, where you start out with a normalized database schema and separate tables, and then you use SQL to perform joins to combine data in the ways you need. With HBase you design your tables specific to how they will be accessed by applications, so you need to think much more up-front about how data is accessed. You are much closer to the bare metal with HBase than with relational databases which abstract implementation details and storage mechanisms. However, for applications needing to store massive amounts of data and have inherent scalability, performance characteristics and tolerance to server failures, the potential benefits can far outweigh the costs.

In the last part on the Java API, I mentioned that when scanning data in HBase, the row key is critical since it is the primary means to restrict the rows scanned; there is no rich query language like SQL as in relational databases. Typically you create a scan using start and stop row keys and optionally add filters to further restrict the rows and columns data returned. In order to have some flexibility when scanning, the row key should be designed to contain the information you need to find specific subsets of data. In the blog and people examples we've seen so far, the row keys were designed to allow scanning via the most common data access patterns. For the blogs, the row keys were simply the posting date. This would permit scans in ascending order of blog entries, which is probably not the most common way to view blogs; you'd rather see the most recent blogs first. So a better row key design would be to use a reverse order timestamp, which you can get using the formula (Long.MAX_VALUE - timestamp), so scans return the most recent blog posts first. This makes it easy to scan specific time ranges, for example to show all blogs in the past week or month, which is a typical way to navigate blog entries in web applications.
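
For example, building a reverse-order timestamp row key for a new blog entry might look like this sketch (the content column family follows the blog example used in this series; blogTable is assumed to be an HTable for the blogs table, and the post body is made up):

long timestamp = System.currentTimeMillis();
byte[] rowKey = Bytes.toBytes(Long.MAX_VALUE - timestamp);  // newest posts sort first

Put put = new Put(rowKey);
put.add(Bytes.toBytes("content"), Bytes.toBytes("body"), Bytes.toBytes("My latest post..."));
blogTable.put(put);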

For the people table examples, we used a composite row key composed of last name, first name, middle initial, and a (unique) person identifier to distinguish people with the exact same name, separated by dashes. For example, Brian M. Smith with identifier 12345 would have row key smith-brian-m-12345. Scans for the people table can then be composed using start and end rows designed to retrieve people with specific last names, last names starting with specific letter combinations, or people with the same last name and first name initial. For example, if you wanted to find people whose first name begins with B and last name is Smith you could use the start row key smith-b and stop row key smith-c (the start row key is inclusive while the stop row key is exclusive, so the stop key smith-c ensures all Smiths with first name starting with the letter "B" are included). You can see that HBase supports the notion of partial keys, meaning you do not need to know the exact key, to provide more flexibility creating appropriate scans. You can combine partial key scans with filters to retrieve only the specific data needed, thus optimizing data retrieval for the data access patterns specific to your application.
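
A sketch of that partial-key scan using the Java API from the previous part (where table is an HTable for the people table):

Scan scan = new Scan(Bytes.toBytes("smith-b"), Bytes.toBytes("smith-c"));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    // each result is a Smith whose first name starts with "B"
}
scanner.close();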

So far the examples have involved only single tables containing one type of information and no related information. HBase does not have foreign key relationships like in relational databases, but because it supports rows having up to millions of columns, one way to design tables in HBase is to encapsulate related information in the same row - a "wide" table design. It is called a "wide" design since you are storing all information related to a row together in as many columns as there are data items. In our blog example, you might want to store comments for each blog. The "wide" way to design this would be to include a column family named comments and then add columns to the comment family where the qualifiers are the comment timestamp; the comment columns would look like comments:20130704142510 and comments:20130707163045. Even better, when HBase retrieves columns it returns them in sorted order, just like row keys. So in order to display a blog entry and its comments, you can retrieve all the data from one row by asking for the content, info, and comments column families. You could also add a filter to retrieve only a specific number of comments, adding pagination to them.
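
Adding a comment in this "wide" design might look something like the following sketch (blogRowKey and blogTable are assumed from the blog example above; the comment text is made up):

Put put = new Put(blogRowKey);  // the row key of the blog entry being commented on
put.add(Bytes.toBytes("comments"),
        Bytes.toBytes("20130704142510"),     // qualifier is the comment timestamp
        Bytes.toBytes("Great post, thanks!"));
blogTable.put(put);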

The people table column families could also be redesigned to store contact information such as separate addresses, phone numbers, and email addresses in column families allowing all of a person's information to be stored in one row. This kind of design can work well if the number of columns is relatively modest, as blog comments and a person's contact information would be. If instead you are modeling something like an email inbox, financial transactions, or massive amounts of automatically collected sensor data, you might choose instead to spread a user's emails, transactions, or sensor readings across multiple rows (a "tall" design) and design the row keys to allow efficient scanning and pagination. For an inbox the row key might look like <user_id>-<reversed_email_timestamp> which would permit easily scanning and paginating a user's inbox, while for financial transactions the row key might be <user_id>-<reversed_transaction_timestamp>. This kind of design can be called "tall" since you are spreading information about the same thing (e.g. readings from the same sensor, transactions in an account) across multiple rows, and is something to consider if there will be an ever-expanding amount of information, as would be the case in a scenario involving data collection from a huge network of sensors.

Designing row keys and table structures in HBase is a key part of working with HBase, and will continue to be given the fundamental architecture of HBase. There are other things you can do to add alternative schemes for data access within HBase. For example, you could implement full-text searching via Apache Lucene either within rows or external to HBase (search Google for HBASE-3529). You can also create (and maintain) secondary indexes to permit alternate row key schemes for tables; for example in our people table the composite row key consists of the name and a unique identifier. But if we desire to access people by their birth date, telephone area code, email address, or any other number of ways, we could add secondary indexes to enable that form of interaction. Note, however, that adding secondary indexes is not something to be taken lightly; every time you write to the "main" table (e.g. people) you will need to also update all the secondary indexes! (Yes, this is something that relational databases do very well, but remember that HBase is designed to accommodate a lot more data than traditional RDBMSs were.)

Conclusion to Part 5

In this part of the series, we got an introduction to schema design in HBase (without relations or SQL). Even though HBase is missing some of the features found in traditional RDBMS systems such as foreign keys and referential integrity, multi-row transactions, multiple indexes, and so on, many applications that need inherent HBase benefits like scaling can benefit from using HBase. As with anything complex, there are tradeoffs to be made. In the case of HBase, you are giving up some richness in schema design and query flexibility, but you gain the ability to scale to massive amounts of data by (more or less) simply adding additional servers to your cluster.

In the next and last part of this series, we'll wrap up and mention a few (of the many) things we didn't cover in these introductory blogs.

References

Handling Big Data with HBase Part 4: The Java API

Posted on December 15, 2013 by Scott Leberknight

This is the fourth in an introductory series of blogs on Apache HBase. In the third part, we saw a high-level view of the HBase architecture. In this part, we'll use the HBase Java API to create tables, insert new data, and retrieve data by row key. We'll also see how to set up a basic table scan which restricts the columns retrieved and also uses a filter to page the results.

Having just learned about HBase high-level architecture, now let's look at the Java client API since it is the way your applications interact with HBase. As mentioned earlier you can also interact with HBase via several flavors of RPC technologies like Apache Thrift plus a REST gateway, but we're going to concentrate on the native Java API. The client APIs provide both DDL (data definition language) and DML (data manipulation language) semantics very much like what you find in SQL for relational databases. Suppose we are going to store information about people in HBase, and we want to start by creating a new table. The following listing shows how to create a new table using the HBaseAdmin class.

Configuration conf = HBaseConfiguration.create();
HBaseAdmin admin = new HBaseAdmin(conf);
HTableDescriptor tableDescriptor = new HTableDescriptor(TableName.valueOf("people"));
tableDescriptor.addFamily(new HColumnDescriptor("personal"));
tableDescriptor.addFamily(new HColumnDescriptor("contactinfo"));
tableDescriptor.addFamily(new HColumnDescriptor("creditcard"));
admin.createTable(tableDescriptor);

The people table defined in preceding listing contains three column families: personal, contactinfo, and creditcard. To create a table you create an HTableDescriptor and add one or more column families by adding HColumnDescriptor objects. You then call createTable to create the table. Now we have a table, so let's add some data. The next listing shows how to use the Put class to insert data on John Doe, specifically his name and email address (omitting proper error handling for brevity).

Configuration conf = HBaseConfiguration.create();
HTable table = new HTable(conf, "people");
Put put = new Put(Bytes.toBytes("doe-john-m-12345"));
put.add(Bytes.toBytes("personal"), Bytes.toBytes("givenName"), Bytes.toBytes("John"));
put.add(Bytes.toBytes("personal"), Bytes.toBytes("mi"), Bytes.toBytes("M"));
put.add(Bytes.toBytes("personal"), Bytes.toBytes("surame"), Bytes.toBytes("Doe"));
put.add(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"), Bytes.toBytes("john.m.doe@gmail.com"));
table.put(put);
table.flushCommits();
table.close();

In the above listing we instantiate a Put providing the unique row key to the constructor. We then add values, which must include the column family, column qualifier, and the value all as byte arrays. As you probably noticed, the HBase API's utility Bytes class is used a lot; it provides methods to convert to and from byte[] for primitive types and strings. (Adding a static import for the toBytes() method would cut out a lot of boilerplate code.) We then put the data into the table, flush the commits to ensure locally buffered changes take effect, and finally close the table. Updating data is also done via the Put class in exactly the same manner as just shown in the prior listing. Unlike relational databases in which updates must update entire rows even if only one column changed, if you only need to update a single column then that's all you specify in the Put and HBase will only update that column. There is also a checkAndPut operation which is essentially a form of optimistic concurrency control - the operation will only put the new data if the current values are what the client says they should be.
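
For example, a checkAndPut that changes John Doe's email address only if the current value is still what we expect might look like this sketch (the new address is made up):

Put put = new Put(Bytes.toBytes("doe-john-m-12345"));
put.add(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"), Bytes.toBytes("john.doe@example.com"));

boolean updated = table.checkAndPut(
        Bytes.toBytes("doe-john-m-12345"),      // row key
        Bytes.toBytes("contactinfo"),           // column family to check
        Bytes.toBytes("email"),                 // column qualifier to check
        Bytes.toBytes("john.m.doe@gmail.com"),  // expected current value
        put);                                   // the Put applied only if the check passes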

Retrieving the row we just created is accomplished using the Get class, as shown in the next listing. (From this point forward, listings will omit the boilerplate code to create a configuration, instantiate the HTable, and the flush and close calls.)

Get get = new Get(Bytes.toBytes("doe-john-m-12345"));
get.addFamily(Bytes.toBytes("personal"));
get.setMaxVersions(3);
Result result = table.get(get);

The code in the previous listing instantiates a Get instance supplying the row key we want to find. Next we use addFamily to instruct HBase that we only need data from the personal column family, which also cuts down the amount of work HBase must do when reading information from disk. We also specify that we'd like up to three versions of each column in our result, perhaps so we can list historical values of each column. Finally, calling get returns a Result instance which can then be used to inspect all the column values returned.

In many cases you need to find more than one row. HBase lets you do this by scanning rows, as demonstrated in the second part, which used a scan in the HBase shell session. The corresponding class is the Scan class. You can specify various options, such as the start and ending row key to scan, which columns and column families to include and the maximum versions to retrieve. You can also add filters, which allow you to implement custom filtering logic to further restrict which rows and columns are returned. A common use case for filters is pagination. For example, we might want to scan through all people whose last name is Smith one page (e.g. 25 people) at a time. The next listing shows how to perform a basic scan.

Scan scan = new Scan(Bytes.toBytes("smith-"));
scan.addColumn(Bytes.toBytes("personal"), Bytes.toBytes("givenName"));
scan.addColumn(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));
scan.setFilter(new PageFilter(25));
ResultScanner scanner = table.getScanner(scan);
for (Result result : scanner) {
    // ...
}

In the above listing we create a new Scan that starts from the row key smith- and we then use addColumn to restrict the columns returned (thus reducing the amount of disk transfer HBase must perform) to personal:givenName and contactinfo:email. A PageFilter is set on the scan to limit the number of rows scanned to 25. (An alternative to using the page filter would be to specify a stop row key when constructing the Scan.) We then get a ResultScanner for the Scan just created, and loop through the results performing whatever actions are necessary. Since the only method in HBase to retrieve multiple rows of data is scanning by sorted row keys, how you design the row key values is very important. We'll come back to this topic later.

You can also delete data in HBase using the Delete class, which is analogous to the Put class. You can delete all columns in a row (thus deleting the row itself), delete column families, delete individual columns, or some combination of those.
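
Here is a quick sketch of the Delete class in action, using the people table from the earlier listings:

Delete delete = new Delete(Bytes.toBytes("doe-john-m-12345"));
delete.deleteFamily(Bytes.toBytes("creditcard"));                            // remove an entire column family
delete.deleteColumns(Bytes.toBytes("contactinfo"), Bytes.toBytes("email"));  // remove all versions of one column
table.delete(delete);

A Delete constructed with only a row key (and no families or columns added) removes the entire row.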

Connection Handling

In the above examples not much attention was paid to connection handling and RPCs (remote procedure calls). HBase provides the HConnection class, which offers functionality similar to connection pool classes for sharing connections; for example, you use the getTable() method to get a reference to a table instance. There is also an HConnectionManager class, which is how you get instances of HConnection. Similar to avoiding network round trips in web applications, effectively managing the number of RPCs and the amount of data returned when using HBase is important, and something to consider when writing HBase applications.
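
A sketch of that approach, with the HBase version current as of this writing, might look roughly like the following (conf is an HBase Configuration as in the earlier listings):

HConnection connection = HConnectionManager.createConnection(conf);
HTableInterface people = connection.getTable("people");
try {
    // use the table: put, get, scan, etc.
} finally {
    people.close();      // releases the table's resources back to the connection
    connection.close();  // close the shared connection when completely done with it
}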

Conclusion to Part 4

In this part we used the HBase Java API to create a people table, insert a new person, and find the newly inserted person information. We also used the Scan class to scan the people table for people with last name "Smith" and showed how to restrict the data retrieved and finally how to use a filter to limit the number of results.

In the next part, we'll learn how to deal with the absence of SQL and relations when modeling schemas in HBase.

References

Handling Big Data with HBase Part 3: Architecture Overview

Posted on December 12, 2013 by Scott Leberknight

This is the third blog in a series of introductory blogs on Apache HBase. In the second part, we saw how to interact with HBase via the shell. In this part, we'll look at the HBase architecture from a bird's eye view.

HBase is a distributed database, meaning it is designed to run on a cluster of dozens to possibly thousands or more servers. As a result it is more complicated to install than a single RDBMS running on a single server. And all the typical problems of distributed computing begin to come into play such as coordination and management of remote processes, locking, data distribution, network latency and number of round trips between servers. Fortunately HBase makes use of several other mature technologies, such as Apache Hadoop and Apache ZooKeeper, to solve many of these issues. The figure below shows the major architectural components in HBase.

HBase Architecture

In the above figure you can see there is a single HBase master node and multiple region servers. (Note that it is possible to run HBase in a multiple master setup, in which there is a single active master.) HBase tables are partitioned into multiple regions with each region storing a range of the table's rows, and multiple regions are assigned by the master to a region server.

HBase is a column-oriented data store, meaning it stores data by columns rather than by rows. This makes certain data access patterns much less expensive than with traditional row-oriented relational database systems. For example, in HBase if there is no data for a given column family, it simply does not store anything at all; contrast this with a relational database which must store null values explicitly. In addition, when retrieving data in HBase, you should only ask for the specific column families you need; because there can literally be millions of columns in a given row, you need to make sure you ask only for the data you actually need.

HBase utilizes ZooKeeper (a distributed coordination service) to manage region assignments to region servers, and to recover from region server crashes by loading the crashed region server's regions onto other functioning region servers.

Regions contain an in-memory data store (MemStore) and a persistent data store (HFile), and all regions on a region server share a reference to the write-ahead log (WAL) which is used to store new data that hasn't yet been persisted to permanent storage and to recover from region server crashes. Each region holds a specific range of row keys, and when a region exceeds a configurable size, HBase automatically splits the region into two child regions, which is the key to scaling HBase.

As a table grows, more and more regions are created and spread across the entire cluster. When clients request a specific row key or scan a range of row keys, HBase tells them the regions on which those keys exist, and the clients then communicate directly with the region servers where those regions exist. This design minimizes the number of disk seeks required to find any given row, and optimizes HBase toward disk transfer when returning data. This is in contrast to relational databases, which might need to do a large number of disk seeks before transferring data from disk, even with indexes.

The HDFS component is the Hadoop Distributed Filesystem, a distributed, fault-tolerant and scalable filesystem which guards against data loss by dividing files into blocks and spreading them across the cluster; it is where HBase actually stores data. Strictly speaking the persistent storage can be anything that implements the Hadoop FileSystem API, but usually HBase is deployed onto Hadoop clusters running HDFS. In fact, when you first download and install HBase on a single machine, it uses the local filesystem until you change the configuration!

Clients interact with HBase via one of several available APIs, including a native Java API as well as a REST-based interface and several RPC interfaces (Apache Thrift, Apache Avro). You can also use DSLs to access HBase from Groovy, Jython, and Scala.

Conclusion to Part 3

In this part, we got a pretty high level view of HBase architecture. In the next part, we'll dive into some real code and show the basics of working with HBase via its native Java API.

References