BloomJoin: BloomFilter + CoGroup

We recently open-sourced a number of internal tools we’ve built to help our engineers write high-performance Cascading code as the cascading_ext project. Today I’m going to to talk about a tool we use to improve the performance of asymmetric joins—joins where one data set in the join contains significantly more records than the other, or where many of the records in the larger set don’t share a common key with the smaller set.

Asymmetric Joins

A common MapReduce use case for us is joining a large dataset with a global set of records against a smaller one—for example, we have a store with billions of transactions keyed by user ID, and want to find all transactions by users who were seen within the last 24 hours.   The standard way to execute this join with map-reduce is to sort each store by user ID, and then use a reducer function which skips transactions unless the user ID is present on both left and right-hand sides of the join (here the global transaction store is the left hand side and the specific set of user IDs is the right hand side):

 (when writing a Cascading job, this is just a CoGroup with an InnerJoin)

This solution works perfectly well, but from a performance standpoint, there’s a big problemwe pointlessly sorted the entire store of billions of transactions, even though only a few million of them made it into the output store!   What if we could get rid of irrelevant transactions, so we only bother sorting the ones which will end up in the output?

It turns out we can do this, with the use of a clever and widely-used data-structure: the Bloom Filter.

Bloom Filters

A bloom filter is a compact, probabilistic, representation of a set of objects.  Given an arbitrary object, it is cheap to test whether an object is a member of the original set; a property of bloom filters is that while an object that should be in the set  is never falsely rejected, it is possible for an object which should not be in the set to be accepted anyway.  That is to say, the filter does not allow false negatives, but can allow false positives.

For illustration, say we built a bloom filter out of records {A, E}, and want to test values {A, B, C, D} for membership:

Bloom filter simple

Say values C and D were correctly rejectedthese would be true negatives.  Key A will always be accepted since it was in the original filter, so we call it a true positive.  Key B however we call a false positive, since it was accepted even though it was not in the original set of keys.

A bloom filter works by hashing the keys that are inserted into the filter several times and marking the corresponding slots in a bit array.  When testing objects for membership in the filter, the same hash functions are applied to the items being tested.  The corresponding bits are then checked in the array:

As seen above, there are three possibilities when those bits are checked:

  • The item was actually in the original set and is accepted (object A)
  • The item was not in the original set, the corresponding bits are not set in the array, and it is rejected (object C)
  • The item was not in the original set, but the corresponding bits were set anyway, and it is accepted (object B)

Without going deep into the math behind bloom filters, there are four variables to consider when building a bloom filter: the number of keys, false positive rate, size of the filter, and number of hash functions used.  For our MapReduce use case, “size of filter” is a fixed quantity; we’re going to build a bloom filter as large as we can fit the memory of a map task.  We also have a fixed number of keys to insert in the filter (for a particular join, we’re building a bloom filter out of all keys on one side of the join).

Given a fixed number of keys and bits, we can choose a number of hash functions which minimizes our false positive rate.  For more details, check out the wiki article.

Putting it together

So how can a bloom filter improve the performance of our map-reduce job?   When joining two stores, before performing the reduce itself, we can use the bloom filter to filter out most of the records which would not have matched anything in the other side of the join:

Unlike the previous time, when we had to sort all the left hand side records before joining with the right side, we only have to sort the true positives {A}, as well as the false positives {B}.

BloomJoin

We’ve put all of this together into our open-source cascading_ext library on Github, as the sub-assemblies BloomJoin and BloomFilter.  The BloomJoin assembly closely mimics the CoGroup interface, but behind the scenes, we first use a bloom filter to filter the left-hand side pipe before performing a CoGroup.  If you use the CascadingHelper utility in the cascading_ext library, the assembly will work on its own:

Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");

Pipe joined = new BloomJoin(source1, new Fields("field1"),
  source2, new Fields("field3"));

CascadingUtil.get().getFlowConnector()
  .connect("Example flow", sources, sink, joined)
  .complete();

If using a raw HadoopFlowConnector, a FlowStepStrategy along with some properties will need to be attached to the flow:

Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");

Pipe joined = new BloomJoin(source1, new Fields("field1"),
  source2, new Fields("field3"));

Flow f = new HadoopFlowConnector(BloomProps.getDefaultProperties())
  .connect("Example flow", sources, sink, joined);
f.setFlowStepStrategy(new BloomAssemblyStrategy());
f.complete();

The classes BloomJoinExample and BloomJoinExampleWithoutCascadingUtil have some more examples which should work on your cluster out-of-the-box.

Performance

We measured the performance benefits of BloomJoin by running a benchmark job on our own data. For a sample dataset we took all request logs for one of our services, where each log entry contains a unique user ID. Each user ID may have many log entries in a single day, and is not necessarily seen every day. The job we ran took all logs from a particlar day (2012/12/26), and found all log entries from the previous two months associated with each of the user IDs from that day:

Logs in Full dataset (2 months of logs) 5,531,090,779
Logs from 12/26 134,035,584
Logs from full dataset matching users from 12/26 1,206,247,339 (21.8%)

We implemented this join two waysfirst with a standard CoGroup using an InnerJoin, and second with our BloomFilter assembly:

Metric CoGroup Bloom Join % Of original cost
Map output records 5,531,090,779 1,554,714,936 28.1
Map output bytes 3,145,258,140,017 788,744,906,949 25.1
Reduce shuffle bytes 806,870,037,807 177,418,598,374 22.0
Total time spent by all maps in occupied slots (ms) 620,127,159 288,005,428 46.4
Total time spent by all reduces in occupied slots (ms) 351,864,211 218,623,410 62.1
CPU time spent (ms) 1,069,814,490 527,423,620 49.3

We can see that filtering the map output for relevant records cuts our map output by 75% (for this input, even an optimal filter would only cut our  output by 78%), and correspondingly cuts shuffle bytes and CPU time.  Note that reduce time was only cut by 38% since copying data from mappers to reducers has some fixed costs per reduce task, and both jobs ran with the same number of reduces on this test.

Overall, using a bloom filter cut the cost of the job on our Hadoop cluster by around 50%a huge win.

Parameters to Tweak

There are a number of parameters available to tweak, depending on the the kind of data being processed, and the machines doing the processing.   The defaults should generally work fine, except for NUM_BLOOM_BITS, which will vary depending on the resources available to TaskTrackers on your cluster:

  • BloomProps.NUM_BLOOM_BITS:  The size of the bloom filter in-memory.  The parameter defaults to 300MB, but can be adjusted up or down depending on the free space on your map tasks.
  • BloomProps.NUM_SPLITS:  The number of parts to split the bloom filter into.   This determines the number of reduce tasks used when constructing the filter.

(via blog.liveramp.com)

Advertisements

Leave a Reply

Please log in using one of these methods to post your comment:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s