I recently spent some quality time with CRDTs, short for commutative replicated data types. I got curious about them while working on the Riak Handbook, and I gave a talk about designing data structures for Riak the other week at NoSQL matters; the slides are available too.

What are commutative replicated data types? It's a fancy term for describing data structures suitable for eventually consistent systems. You know what's an eventually consistent system? Riak!

When working with Riak, your data needs to be designed in a way that allows coping with its eventually consistent nature. This poses a problem for data types like counters, sets, graphs, essentially all data structures that require operations to be executed in a monotonic fashion.

For instance, with a counter, you don't want to lose a single increment when multiple clients add values. But due to Riak's eventually consistent nature you can't guarantee monotonic order of updates. Instead you need to make sure you can restore a logical order of operations at any time given any number of conflicting writes.

When multiple clients update the same object concurrently they cause siblings, two objects with different values. If every sibling has a different value for the counter, how do you make sure you can restore order and therefore the final value? Let's look at a worst-case scenario of a data structure that won't work well in this case. Two clients see an object already stored in Riak representing a counter, currently having the value 1.

{
  "value": 1
}

Two clients now want to update the counter, incrementing its value by 1. They both store the updated data back to Riak, causing a conflict. Now you have two siblings, both having the value 2. You also have the original sibling around as referenced by both clients when they wrote their data.

{
  "value": 2
}

It's unlikely you'll be able to restore the total sum of both values, because you don't know what the previous value for both clients was. You can assume the value was 1, but what if a client incremented by 2? In an eventually consistent system it's hard to say how much has changed since the last time you've seen the data unless you keep track specifically of what has changed.

Commutative replicated data types are data structures designed to help you here. Let's look at an alternative for a counter. What if, instead of keeping a single value, every client keeps its own value and only updates that number instead of the total value?

We can assume that updates of a single client will happen in a monotonic fashion. There shouldn't be more than one client with the same identifier in the system.

Here's an example of our updated data structure:

{
  "client-1": 2,
  "client-2": 2,
  "client-3": 3
}

When a client wants to update the counter, it only updates its own value. It's a contract between all clients never to touch any other client's data, other than to merge it back together. When a client finds an object with siblings, it can merge them together simply by picking the highest value for every client. Part of the contract is also that a client must merge the data whenever it finds an object with siblings.

To get the total value for the counter, just calculate the sum of all values, et voilà! This surprisingly simple data structure is called a G-counter, a grow-only counter.

Let's look at some code. I'm assuming your bucket has support for siblings enabled.

The bits to generate a counter value are straightforward. You just have to make sure to assign unique but stable client identifiers to your client objects, so the same client always uses the same one. Here we're using the Ruby client.

require 'riak'

# Every client uses a unique but stable identifier.
riak = Riak::Client.new(client_id: 'client-1')
counter = riak.bucket('g-counters').get_or_new('counter-1')

# Initialize the structure and this client's slot if necessary,
# then increment only this client's own value.
counter.data ||= {}
counter.data[riak.client_id] ||= 0
counter.data[riak.client_id] += 1
counter.store

After initializing the data structure and assigning it a default if necessary, we increment the counter. This code can nicely be hidden in a library function somewhere. The interesting bit is merging the data structures back together should the client find siblings. The Ruby client has a convenient way to specify a callback that's called when more than one object is returned.

We're writing code that iterates over all siblings, picking the highest value for every client along the way.

Riak::RObject.on_conflict do |robject|
  # Only try to resolve objects from the g-counters bucket.
  next nil if robject.bucket.name != 'g-counters'
  merged = robject.siblings.each_with_object({}) do |sibling, data|
    (sibling.data || {}).each do |client_id, value|
      # Keep the highest value seen for every client.
      if (data[client_id] || 0) < value
        data[client_id] = value
      end
    end
  end
  robject.data = merged
  robject
end

The next time you fetch the data and the Ruby client detects a conflict the callback will be run, merging the data back together into a single data structure.

I'll leave the code to calculate the sum of all values as an exercise to the reader.

All this assumes that you're enforcing stronger consistency on your data. You need to make sure that R + W > N, because even when a client only updates its own values, it has little control over where its data is written. If you don't make sure that consistency of the data is enforced, you can run into situations where a client comes across two siblings caused by its own updates. This can happen when a primary replica failed, a secondary replica took its place, and the client only used a small read quorum. These scenarios deserve their own write-up.
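
To enforce that with the Ruby client, you can pass the quorum values per request. Here's a minimal sketch, assuming the default N of 3 and a client version that accepts the r and w options:

require 'riak'

riak   = Riak::Client.new(client_id: 'client-1')
bucket = riak.bucket('g-counters')

# Read and write with a majority quorum so that R + W > N (2 + 2 > 3).
counter = bucket.get('counter-1', r: 2)
counter.data[riak.client_id] = (counter.data[riak.client_id] || 0) + 1
counter.store(w: 2)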

If you want to know more about commutative replicated data types I highly suggest reading the relevant paper on them. It's almost fifty pages long and took me several reads to get a good grasp of it, but it's totally worth it. There are more specific implementations available for CRDTs too, specifically statebox for Erlang, knockbox for Clojure, and a sample implementation in Ruby. The latter comes with a handy README that shows examples for the specific data types. None of them are specific to Riak, but all can be used with it. Also fresh from the world of academic papers is this one by Neil Conway et al. on lattices in distributed computing by way of Bloom, a language for disorderly distributed computing.

There are some other caveats with CRDTs and Riak but we'll look at them in more detail in another installment of this series, in particular regarding consistency and garbage collection. There's a lot to be said about CRDTs and there's a lot of brain matter to be spent on trying to understand them. The next update for Riak Handbook might even include a section on them. The topic is certainly fascinating enough to warrant one, as it addresses the issues people commonly encounter when designing data structures for eventual consistency.

Tags: riak, crdt

Key rack

One of the most common questions to ask about Riak is: how do I get a list of all the keys in my bucket, in the cluster, or that have an attribute that matches my query using MapReduce?

The motivation behind it is simple: you want to delete all the keys in a bucket, count the number of keys stored in your cluster, clear out your cluster entirely, or run ad-hoc queries on the data stored in a bucket.

All valid in their own right.

But things are not so simple with Riak. To understand why, let's take a quick look under the covers.

What's in a bucket?

A bucket is a namespace in Riak. It's not a physically distinct entity like a table in a relational database. You can set some properties on it, things like replication levels, commit hooks, quorums, but that's it. Those are stored in the cluster's configuration, which is gossiped around the cluster just like the data that identifies which partition goes on which machine.

In fact, when you specify a bucket and a key to fetch or write some data, they're stuck together to find the location in the cluster. Consider a bucket-key combination like users/roidrage. To find the location, Riak hashes both, not just the key. Both bucket and key uniquely identify a piece of data, allowing you to have multiple objects with the same key, but in different buckets.
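
Here's a sketch of the general idea in Ruby, using SHA-1 like consistent hashing does. It's only an illustration of the concept, not Riak's actual implementation:

require 'digest/sha1'

# Hash the bucket/key pair into the 2^160 keyspace. Because bucket and
# key are hashed together, users/roidrage and tweets/roidrage end up
# in completely different places on the ring.
def ring_position(bucket, key)
  Digest::SHA1.hexdigest("#{bucket}/#{key}").to_i(16)
end

ring_position('users', 'roidrage')  # => some 160-bit number
ring_position('tweets', 'roidrage') # => a very different 160-bit number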

When an object is stored in Riak's storage backends, it uses both bucket and key name to identify it. What you get as a result are files that contain an abundance of different bucket-key combinations and their respective objects, sometimes not even in any order. The only physical distinction Riak has for data on disk is the partition they belong to. Everything else is up to the storage backend. There's no distinction between buckets on disk.

One reason for this is consistent hashing. If you remember the last installment of this series, I mentioned that consistent hashing's downside is that you lose key ordering. Keys are randomly spread out through the cluster. Some ordering still exists depending on the backend, but in general, ordering is lost.

Listing all of the keys

So to list keys in a bucket, Riak has to go through all of the keys in every partition, and I mean ALL OF THEM. Here's a picture of keys and an impersonation of Riak, having to take care of all of them.

No big deal, right? Unless, of course, you store millions and millions of them and want to find all the keys from, say, the users bucket, which may hold no more than 1000 keys. To do that, Riak goes through every partition; every partition loads the keys either from memory (Bitcask) or disk (LevelDB) and sifts through them, finding the ones belonging to the users bucket.

All that said, it's certainly not impossible to do, if you have some time to wait, depending on the amount of data stored.

$ curl 'localhost:8098/buckets/users/keys?keys=true'

But wait, don't do that. Do this instead, streaming the keys rather than waiting for them all to be collected and then dumped at once.

$ curl 'localhost:8098/buckets/users/keys?keys=stream'

That's much better.

Listing keys has an impact on your Riak nodes, so if you can avoid it, don't do it!

So how do I really get all of the keys?

If select * from riak is not a great option, then what is?

Instead of relying on Riak, build an index on the keys. Thanks to Riak 2i (Secondary Indexes), this is easy. In fact, you get indexing of keys for free when using the LevelDB backend, just use the index $key. This takes advantage of LevelDB's sorted file structure. Neat!

But, and here's the kicker, you can only fetch ranges of keys. So instead of asking for all the keys, you ask for a range large enough to fit all the keys.

$ curl 'localhost:8098/buckets/users/index/$key/0/zzzz'

This finds all the keys that sort lexicographically between 0 and zzzz and returns them to you in a list. Now there's a slim chance you'll have users with names that fall outside that range, but I'll leave that exercise, or proper validations, up to you.

Using that list, you can count the number of keys in that bucket, or you can delete them one by one.
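
If you'd rather do that from code, the Ruby client can run the same index query. A sketch, assuming a riak-client version that supports secondary index queries via get_index:

require 'riak'

riak   = Riak::Client.new
bucket = riak.bucket('users')

# Fetch all keys between '0' and 'zzzz' via the special $key index,
# then count them or delete them one by one.
keys = bucket.get_index('$key', '0'..'zzzz')
puts keys.size

keys.each { |key| bucket.delete(key) }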

Ideally...

In an ideal world, listing keys in a bucket would be possible and not an expensive operation. Riak could for example allow users to store buckets in separate files. The downside is that with a lot of buckets, you'll hit the limits of open file descriptors in no time, a bit of a bummer. But until something better comes along, secondary indexes are a nice tool to at least avoid resorting to listing all of the keys.

Curious about other ways to index and query data in Riak? You'll like the Riak Handbook, which will be published later this week. Covers Riak's secondary indexes and other strategies to query, inspect and analyze data.

Check back in tomorrow for an introduction on storing timelines in Riak.

Tags: riak, nosql

I'm happy to report that the Riak Handbook has hit a major update, bringing a whopping 43 pages of new content with it. If you already bought the book, this is a free update, and instructions how and where to download it were sent in a separate email.

Here's a run down of what's new:

  • The book was updated for Riak 1.1.

  • The biggest new feature of the Riak Handbook is a section entirely dedicated to use cases, full of examples and code from real-life usage scenarios, giving you food for thought when it comes to using Riak in your applications. Rumor has it there's also an introduction on Riak CS in there, the new cloud storage system that's API-compatible with Amazon's S3.

  • Full coverage on pre- and post-commit hooks. Get ready for a thorough run-through of things you can do with your data before and after it's written to Riak. Covers both JavaScript and Erlang examples.

  • To go full circle on deploying Erlang code in a Riak cluster, the book now has a section dedicated to it, so you don't have to manually compile Erlang code every time you update it.

  • More details on secondary indexes, how you can use them to store multiple values for a single document and how you can use them to model object associations.

  • A section on load balancing Riak nodes.

  • An introduction on where on a network you can and should put your Riak nodes.

  • A big, big section on monitoring and an overview of Riak Control, the new cluster management tool introduced in Riak 1.1.

  • Last but not least, the book now comes as a man page. Need to look something up real quick while you're working on the command line? No worries, drop the man page in your $MANPATH and type "man riak". Easy to search when you're working remotely on a Riak node, or if you're generally a fan of spending a lot of time on the command line (just like I am).

To celebrate the new release, the Riak Handbook is 25% off until next Monday, June 4th. It's a release party, and everyone's invited!

Tags: riak, handbook, nosql

The awesome dudes at Basho released Riak 0.13 and with it their first version of Riak Search yesterday. This is all kinds of exciting, and I'll tell you why. Riak Search is (way down below) based on Lucene, both the library and the query interface. It mimics the Solr web API for querying and indexing. Just like you'd expect from something coming out of Basho, you can add and remove nodes at any time, scaling up and down as you go. I've seen an introduction on the basics back at Berlin Buzzwords, and it was already shaping up to be nothing but impressive. But enough with all the praise, why's this stuff exciting?

  • The key/value model is quite restrictive when it comes to fetching data by, well, anything other than a key. Keeping reverse lookup indexes was one way to do it, but the consistency model of Riak made it hard, if not impossible, to maintain a consistent list of interesting entries in an atomic way.

    Riak Search fills this gap (and not only for Riak, the key/value store, but for any key/value store if you will) by offering something that scales up and down in the same way as Riak, so you don't have to resort to e.g. Redis to maintain reverse lookup indexes.

    Run queries in any way you can think of, fetch ranges, groups, you name it, no need to do anything really. It even integrates directly with Riak through pre-commit hooks.

  • It's based on proven technology (Lucene, that is). It doesn't compete with something entirely new; it takes what's been worked on and constantly improved for quite a while now, and raises it onto a new foundation to make it scale much nicer, the foundation being Riak Core, Riak KV and Bitcask, and some new components developed at Basho.

  • It uses existing interfaces. Imagine just pointing your search indexing library to a new endpoint, and there you go. Just the thought of that makes me teary. Reindex data, reconfigure your clients to point to a new endpoint, boom, there's your nicely scalable search index.

  • Scaling Solr used to be awkward. Version 1.5 will include some heavy improvements, but I believe the word shard was mentioned at some point. Imagine a Solr search index where you can add and remove nodes at any time, with the index rebalancing itself without requiring manual intervention.

    Sound good? Yeah, Riak Search can do that too.

Remember though, it's just a first release, which will be improved over time. I for one am just happy they finally released it, I almost crapped my pants, it's that exciting to have something like Riak Search around. And I say that with all honesty and no fanboyism whatsoever. Having used Solr quite a lot in the past I'm well aware of its strengths and weaknesses and the sweet spot Riak Search hits.

I urge you to play with it. Installing it and feeding it with data could not be easier. Well done, Basho!

Update: From reading all this you may get the impression that Riak Search builds heavily on a Lucene foundation. That's not the case. When I say that it builds on top of Lucene, I actually mean that it can and does reuse its analyzers and query parsing. Both can be replaced with custom (Erlang) implementations. That's the only part of Lucene that is actually used by Riak Search, because why reinvent the wheel?

Tags: riak, search, fulltext

The simplicity of consistent hashing is pretty mind-blowing. Here you have a number of nodes in a cluster of databases, or in a cluster of web caches. How do you figure out where the data for a particular key goes in that cluster?

You apply a hash function to the key. That's it? Yeah, that's the whole deal of consistent hashing. It's in the name, isn't it?

The same key will always return the same hash code (hopefully), so once you've figured out how you spread out a range of keys across the nodes available, you can always find the right node by looking at the hash code for a key.

It's pretty ingenious, if you ask me. It was cooked up in the lab chambers at Akamai, back in the late nineties. You should go and read the original paper right after we're done here.

Consistent hashing solves the problem people desperately tried to apply sharding to pretty nicely and elegantly. I'm not going to bore you with the details on how exactly consistent hashing works. Mike Perham does a pretty good job at that already, and there are many more blog posts explaining implementations and theory behind it. Also, that little upcoming book of mine has a full-length explanation too. Here's a graphic showing the basic idea of consistent hashing, courtesy of Basho.

Consistent Hashing

Instead I want to look at the practical implications of consistent hashing in distributed databases and cache farms.

Easier to Avoid Hotspots

When you place data on nodes based on the result of a hash function, a value that's a lot more random than the key it's derived from, it's easier to avoid hotspots. Why?

Assume a key based on an increasing value, or a simple range of keys, based on the hour of the day, like 2011-12-11-13. You add new hours and therefore new data as time passes, and keys are stored based on the range they fall in. For example, the keys 2011-12-11-18 until 2011-12-11-23 are stored on the same node, with the rest of the keys stored on other nodes, just because the ranges or the partitioning scheme happen to be set up this way.

For a consumer-facing site, the evening hours are usually the busiest time of the day. They create more data, more writes, and possibly more reads too. For the hours between 18:00 and 23:00, all the load goes to the single node that carries all the relevant data.

But when you determine the location in the cluster based solely on the hash of the key, chances are much higher that two keys lexicographically close to each other end up on different nodes. Thus, the load is shared more evenly. The disadvantage is that you lose the order of keys.
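
A couple of lines of Ruby make the effect visible; SHA-1 stands in for the hash function here:

require 'digest/sha1'

# Keys for two adjacent hours land in completely different spots
# of the keyspace, so they'll likely live on different nodes.
Digest::SHA1.hexdigest('2011-12-11-18') # => a hex string somewhere in the keyspace
Digest::SHA1.hexdigest('2011-12-11-19') # => nowhere near the first one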

There are partitioning schemes that can work around this, even with a range-based key location. HBase (and Google's BigTable, for that matter) stores ranges of data in separate tablets. As tablets grow beyond their maximum size, they're split up and the remaining parts re-distributed. The advantage of this is that the original range is kept, even as you scale up.

Consistent Hashing Enables Partitioning

When you have a consistent hash, everything looks like a partition. The idea is simple. Consistent hashing forms a keyspace, also called a continuum, as presented in the illustration. As a node joins the cluster, it picks a random number, and that number determines the data it's going to be responsible for. Everything between this number and the next one on the ring, previously picked by a different node, now belongs to this node. The resulting partition could theoretically be of any size. It could be a tiny slice, or a large one.

First implementations of consistent hashing still had the problem that a node picking a random range of keys resulted in one node potentially carrying a larger keyspace than others, therefore still creating hotspots.

But the improvement was as simple as it was ingenious. A hash function has a finite result set; SHA-1, for example, has a bit space of 2^160. You do the math. Instead of picking a random key, a node chooses from a fixed set of partitions, like equally sized pizza slices. But instead of picking the one with the most cheese on it, everyone gets an equally large slice. The number of partitions is picked up front, and practically never changes over the lifetime of the cluster.
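
As a sketch of the idea, mapping a key's hash onto one of a fixed number of equally sized partitions could look like this; the partition count of 64 is just an example:

require 'digest/sha1'

PARTITIONS     = 64
RING_SIZE      = 2**160
PARTITION_SIZE = RING_SIZE / PARTITIONS

# Every partition covers an equally large slice of the 2^160 keyspace.
def partition_for(bucket, key)
  Digest::SHA1.hexdigest("#{bucket}/#{key}").to_i(16) / PARTITION_SIZE
end

partition_for('users', 'roidrage') # => a number between 0 and 63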

For good measure, here's a picture of a sliced pizza.

Consistent Pizza

Partitioning Makes Scaling Up and Down More Predictable

With a fixed number of partitions of the same size, adding new nodes becomes even less of a burden than with just consistent hashing. With the former, it was still unpredictable how much data had to be moved around to transfer ownership of all the data in the range of the new node. One thing's for sure, it already involved a lot less work than previous methods of sharding data.

With partitioning, a node simply claims partitions, and either explicitly or implicitly asks the current owners to hand off the data to them. As a partition can only contain so many keys, and randomness ensures a somewhat even spread of data, there's a lot less unpredictability about the data that needs to be transferred.

If that partition just so happens to carry by far the largest object in your whole cluster, that's something even consistent hashing can't solve. It only cares about keys.

Going back to HBase, it cares about keys and the size of the tablet the data is stored in, as it breaks up tablets once they reach a threshold. Breaking up and reassigning a tablet requires coordination, which is not an easy thing to do in a distributed system.

Consistent Hashing and Partitioning Enable Replication

Consistent hashing made one thing a lot easier: replicating data across several nodes. The primary purpose of replication is to ensure that data survives single or multiple machine failures. The more replicas you have, the more likely your data is to survive one or more hardware crashes. With three replicas, you can afford to lose two nodes and still serve the data.

With a fixed set of partitions, a new node can just pick the ones it's responsible for, and another stack of partitions it's going to be a replica for. When you really think about it, both processes are actually the same. The beauty of consistent hashing is that there doesn't need to be a master for any piece of data. Every node is simply a replica of a number of partitions.
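
A sketch of that idea: starting from the partition a key hashes to, the replicas simply live on the next partitions of the ring, a simplified version of how Riak builds its preference list:

# The N replicas live on the partition the key lands on and the
# following N - 1 partitions, wrapping around the ring.
def replica_partitions(partition, n_val = 3, partitions = 64)
  (0...n_val).map { |i| (partition + i) % partitions }
end

replica_partitions(62) # => [62, 63, 0]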

But replication has another purpose besides ensuring data availability.

Replication Reduces Hotspots (Even More!!!)

Having more than one replica of a single piece of data means you can spread out the request load even more. With three replicas of that data, residing on three different nodes, you can now load-balance between them. Neat!

With that, consistent hashing enables a pretty linear increase in capacity as you add more nodes to a cluster.

Consistent Hashing Enables Scalability and Availability

Consistent hashing makes it easier to scale up and down, and easier to ensure availability. Easier ways to replicate data allow for better availability and fault-tolerance. Easier ways to reshuffle data when nodes come and go mean simpler ways to scale up and down.

It's an ingenious invention, one that has had a great impact. Look at the likes of Memcached, Amazon's Dynamo, Cassandra, or Riak. They all adopted consistent hashing in one way or the other to ensure scalability and availability.

Want to know more about distributed databases in general and Riak in particular? You'll like the Riak Handbook, a hands-on guide full of practical examples and advice on how to use Riak to ensure scalability and availability for your data.

In the next installment we're looking at the consequences and implications of losing key ordering in a Riak cluster.

Tags: nosql, riak

The idea of building and storing user timelines (think Twitter) in Riak confused me at first. It sounds like such a spot-on case for time series databases. Yet Yammer managed to make the idea pretty popular. The whole thing lacked implementation though, because they kept theirs to themselves, which I don't blame them for at all.

Apple Mac Timeline

So let's have a look at how simple it is to build one. You can see a timeline of all Apple Macintosh products above, but that's not what we want to build.

Instead we want to build something like the Twitter timeline. A user follows many other users, and wants to look at a feed built from their activities, so something like the timeline shown below.

Twitter timeline

How do you model a timeline in Riak?

For every user we store one object in Riak. Every timeline contains a list of tweet ids, or whatever activity you're referencing, or it can contain the whole tweets. Something like this should work, keeping just tweet ids with the newest first (a minimal sketch, the ids are made up):
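
{
  "tweets": [
    "41399579391950851",
    "41399579391950850",
    "41399579391950849"
  ]
}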

If you want to store more data, turn the list into an array of hashes containing whatever information is necessary to rebuild the timeline later.

Adding entries to the timeline

To add new entries to a timeline, prepend them to the existing list of entries. Here's some Ruby to show how it could be done; a sketch, with bucket and method names that are just examples:
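
require 'riak'

# Prepend a new activity to a user's timeline. Assumes the timeline
# object holds the {"tweets" => [...]} structure sketched above.
def add_to_timeline(riak, user_id, tweet_id)
  timeline = riak.bucket('timelines').get_or_new(user_id)
  timeline.data ||= {'tweets' => []}
  timeline.data['tweets'].unshift(tweet_id.to_s)
  timeline.store
end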

The code assumes you take care of followership somewhere else. You can store that data in Riak too, but the code is oblivious to its whereabouts.

Conflicts, siblings, oh my!

The fun starts when two clients update the same timeline: you get a conflict and siblings. The strength of a simple data structure like the example above is that it's easy to merge siblings together while still keeping ordering based on the ids. The ids being ordered is specific to this example; Twitter somewhat makes sure they are.

When you get a conflict, a smart Riak library like Ripple helps you find out about it. To build on the earlier example, here's a version of the add code that detects conflicts; again a sketch, assuming the Ruby client's conflict? and siblings methods:
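
# Detect siblings before updating: merge them back into a single list
# (see merge_timelines below), then prepend the new entry and store.
def add_to_timeline(riak, user_id, tweet_id)
  timeline = riak.bucket('timelines').get_or_new(user_id)
  tweets = if timeline.conflict?
    merge_timelines(timeline)
  else
    (timeline.data || {})['tweets'] || []
  end
  timeline.data = {'tweets' => tweets.unshift(tweet_id.to_s)}
  timeline.store
end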

Suddenly you have two or more objects instead of one, each containing a different timeline. To turn them into a single list, you merge all of them together, discard the duplicates, and restore order based on the id. Here's some Ruby to do that, sketched around the siblings list:
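
# Collect the entries from all siblings into one list, dropping
# duplicates along the way. Assumes every sibling holds the
# {"tweets" => [...]} structure from above.
def merge_timelines(robject)
  robject.siblings.each_with_object([]) do |sibling, entries|
    entries.concat((sibling.data || {})['tweets'] || [])
  end.uniq
end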

You iterate over all timeline objects and keep adding unique activities to a new list, returning that when done.

Sort, and done!

All that's left to do is sort the result, newest entries first. A sketch, reusing the merge from above and assuming ids that sort chronologically:
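
def sorted_timeline(robject)
  merge_timelines(robject).sort.reverse
end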

That's the whole code. To spare you the pain of having to write your own library, all this is bundled into a gem called riaktivity. If you're doing Python, Brett Hoerner has got you covered with timak. Be sure to watch the original talk by Yammer, it's well worth it.

There are ups and downs to this approach, and things that need to be taken care of. More on that, and on modeling data for Riak in general, is covered in the Riak Handbook, the definitive guide on Riak.

Tags: riak, nosql