BloomJoin: BloomFilter + CoGroup

We recently open-sourced a number of internal tools we’ve built to help our engineers write high-performance Cascading code as the cascading_ext project. Today I’m going to talk about a tool we use to improve the performance of asymmetric joins: joins where one data set contains significantly more records than the other, or where many of the records in the larger set don’t share a common key with the smaller set.

Asymmetric Joins

A common MapReduce use case for us is joining a large, global dataset against a smaller one. For example, we have a store with billions of transactions keyed by user ID, and want to find all transactions by users who were seen within the last 24 hours. The standard way to execute this join with MapReduce is to sort each store by user ID, and then use a reducer function which skips transactions unless the user ID is present on both the left and right-hand sides of the join (here the global transaction store is the left-hand side and the specific set of user IDs is the right-hand side).

 (when writing a Cascading job, this is just a CoGroup with an InnerJoin)
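To make the cost of this approach concrete, here is a minimal single-process sketch of the same idea in plain Java. It is not how Cascading or Hadoop implement a CoGroup; the class, the Transaction type and the method names are all hypothetical, and the sort is simulated with a TreeMap.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Sketch of a reduce-side inner join: group and sort the left-hand side by key,
// then emit records only for keys that also appear on the right-hand side.
final class ReduceSideJoinSketch {

  // Hypothetical record type: one transaction keyed by user ID.
  static final class Transaction {
    final String userId;
    final String payload;

    Transaction(String userId, String payload) {
      this.userId = userId;
      this.payload = payload;
    }
  }

  static List<Transaction> innerJoin(List<Transaction> transactions, List<String> recentUserIds) {
    // "Shuffle and sort": every left-hand record is grouped by key, in key order.
    // This is the expensive step, because it touches all transactions.
    TreeMap<String, List<Transaction>> leftByKey = new TreeMap<>();
    for (Transaction t : transactions) {
      leftByKey.computeIfAbsent(t.userId, k -> new ArrayList<>()).add(t);
    }
    Set<String> rightKeys = new TreeSet<>(recentUserIds);

    // "Reduce": skip any key that is not present on both sides.
    List<Transaction> output = new ArrayList<>();
    for (Map.Entry<String, List<Transaction>> entry : leftByKey.entrySet()) {
      if (rightKeys.contains(entry.getKey())) {
        output.addAll(entry.getValue());
      }
    }
    return output;
  }
}
```

Note that every transaction is inserted into the sorted structure, even those whose user ID never appears on the right-hand side.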

This solution works perfectly well, but from a performance standpoint, there’s a big problem: we pointlessly sorted the entire store of billions of transactions, even though only a few million of them made it into the output store! What if we could get rid of irrelevant transactions, so we only bother sorting the ones which will end up in the output?

It turns out we can do this, with the use of a clever and widely-used data-structure: the Bloom Filter.

Bloom Filters

A bloom filter is a compact, probabilistic representation of a set of objects. Given an arbitrary object, it is cheap to test whether that object is a member of the original set. A property of bloom filters is that while an object that is in the set is never falsely rejected, it is possible for an object that is not in the set to be accepted anyway. That is to say, the filter does not allow false negatives, but can allow false positives.

For illustration, say we built a bloom filter out of records {A, E}, and want to test values {A, B, C, D} for membership:

[Figure: simple bloom filter example]

Say values C and D were correctly rejected; these would be true negatives. Key A will always be accepted since it was in the original filter, so we call it a true positive. Key B, however, we call a false positive, since it was accepted even though it was not in the original set of keys.

A bloom filter works by hashing the keys that are inserted into the filter several times and marking the corresponding slots in a bit array.  When testing objects for membership in the filter, the same hash functions are applied to the items being tested.  The corresponding bits are then checked in the array:

As seen above, there are three possibilities when those bits are checked:

  • The item was actually in the original set and is accepted (object A)
  • The item was not in the original set, the corresponding bits are not set in the array, and it is rejected (object C)
  • The item was not in the original set, but the corresponding bits were set anyway, and it is accepted (object B)
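The insert and membership-test steps described above can be sketched in a few lines of Java. This is an illustrative toy, not the cascading_ext implementation: the class name, the FNV-style hash and the double-hashing scheme used to derive several probes from two base hashes are all assumptions made for the example.

```java
import java.nio.charset.StandardCharsets;
import java.util.BitSet;

// Minimal Bloom filter sketch: numHashes probes into a numBits-wide bit array.
final class BloomFilterSketch {
  private final BitSet bits;
  private final int numBits;
  private final int numHashes;

  BloomFilterSketch(int numBits, int numHashes) {
    this.bits = new BitSet(numBits);
    this.numBits = numBits;
    this.numHashes = numHashes;
  }

  // Derive the i-th bit position from two base hashes (double hashing).
  private int position(String key, int i) {
    byte[] data = key.getBytes(StandardCharsets.UTF_8);
    int h1 = fnv1a(data, 0x811C9DC5);
    int h2 = fnv1a(data, 0x01000193) | 1; // non-zero step between probes
    return Math.floorMod(h1 + i * h2, numBits);
  }

  // FNV-1a style hash with a caller-supplied starting value.
  private static int fnv1a(byte[] data, int seed) {
    int hash = seed;
    for (byte b : data) {
      hash ^= (b & 0xFF);
      hash *= 16777619;
    }
    return hash;
  }

  // Insert: mark every slot the key hashes to.
  void add(String key) {
    for (int i = 0; i < numHashes; i++) {
      bits.set(position(key, i));
    }
  }

  // Test: a single unset slot proves the key was never inserted.
  boolean mightContain(String key) {
    for (int i = 0; i < numHashes; i++) {
      if (!bits.get(position(key, i))) {
        return false; // definitely not in the set
      }
    }
    return true; // in the set, or a false positive
  }
}
```

After adding A and E, mightContain("A") is always true. Whether a key such as B is accepted depends on whether its slots happen to collide with slots already set, which is exactly the false-positive case.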

Without going deep into the math behind bloom filters, there are four variables to consider when building a bloom filter: the number of keys, the false positive rate, the size of the filter, and the number of hash functions used. For our MapReduce use case, “size of filter” is a fixed quantity; we’re going to build a bloom filter as large as we can fit in the memory of a map task. We also have a fixed number of keys to insert in the filter (for a particular join, we’re building a bloom filter out of all keys on one side of the join).

Given a fixed number of keys and bits, we can choose a number of hash functions which minimizes our false positive rate. For more details, check out the Wikipedia article on bloom filters.
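For reference, the standard textbook approximations are k = (m / n) ln 2 for the number of hash functions and p = (1 - e^(-kn/m))^k for the false positive rate, where m is the number of bits and n the number of keys. A small sketch of that arithmetic follows; the class and method names are hypothetical, and this is not necessarily how cascading_ext picks its hash count.

```java
// Textbook Bloom filter sizing formulas, for m bits and n keys.
final class BloomSizing {

  // k = (m / n) * ln 2, rounded to the nearest whole hash function (at least 1).
  static int optimalHashCount(long numBits, long numKeys) {
    return (int) Math.max(1, Math.round((double) numBits / numKeys * Math.log(2)));
  }

  // p ~= (1 - e^(-k * n / m))^k
  static double falsePositiveRate(long numBits, long numKeys, int numHashes) {
    return Math.pow(1 - Math.exp(-(double) numHashes * numKeys / numBits), numHashes);
  }

  public static void main(String[] args) {
    // Example: 10 bits per key.
    long numBits = 1000;
    long numKeys = 100;
    int k = optimalHashCount(numBits, numKeys);
    System.out.println(k); // 7
    System.out.println(falsePositiveRate(numBits, numKeys, k)); // a bit under 1%
  }
}
```

With the filter size fixed by the memory of a map task, the only way to lower the false positive rate further is to insert fewer keys.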

Putting it together

So how can a bloom filter improve the performance of our MapReduce job? When joining two stores, before performing the reduce itself, we can use the bloom filter to filter out most of the records which would not have matched anything on the other side of the join:

Unlike the previous time, when we had to sort all the left hand side records before joining with the right side, we only have to sort the true positives {A}, as well as the false positives {B}.
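The two stages can be sketched as follows. To keep the example short and its output predictable, the bloom filter is replaced by a stand-in predicate that accepts every right-hand key plus a hard-coded false positive B; all names here are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

final class BloomPrefilterSketch {

  // Map side: keep only keys the filter accepts (true and false positives).
  static List<String> mapSideFilter(List<String> leftKeys, Predicate<String> mightContain) {
    List<String> survivors = new ArrayList<>();
    for (String key : leftKeys) {
      if (mightContain.test(key)) {
        survivors.add(key);
      }
    }
    return survivors;
  }

  // Reduce side: the exact join still runs, so false positives are dropped here.
  static List<String> exactJoin(List<String> survivors, Set<String> rightKeys) {
    List<String> joined = new ArrayList<>();
    for (String key : survivors) {
      if (rightKeys.contains(key)) {
        joined.add(key);
      }
    }
    return joined;
  }

  public static void main(String[] args) {
    Set<String> rightKeys = new HashSet<>(Arrays.asList("A", "E"));
    // Stand-in for a real Bloom filter: accepts every right-hand key, plus "B"
    // as a hard-coded false positive to mirror the example in the text.
    Predicate<String> filter = key -> rightKeys.contains(key) || key.equals("B");

    List<String> survivors = mapSideFilter(Arrays.asList("A", "B", "C", "D"), filter);
    System.out.println(survivors);                       // [A, B]
    System.out.println(exactJoin(survivors, rightKeys)); // [A]
  }
}
```

Because the exact join still runs on the survivors, false positives cost some extra sorting but never change the result.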

BloomJoin

We’ve put all of this together into our open-source cascading_ext library on GitHub, as the sub-assemblies BloomJoin and BloomFilter. The BloomJoin assembly closely mimics the CoGroup interface, but behind the scenes, we first use a bloom filter to filter the left-hand side pipe before performing a CoGroup. If you use the CascadingUtil utility in the cascading_ext library, the assembly will work on its own:

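Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");

// Join source1.field1 against source2.field3. The left-hand pipe (source1)
// is the side that gets filtered by the bloom filter before the CoGroup.
Pipe joined = new BloomJoin(source1, new Fields("field1"),
  source2, new Fields("field3"));

// `sources` (the source taps for the pipes above) and `sink` are assumed
// to be defined elsewhere.
CascadingUtil.get().getFlowConnector()
  .connect("Example flow", sources, sink, joined)
  .complete();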

If using a raw HadoopFlowConnector, a FlowStepStrategy along with some properties will need to be attached to the flow:

Pipe source1 = new Pipe("source1");
Pipe source2 = new Pipe("source2");

Pipe joined = new BloomJoin(source1, new Fields("field1"),
  source2, new Fields("field3"));

// Pass the bloom properties to the connector; `sources` and `sink` are
// assumed to be defined elsewhere.
Flow f = new HadoopFlowConnector(BloomProps.getDefaultProperties())
  .connect("Example flow", sources, sink, joined);
// Attach the step strategy before running the flow.
f.setFlowStepStrategy(new BloomAssemblyStrategy());
f.complete();

The classes BloomJoinExample and BloomJoinExampleWithoutCascadingUtil have some more examples which should work on your cluster out-of-the-box.

Performance

We measured the performance benefits of BloomJoin by running a benchmark job on our own data. For a sample dataset we took all request logs for one of our services, where each log entry contains a unique user ID. Each user ID may have many log entries in a single day, and is not necessarily seen every day. The job we ran took all logs from a particular day (2012/12/26), and found all log entries from the previous two months associated with each of the user IDs from that day:

Logs in full dataset (2 months of logs) | 5,531,090,779
Logs from 12/26 | 134,035,584
Logs from full dataset matching users from 12/26 | 1,206,247,339 (21.8%)

We implemented this join two ways: first with a standard CoGroup using an InnerJoin, and second with our BloomJoin assembly:

Metric | CoGroup | Bloom Join | % of original cost
Map output records | 5,531,090,779 | 1,554,714,936 | 28.1
Map output bytes | 3,145,258,140,017 | 788,744,906,949 | 25.1
Reduce shuffle bytes | 806,870,037,807 | 177,418,598,374 | 22.0
Total time spent by all maps in occupied slots (ms) | 620,127,159 | 288,005,428 | 46.4
Total time spent by all reduces in occupied slots (ms) | 351,864,211 | 218,623,410 | 62.1
CPU time spent (ms) | 1,069,814,490 | 527,423,620 | 49.3

We can see that filtering the map output for relevant records cuts our map output by roughly 72% in records and 75% in bytes (for this input, even an optimal filter would only cut output records by 78%), and correspondingly cuts shuffle bytes and CPU time. Note that reduce time was only cut by 38%, since copying data from mappers to reducers has some fixed costs per reduce task, and both jobs ran with the same number of reduce tasks on this test.
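The percentages follow directly from the counters in the table. As a quick check of the arithmetic, using only numbers reported above (the class and method names are hypothetical):

```java
import java.util.Locale;

// Recomputes the "% of original cost" figures from the raw counters.
final class PercentOfOriginal {

  static double percent(long bloomJoin, long coGroup) {
    return 100.0 * bloomJoin / coGroup;
  }

  public static void main(String[] args) {
    // Map output records: Bloom Join vs. CoGroup.
    double records = percent(1_554_714_936L, 5_531_090_779L);
    System.out.println(String.format(Locale.ROOT, "%.1f", records)); // 28.1

    // Best case: only the matching records survive the filter.
    double optimal = percent(1_206_247_339L, 5_531_090_779L);
    System.out.println(String.format(Locale.ROOT, "%.1f", optimal)); // 21.8
  }
}
```

The gap between 28.1% and the 21.8% floor is the cost of the filter's false positives on this input.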

Overall, using a bloom filter cut the cost of the job on our Hadoop cluster by around 50%, a huge win.

Parameters to Tweak

There are a number of parameters available to tweak, depending on the kind of data being processed and the machines doing the processing. The defaults should generally work fine, except for NUM_BLOOM_BITS, which will vary depending on the resources available to TaskTrackers on your cluster:

  • BloomProps.NUM_BLOOM_BITS:  The size of the in-memory bloom filter.  The parameter defaults to 300MB, but can be adjusted up or down depending on the free memory available to your map tasks.
  • BloomProps.NUM_SPLITS:  The number of parts to split the bloom filter into.   This determines the number of reduce tasks used when constructing the filter.

(via blog.liveramp.com)

Batoo JPA – The New JPA Implementation That Runs Over 15 Times Faster…

I loved JPA 1.0 back in the early 2000s. I started using it together with EJB 3.0 even before the stable releases. I loved it so much that I contributed bits and pieces to the JBoss 3.x implementations.

Those were the days when our company was still fairly small. Creating new features and applications had higher priority than performance, because we had a lot of ideas and needed to develop and market them as fast as we could. Now we no longer needed to write tedious and error-prone XML descriptions for the data model and deployment descriptors. Nor did we need to use the curse called “XDoclet”.

Meanwhile, our company grew steadily, and our web site became the top portal in the country for live events and ticketing. We now had performance problems! Although the company grew considerably, due to the economics of the industry, we did not make a lot of money. The challenge was that we were a ticketing company. Every e-commerce business has high and low seasons, but ticketing has low seasons and high hours. While you sell an average of x tickets an hour, when a blockbuster event goes on sale, demand suddenly jumps to thousands of times x for an hour. Welcome to hell!

We worked day and night to tweak and enhance the application, using whatever was available to keep it up on a big day. To be frank, there was always a bigger event capable of bringing the site down, no matter how hard we tried.

The dream was over. I came to realize that developing applications on top of frameworks calls for some caution along with the fun.

I Kept Learning

I loved programming, I loved Java, I loved open source. I developed almost every possible type of application on every platform I could. For the rest, I went in and discovered stuff. I learned a lot from the masters thanks to open source. Unlike most, I read articles and code written by great programmers like Linus Torvalds, Gavin King, Ed Merks and many others.

With the experience I gathered, I quit the ticketing company I loved and became a software consultant. This opened a new era for me, with a lot of different industries and platforms.

In each project I became the performance police of the application.

I am now the performance freak!

I Took The Red Pill!

One day I asked myself: could JPA be faster? If so, how fast could it be? I spent about two weeks creating an entity manager that persisted and loaded entities. Then I ran it and compared the results to those of Hibernate. The results were not really promising: I was only about 50% faster than Hibernate in persisting and finding entities. I spent another week tweaking the loops, caching metamodel chunks, changing access to classes from interfaces to abstract classes, replacing lists with arrays, and many other things. Suddenly I had a prototype that was 50+ times faster than Hibernate!

Development Of Batoo JPA

I was astonished by how drastically performance went up just by paying attention to performance-centric coding. Until then I had been using VisualVM to measure the time spent in the JPA layer. I got down and wrote a self-profiling tool that measured the CPU resources spent in the JPA layer, and started implementing every aspect of the JPA 2.0 specification. After each iteration I re-ran the benchmark, and when performance dropped considerably I went back to the changes and inspected the new code line by line; the profiling tool I created reported the performance hit of every line of the JPA stack.

It took about 6 months to implement the specification as a whole. On top of it, I introduced a Maven plugin to perform bytecode instrumentation at build time and a complementary Eclipse plugin to allow the use of instrumentation in the Eclipse IDE.

After a gestation of 6 months, Batoo JPA was born in August 2012. It measured over 15 times faster than Hibernate.

Benchmark

As stated earlier, a benchmark was introduced to measure every micro development iteration of Batoo JPA. This benchmark was not designed to showcase the areas where Batoo JPA was fast so that others would believe in it. It was designed around the most common domain model and persistence operations found in almost every JPA application, so that I knew how fast Batoo JPA really was.

Performance Metrics

The scenario is:

  • A Person object
    • With phonenumbers – PhoneNumber object
    • With addresses – Address object
      • That point to country – Country Object

Common life-cycle tasks have been introduced:

  • Persist 100K person objects with two phone numbers and two addresses in lots of 10 per session
  • Locate and load 250K person objects with lots of 10 per session
  • Remove 5K person objects with lots of 5 per session
  • Update 100K person objects with lots of 100
  • Query person objects 25K times using Object Oriented Criteria Querying API.
  • Query person objects 25K times using JPQL – Java Persistence Query Language, an SQL-like query scripting language.

For the sake of simplicity, the benchmark was run on top of in-memory embedded Derby with the profiler slicing the times spent at the

  • Unit Test Layer
  • JPA Layer
  • Derby Layer

The times spent at the Unit Test Layer are omitted from the results because they are irrelevant.

Results

The times given in the below tables are in milliseconds spent in the JPA layer while running the benchmark scenario. The same tests are run for Batoo and Hibernate JPA in different runs to isolate boot, memory, cache, garbage collection etc. effects.

The tables below show

  • the total time spent at Derby Layer as DB Operation Total
  • the type of the test as Test
  • the times for each test at Derby Layer as DB Operation
  • the times for each test at JPA Layer as Core Operation
  • the total time spent at JPA Layer as Core Operation Total
  • the total time spent at both JPA and Derby Layers as Operation Total


Below are the ratios of CPU resources spent by Hibernate and Batoo JPA. It is assumed that an application generates, on average, 1 save, 5 locate, 2 remove and 3 update operations, plus 5 + 5 for a total of ten queries. Although these numbers depend heavily on the nature of the application, some sort of assumption is needed to make an overall speed comparison.


Given the scenario above, Batoo JPA measures over 15 times faster than Hibernate – the leading JPA implementation.

As you may have noticed, Batoo JPA not only performs insanely fast at the JPA Layer, it also employs a number of optimizations to relieve the pressure on the database. This is why Batoo JPA measures half the time of Hibernate at the DB Layer.

Interpretation of Results

We do appreciate that JPA is not the only part of an application. But we do believe that the current JPA implementations consume quite a bit of your server budget. While a typical application cluster spends about 20% to 40% of its CPU resources on the persistence layer, Batoo JPA may well be able to bring your cluster down to half its size, allowing you to save a lot on licensing, administration and hardware, and leaving room to scale up even for non-cluster-friendly applications. In my experience, I have seen applications running on 96-core Solaris systems simply because they were not scalable.

Conclusion

We have managed to create a JPA product that allows you to enjoy the great features of JPA technology without requiring you to compromise on performance!

On top of that, Batoo JPA is developed using the Apache Coding Standards and has valuable documentation within the code. The project codebase is released under the LGPL license, there is absolutely no closed-source part, and we envision that it will stay that way forever.

As stated earlier, it also has complementary Maven and Eclipse plugins to provide instrumentation for the build and development phases.

Batoo JPA deviates almost not at all from the specification, making it easy for existing JPA applications to be migrated to Batoo JPA, while requiring no additional learning phase to start using it.

Last but not least, Batoo JPA saves you time not only when you run your application, but also when you deploy it. Batoo JPA employs parallel deployer managers to handle deployment in parallel. Considering that a developer deploys the application well over 10 times a day during development, if not 100, with a moderately large domain model this can add up to quite a bit of developer time. Although we haven’t made a concrete benchmark of deployment speed, we know that Batoo JPA deploys about 3 to 4 times faster than Hibernate.

(via HighScalability.com)

LinkedIn Moved From Rails To Node: 27 Servers Cut And Up To 20x Faster

Update: More background by Ikai Lan, who worked on the mobile server team at LinkedIn, says some facts were left out: the app made “a cross data center request, guys. Running on single-threaded Rails servers (every request blocked the entire process), running Mongrel, leaking memory like a sieve.” Which explains why any non-blocking approach would be a win. And Ikai, I hope as you do that nobody reads HS and just does what somebody else does without thinking. The goal here is information that you can use to make your own decisions.

Ryan Paul has written an excellent behind-the-scenes look at LinkedIn’s mobile engineering. While the mobile part of the story–23% mobile usage; focus on simplicity, ease of use, and reliability; using a room metaphor; 30% native, 80% HTML; embedded lightweight HTTP server; single client-app connection–could help guide your mobile strategy, the backend effects of moving from Rails to Node.js may also prove interesting.

After evaluation, some of the advantages of Node.js were:

  • Much better performance and lower memory overhead than other tested options, running up to 20x faster in some scenarios.
  • Programmers could leverage their JavaScript skills.
  • Frontend and backend mobile teams could be combined into a single unit.
  • Servers were cut to 3 from 30. Enough headroom remains to handle 10x current levels of resource utilization.
  • Development could focus more on application development than firefighting.

Clearly a lot of issues are being mixed together here. We have a rewrite, a change of stack, and a change of logic distribution between the server and the client, so there’s plenty of room to argue where the gains really came from, but it’s clear LinkedIn believes the use of Node.js was a big win for them. YMMV.

The comment section is, well, vigorous, but has some interesting observations. I especially liked one comment by oluseyi:

For our inevitable rearchitecting and rewrite, we want to cache content aggressively, store templates client-side (with the ability to invalidate and update them from the server) and keep all state purely client side. This means the application may request from the server all content matching a set of filters updated since a provided timestamp in order to refresh its cache; rather than opening and closing several connections, we want to open a single long-lived connection and stream all of the relevant metadata, assets and content. (Again, remember that the original implementation just rendered the returned HTML, which means that URIs for images, etc pointed to the server, and because the web views were being created and destroyed with navigation, the images were not being effectively cached.)
In the long-lived connection implementation, there are no longer “views” in the traditional MVC web application sense. The final result of server-side processing in the controller to aggregate any necessary data is not markup written to the output stream but rather a large binary blob which the client unpacks to extract all relevant data.

So while I see your concern that the term MVC is being misused here, its usage is correct in a web application context-specific sense. The “view” (markup written to output, including references to external URIs that must be resolved by the rendering web view) is now an aggregated data stream, which gets unpacked, cached and then rendered client-side into the view.

(via HighScalability.com)