NYTimes Architecture: No Head, No Master, No Single Point Of Failure

Michael Laing, a Systems Architect at NYTimes, gave this great description of their use of RabbitMQ and their overall architecture on the RabbitMQ mailing list. The closing sentiment marks this as definitely an architecture to learn from:

Although it may seem complex, nytimes architecture has simple components and is mostly principles and plumbing. The key point to grasp is that there is no head, no master, no single point of failure. As I write this I can see components failing (not RabbitMQ), and we are fixing them so they are more reliable. But the system doesn’t fail, users can connect, and messages are delivered, regardless – all within design parameters.

Since it’s short, to the point, and I couldn’t say it better, I’ll just reproduce two of the email list posts here:

Just a quick note and thank you to the RabbitMQ team for a great product.

Our premier online offering http://www.nytimes.com has a new look and new underpinnings, now including a messaging architecture implemented using RabbitMQ.

This architecture – nytimes architecture – has dozens of RabbitMQ instances spread across 6 AWS zones in Oregon and Dublin. The instances are organized into “wholesale” and “retail” layers. Connection to clients is via websockets/sockjs.

Upon launch today, the system autoscaled to ~500,000 users. Connection times remained flat at ~200ms.

nytimes architecture provides subscription services for breaking news, video feeds, etc. and will add more event based services. It also supports individual messaging related to subscription status for registered users.

This system would not have been possible without RabbitMQ. It was the one component, used everywhere, that never faltered or failed.

We are using: a single Amazon Linux AMI, RabbitMQ, Cassandra 2, and Python 2. We use pika with tornado and libev for the nytimes architecture wholesale and retail pieces; our internal clients use Java and PHP.
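For readers unfamiliar with pika, a minimal publish sketch in Python might look like the following. The host, exchange name and payload are hypothetical, and it uses pika's simpler BlockingConnection rather than the tornado/libev adapters mentioned above:

# Minimal sketch: publish a breaking-news event to a RabbitMQ fanout exchange.
# Host, exchange and payload are hypothetical; the production gateways use
# pika's tornado/libev adapters instead of this BlockingConnection.
import json
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()

# A fanout exchange copies each message to every bound queue,
# e.g. one queue per websocket/sockjs gateway.
channel.exchange_declare(exchange="breaking-news", exchange_type="fanout")

event = {"type": "breaking-news", "headline": "..."}
channel.basic_publish(exchange="breaking-news", routing_key="", body=json.dumps(event))
connection.close()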

We use shovels – lots of shovels – to interconnect.

In production we have a RabbitMQ client 3-cluster and a core 3-cluster in each region on c1-xlarges. A proxy cluster of c1-mediums in Virginia connects clients to the client clusters. All services are parallelized so we can add more cores and clients.

The retail layer autoscales and uses c1-mediums with a single rabbit shovel-connected to one of the core rabbits. Each python websocket/sockjs gateway supports up to 100K clients.

We autodeploy into subnets within Virtual Private Clouds in AWS. Clients are routed via least latency to the fastest healthy region.

Of the technical components, the gateway is the most complex. We will be moving it into open source in pieces and the first piece is likely to be the python websocket/sockjs libraries which, frankly, beat the crap out of most other stuff out there and fully conform with the relevant standards. It can be loosely thought of as a C co-process managed by python, and as such, may be possible to reuse in other languages/environments.

We have a 12-node Cassandra cluster across the 2 regions / 6 zones. It is used for persistence of messages and as cache. We do not use persistence in RabbitMQ. Our services are idempotent and important messages may be replicated multiple times creating intentional race conditions in which the fastest wins.

Although it may seem complex, nytimes architecture has simple components and is mostly principles and plumbing. The key point to grasp is that there is no head, no master, no single point of failure. As I write this I can see components failing (not RabbitMQ), and we are fixing them so they are more reliable. But the system doesn’t fail, users can connect, and messages are delivered, regardless – all within design parameters.

(Via HighScalability.com)

Scaling Mercurial at Facebook

With thousands of commits a week across hundreds of thousands of files, Facebook’s main source repository is enormous – many times larger than even the Linux kernel, which checked in at 17 million lines of code and 44,000 files in 2013. Given our size and complexity – and Facebook’s practice of shipping code twice a day – improving our source control is one way we help our engineers move fast.

Choosing a source control system

Two years ago, as we saw our repository continue to grow at a staggering rate, we sat down and extrapolated our growth forward a few years. Based on those projections, it appeared likely that our then-current technology, a Subversion server with a Git mirror, would become a productivity bottleneck very soon. We looked at the available options and found none that were both fast and easy to use at scale.

Our code base has grown organically and its internal dependencies are very complex. We could have spent a lot of time making it more modular in a way that would be friendly to a source control tool, but there are a number of benefits to using a single repository. Even at our current scale, we often make large changes throughout our code base, and having a single repository is useful for continuous modernization. Splitting it up would make large, atomic refactorings more difficult. On top of that, the idea that the scaling constraints of our source control system should dictate our code structure just doesn’t sit well with us.

We realized that we’d have to solve this ourselves. But instead of building a new system from scratch, we decided to take an existing one and make it scale. Our engineers were comfortable with Git and we preferred to stay with a familiar tool, so we took a long, hard look at improving it to work at scale. After much deliberation, we concluded that Git’s internals would be difficult to work with for an ambitious scaling project.

Instead, we chose to improve Mercurial. Mercurial is a distributed source control system similar to Git, with many equivalent features. Importantly, it’s written mostly in clean, modular Python (with some native code for hot paths), making it deeply extensible. Just as importantly, the Mercurial developer community is actively helping us address our scaling problems by reviewing our patches and keeping our scale in mind when designing new features.

When we first started working on Mercurial, we found that it was slower than Git in several notable areas. To narrow this performance gap, we’ve contributed over 500 patches to Mercurial over the last year and a half. These range from new graph algorithms to rewrites of tight loops in native code. These helped, but we also wanted to make more fundamental changes to address the problem of scale.

Speeding up file status operations

For a repository as large as ours, a major bottleneck is simply finding out what files have changed. Git examines every file and naturally becomes slower and slower as the number of files increases, while Perforce “cheats” by forcing users to tell it which files they are going to edit. The Git approach doesn’t scale, and the Perforce approach isn’t friendly.

We solved this by monitoring the file system for changes. This has been tried before, even for Mercurial, but making it work reliably is surprisingly challenging. We decided to query our build system’s file monitor, Watchman, to see which files have changed. Mercurial’s design made integrating with Watchman straightforward, but we expected Watchman to have bugs, so we developed a strategy to address them safely.

Through heavy stress testing and internal dogfooding, we identified and fixed many of the issues and race conditions that are common in file system monitoring. In particular, we ran a beta test on all our engineers’ machines, comparing Watchman’s answers for real user queries with the actual file system results and logging any differences. After a couple months of monitoring and fixing discrepancies in usage, we got the rate low enough that we were comfortable enabling Watchman by default for our engineers.

For our repository, enabling Watchman integration has made Mercurial’s status command more than 5x faster than Git’s status command. Other commands that look for changed files – like diff, update, and commit – also became faster.

Working with large histories

The rate of commits and the sheer size of our history also pose challenges. We have thousands of commits being made every day, and as the repository gets larger, it becomes increasingly painful to clone and pull all of it. Centralized source control systems like Subversion avoid this by only checking out a single commit, leaving all of the history on the server. This saves space on the client but leaves you unable to work if the server goes down. More recent distributed source control systems, like Git and Mercurial, copy all of the history to the client which takes more time and space, but allows you to browse and commit entirely locally. We wanted a happy medium between the speed and space of a centralized system and the robustness and flexibility of a distributed one.

Improving clone and pull

Normally when you run a pull, Mercurial figures out what has changed on the server since the last pull and downloads any new commit metadata and file contents. With tens of thousands of files changing every day, downloading all of this history to the client every day is slow. To solve this problem we created the remotefilelog extension for Mercurial. This extension changes the clone and pull commands to download only the commit metadata, while omitting all file changes that account for the bulk of the download. When a user performs an operation that needs the contents of files (such as checkout), we download the file contents on demand using Facebook’s existing memcache infrastructure. This allows clone and pull to be fast no matter how much history has changed, while only adding a slight overhead to checkout.

But what if the central Mercurial server goes down? A big benefit of distributed source control is the ability to work without interacting with the server. The remotefilelog extension intelligently caches the file revisions needed for your local commits so you can checkout, rebase, and commit to any of your existing bookmarks without needing to access the server. Since we still download all of the commit metadata, operations that don’t require file contents (such as log) are completely local as well. Lastly, we use Facebook’s memcache infrastructure as a caching layer in front of the central Mercurial server, so that even if the central repository goes down, memcache will continue to serve many of the file content requests.

This type of setup is of course not for everyone—it’s optimized for work environments that have a reliable Mercurial server and that are always connected to a fast, low-latency network. For work environments that don’t have a fast, reliable Internet connection, this extension could result in Mercurial commands being slow and failing unexpectedly when the server is congested or unreachable.

Clone and pull performance gains

Enabling the remotefilelog extension for employees at Facebook has made Mercurial clones and pulls 10x faster, bringing them down from minutes to seconds. In addition, because of the way remotefilelog stores its local data on disk, large rebases are 2x faster. When compared with our previous Git infrastructure, the numbers remain impressive. Achieving these types of performance gains through extensions is one of the big reasons we chose Mercurial.

Finally, the remotefilelog extension allowed us to shift most of the request traffic to memcache, which reduced the Mercurial server’s network load by more than 10x. This will make it easier for our Mercurial infrastructure to keep scaling to meet growing demand.

How it works

Mercurial has several nice abstractions that made this extension possible. The most notable is the filelog class. The filelog is a data structure for representing every revision of a particular file. Each version of a file is identified by a unique hash. Given a hash, the filelog can reconstruct the requested version of a file. The remotefilelog extension replaces the filelog with an alternative implementation that has the same interface. It accepts a hash, but instead of reconstructing the version of the file from local data, it fetches that version from either a local cache or the remote server. When we need to request a large number of files from the server, we do it in large batches to avoid the overhead of many requests.
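As a rough illustration of that interface swap (not the actual extension code), a filelog-like object that serves revisions from a local cache, then memcache, then the central server could look like this in Python; the cache layout and fetch helpers are hypothetical:

# Illustrative sketch of a remote filelog: same read-by-hash interface,
# but contents come from a local cache, memcache, or the server on demand.
import os

class RemoteFilelog:
    def __init__(self, path, cache_dir, memcache_client, server):
        self.path = path                  # repo-relative file path
        self.cache_dir = cache_dir        # local on-disk cache of revisions
        self.memcache = memcache_client   # shared cache tier in front of the server
        self.server = server              # hypothetical client for the central repo

    def read(self, node_hex):
        # Return the file contents for this revision hash.
        local = os.path.join(self.cache_dir, self.path, node_hex)
        if os.path.exists(local):                                   # 1. local cache
            with open(local, "rb") as f:
                return f.read()
        data = self.memcache.get("%s:%s" % (self.path, node_hex))   # 2. memcache tier
        if data is None:
            data = self.server.fetch(self.path, node_hex)           # 3. central server
        os.makedirs(os.path.dirname(local), exist_ok=True)
        with open(local, "wb") as f:                                 # keep for offline use
            f.write(data)
        return data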

Open Source

Together, the hgwatchman and remotefilelog extensions have improved source control performance for our developers, allowing them to spend more time getting stuff done instead of waiting for their tools. If you have a large deployment of a distributed revision control system, we encourage you to take a look at them. They’ve made a difference for our developers, and we hope they will prove valuable to yours, too.

(Via: Facebook Engineering Blog)

 

Apache Kafka – A Different Kind of Messaging System

Apache has released Kafka 0.8, the first major release of Kafka since becoming an Apache Software Foundation top level project. Apache Kafka is publish-subscribe messaging implemented as a distributed commit log, suitable for both offline and online message consumption. It is a messaging system initially developed at LinkedIn for collecting and delivering high volumes of event and log data with low latency. The latest release includes intra-cluster replication and multiple data directories support. Request processing is also now asynchronous, implemented via a secondary pool of request handling threads. Log files can be rotated by age, and log levels can be set dynamically via JMX. A performance test tool has been added, to help fix existing performance concerns and look for potential performance enhancements.

Kafka is a distributed, partitioned, replicated commit log service. Producers publish messages to Kafka topics, and consumers subscribe to these topics and consume the messages. A server in a Kafka cluster is called a broker. For each topic, the Kafka cluster maintains one or more partitions for scaling, parallelism and fault tolerance. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log. The messages in the partitions are each assigned a sequential id number called the offset.

The offset is controlled by the consumer. The typical consumer will process the next message in the list, although it can consume messages in any order, as the Kafka cluster retains all published messages for a configurable period of time. This makes consumers very cheap, as they can come and go without much impact on the cluster, and allows for offline consumers like Hadoop clusters. Producers are able to choose which topic, and which partition within the topic, to publish the message to. Consumers assign themselves a consumer group name, and each message is delivered to one consumer within each subscribing consumer group. If all the consumers have different consumer groups, then messages are broadcast to each consumer.
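As a small illustration of these producer and consumer-group semantics, here is a sketch using the third-party kafka-python client; the broker address, topic and group names are hypothetical, and the 0.8-era clients discussed in the release were Scala/Java rather than Python:

# Sketch of Kafka publish/subscribe with the kafka-python client.
from kafka import KafkaConsumer, KafkaProducer

producer = KafkaProducer(bootstrap_servers="localhost:9092")
producer.send("page-views", b'{"user": 42, "url": "/home"}')
producer.flush()

# Consumers in the same group share a topic's partitions; a consumer with a
# different group_id receives its own copy of every message.
consumer = KafkaConsumer("page-views",
                         group_id="analytics",
                         bootstrap_servers="localhost:9092",
                         auto_offset_reset="earliest")
for message in consumer:
    print(message.offset, message.value)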

Kafka can be used like a traditional message broker. It has high throughput, built-in partitioning, replication, and fault-tolerance, which makes it a good solution for large scale message processing applications. Kafka can also be used for high volume website activity tracking. Site activity can be published, and can be processed in real time, or loaded into Hadoop or offline data warehousing systems. Kafka can also be used as a log aggregation solution. Instead of working with log files, logs can be treated as a stream of messages.

Kafka is used at LinkedIn and it handles over 10 billion message writes per day with a sustained load that averages 172,000 messages per second. There is heavy use of multi-subscriber support, both from internal and external applications that make use of the data. There is a ratio of roughly 5.5 messages consumed for each message produced, which results in a daily total in excess of 55 billion messages delivered to real-time consumers. There are 367 topics that cover both user activity topics and operational data, the largest of which adds an average of 92GB per day of batch-compressed messages. Messages are kept for 7 days, and these average at about 9.5 TB of compressed messages across all topics. In addition to live consumers, there are numerous large Hadoop clusters which consume infrequent, high-throughput, parallel bursts as part of the offline data load.

To get started, visit the official Apache Kafka documentation page from where you can learn more and download Kafka. There is also a paper from LinkedIn titled Building LinkedIn’s Real-time Activity Data Pipeline, which talks about why Kafka was built and the factors that contributed to Kafka’s design.

(via InfoQ.com)

Netflix open sources its data traffic cop, Suro

Netflix is open sourcing a tool called Suro that the company uses to direct data from its source to its destination in real time. More than just serving a key role in the Netflix data pipeline, though, it’s also a great example of the impressive — if not sometimes redundant — ecosystem of open source data-analysis tools making their way out of large web properties.

Netflix’s various applications generate tens of billions of events per day, and Suro collects them all before sending them on their way. Most head to Hadoop (via Amazon S3) for batch processing, while others head to Druid and ElasticSearch (via Apache Kafka) for real-time analysis. According to the Netflix blog post explaining Suro (which goes into much more depth), the company is also looking at how it might use real-time processing engines such as Storm or Samza to perform machine learning on event data.

An example Suro workflow. Source: Netflix

To anyone familiar with the big data space, the names of the various technologies at play here are probably associated to some degree with the companies that developed them. Netflix created Suro, LinkedIn created Kafka and Samza, Twitter (actually, Backtype, which Twitter acquired) created Storm, and Metamarkets created Druid. Suro, the blog post’s authors acknowledged, is based on the Apache Chukwa project and is similar to Apache Flume (created by Hadoop vendor Cloudera) and Facebook’s Scribe. Hadoop, of course, was all but created at Yahoo and has since seen notable ecosystem contributions from all sorts of web companies.

I sometimes wonder whether all these companies really need to be creating their own technologies all the time, or if they could often get by using the stuff their peers have already created. Like most things in life, though, the answer to that question is probably best decided on a case-by-case basis. Storm, for example, is becoming a very popular tool for stream processing, but LinkedIn felt like it needed something different and thus built Samza. Netflix decided it needed Suro as opposed to just using some pre-existing technologies (largely because of its cloud-heavy infrastructure running largely in Amazon Web Services), but also clearly uses lots of tools built elsewhere (including the Apache Cassandra database).

A diagram of LinkedIn’s data architecture as of February 2013, including everything from Kafka to Teradata.

Hopefully, the big winners in all this innovation will be mainstream technology users that don’t have the talent (or, necessarily, the need) to develop these advanced systems in-house but could benefit from their capabilities. We’re already seeing Hadoop vendors, for example, trying to make projects such as Storm and the Spark processing framework usable by their enterprise customers, and it seems unlikely they’ll be the last. There are a whole lot of AWS users, after all, and they might want the capabilities Suro can provide without having to rely on Amazon to deliver them. (We’ll likely hear a lot more about the future of Hadoop as a data platform at our Structure Data conference, which takes place in March.)

(source: Gigaom.com)

 

Eight Cloud and Big Data Predictions for 2014

Prediction has always been a fun exercise. It forces you to take a step back from your day to day activities, look at the market “crystal ball” and figure out what the future looks like. This year I found this exercise to be particularly difficult as the amount of new innovation and conflicting trends that are taking place at the same time can be very confusing.

After spending quite some time reading market analyses from different sources (see references at the end) and compiling them with all the things I’ve seen and heard throughout the year, I came to the following observations on how 2014 will look for Cloud and Big Data. I’m happy to exchange thoughts in that regard and learn how you envision 2014.

 

Public and Private Cloud in 2014

1. Amazon continues to distance itself while Google and Microsoft are catching up

Amazon continues to lead in public cloud and distance itself from the rest of the market. This year it reached 5 times the size of other cloud vendors combined, as noted in a recent Gartner report. Google and Microsoft are slowly closing the gap as the closest alternatives, but at a much slower pace than one would expect given the significant investment of both companies in this area.


2. 2014 will be the year of Enterprise Clouds

Traditional enterprise players, such as IBM, HP, Cisco and Red Hat, continue to fight for the remaining share of the market, mostly around enterprise adoption. Yet, enterprises have been slower to embrace and execute on a private cloud strategy. Part of the reason for the slow adoption is the gap between the solutions provided by most of the regular contenders – who are still competing on selling an end-to-end story – and the reality that most enterprises are looking for an open and hybrid cloud strategy, especially given that there is no clear winner.

3. OpenStack will be the most popular choice for Enterprise Clouds in 2014

OpenStack is now high on the radar for Enterprise Cloud, mostly threatening VMware’s strong leadership in that domain. Most of the main contenders have embraced a strong OpenStack strategy, including Red Hat, Ubuntu, Suse, HP, IBM and Cisco, with Cisco taking a surprising leading position over IBM and HP, according to a recent Forrester survey.
VMware’s response of embracing OpenStack is still questionable, as the transition to OpenStack is not only about technical integration, but also involves a big shift in the value chain. Enterprises are increasingly less keen on paying a high cost for hypervisor licenses, which to date is the main revenue channel in the VMware pie.

4. Native OpenStack alternatives will disrupt many of the existing cloud solutions

The rapid adoption of OpenStack will also disrupt many of the existing cloud solutions that were built in a pre-OpenStack world. New solutions that take a more native-to-OpenStack approach will pop up and replace many of the current solutions. A good example of this is Project Solum, which aims to provide a native alternative to existing PaaS solutions, such as CloudFoundry and OpenShift, as I outlined in this recent post. I expect that we will see more of these disruptions expanding to other frameworks in 2014.

5. Orchestration & Automation will be the next big thing in 2014

Having said all that, the remaining challenge of enterprises is to break the IT bottleneck. This bottleneck is created by IT-centric decision-making processes, a.k.a “IaaS First Approach,” in which IT is focused on building a private cloud infrastructure – a process that takes much longer than anticipated when compared with a more business/application-centric approach. One of the ways to overcome that challenge is to abstract the infrastructure and allow other departments within the organization to take a parallel path towards the cloud, while ensuring future compatibility with new development in the IT-led infrastructure.

DevOps has definitely been a key example of a business-led initiative that determines the speed of innovation and, thus, the competitiveness of many organizations. The move to DevOps forces many organizations to go through both cultural and technology changes to align the business and application sides more closely, not just in goals, but in processes and tools as well.

Enterprises with legacy environments and customers take a two-step approach. They first tackle continuous delivery – automating the packages of their software deliverable and keeping tighter control over new production rollout. Once enterprises feel comfortable with their process and environment, they will automate the entire deployment process into production.

Configuration management, orchestration and workflow automation become key enablers in the enterprise transition to cloud, and will gain much attention in 2014 and 2015. Again, Amazon has already recognized that need, introducing into the space a new offering called OpsWorks. It is only a matter of time until we see similar offerings embedded with other cloud providers in both public and private clouds.

As the focus moves to orchestration with greater focus on standardization of deployments and packages, DSL and API become equally important. Existing standards, such as TOSCA and CAMP, will undergo massive modifications to make them simpler to use and fit the cloud environment.

Big Data Predictions

There are many advancements happening within the Big Data/NoSQL domain that I’m not going to touch on. I will focus on two main areas that are close to my work.

6. 2014 will see a major increase of Big Data in the Cloud

Cloud infrastructure is closing the gap with the existing data center, as we see the emergence of support for Bare Metal, High CPU, High Memory and Flash-Disk. Many cloud providers are already offering Big Data as a Service, such as Elastic MapReduce and Redshift, and are continuously expanding their offerings in that regard.

This advancement in cloud infrastructure removes almost all of the technical barriers for running I/O intensive workloads, such as Big Data analytics, on the cloud. I expect that in 2014, running Big Data analytics on the cloud will become the first choice for any new project, while the use of Big Data in non-cloud environments will be minimized to only niche use cases with extreme regulations and security constraints.

7. In-Memory Data backed by flash-disk will become a popular choice for Real-Time Big Data analytics
Flash disk pricing changes the economics behind the cost/performance ratio of disk-based solutions, making it possible to achieve the performance of an In-Memory-based solution at a price closer to a disk-based solution. So far, attempts have been made to use flash disks as a fast disk alternative to magnetic drives. This path inherits many of the limitations of a disk-based drive and, therefore, doesn’t capture the full potential of flash disk which, like RAM, can provide highly parallel access to data.

At the same time, memory-based solutions like In-Memory Database or In-Memory Data Grid cover a small niche in the Big Data ecosystem, mostly due to the high cost/performance ratio.

I believe that the combination of flash drive, In-Memory data-grids and databases at the front-end change that dynamic and will make memory-based solutions backed by flash disk much more attractive to a bigger niche, specifically as it relates to real-time analytics of Big Data.

In some cases, the combination of memory-based solutions is also integrated with existing Big Data frameworks and, thus, provides seamless performance acceleration. A good example is GigaSpaces’ integration with Storm and GridGain’s integration with Hadoop.

8. Real-time analytics turns mainstream

The most interesting indication that real-time analytics is becoming mainstream is Amazon’s support for real-time analytics, with many of the existing analytics solutions providing real-time analytics capabilities as built-in parts of their reports. Another good example of that is Google Analytics Real Time View.

New disruptive forces in Cloud worth watching in 2014
Disruptive technologies change the market landscape in ways that are difficult to anticipate by their very nature. Rather than predicting their impact, I felt it would be simpler to list them.

  • Networking - The networking segment of IT is experiencing a major disruption as of late with the move to Software Defined Networking, the OpenStack Neutron project and Network Function Virtualization. These developments will change networking not only from a technology perspective; they are also driving a completely different business model based on utilization or subscription.
  • Linux Container - Linux containers are gaining interest both as lightweight software packaging and as an alternative to VMs. The most common use case for Linux containers so far has been as the underlying container for PaaS. With the introduction of new projects such as Docker – which makes Linux containers easy to use – we will see wider and more pervasive use of containers as high-performance virtualization, as packaging tools in continuous deployment scenarios, etc.
  • Bare Metal Cloud - Bare metal cloud has been a small niche in the cloud space, mostly due to the fact that it is usually offered in a static configuration setup. That is changing: newer clouds provide the same degree of elasticity for bare metal devices. With OpenStack, for example, you can spawn a new bare metal device just as you would provision any other VM. This development would remove one of the last remaining barriers to bringing mission-critical applications to the cloud.
  • OpenStack as an Innovation Accelerator - Many of the disruptive technologies listed above are not that new and, to a certain degree, have existed for years, as in the case of Linux containers. Often, a disruptive force needs to gain a certain critical mass before it can break into massive adoption. OpenStack creates an ecosystem that gives many users a platform for integrating new technologies in a way that can be consumed by end users immediately, and that plays a major role in accelerating the adoption of many of those disruptive technologies.

Where do we go from here?

There are many individual technologies and trends that have a disruptive force behind them. Having said that, I think that it is the intersection between those technologies that holds the most promising potential. Here are few examples:

  • Big Data and the Cloud Application Orchestration - Big Data analytics for operational information could serve as the new “brain” behind a new class of orchestration engine that would combine artificial intelligence decision-making based on trends and historical analysis and handle complex failure and scaling scenarios automatically.
  • Putting Network and Applications together - Putting network and applications together holds a lot of promise in the way we scale applications across regions and multiple sites, as well as how we control application SLA’s in a shared environment. For example, giving priority to customer-facing services versus batch analytics or optimizing the network routing based on the locality of the data, etc.

(via: Nati Shalom’s Blog)

 

Advanced Hard Drive Caching Techniques

With the introduction of the solid-state Flash drive, performance came to the forefront for data storage technologies. Prior to that, software developers and server administrators needed to devise methods for which they could increase I/O throughput to storage, most of which resulted in low capacity caching to random access memory (RAM) or a RAM drive. Although not as fast as RAM, the Flash drive was almost a dream come true, but it had its limitations—one of which was its low capacities packaged in the NAND-based chips. The traditional spinning disk drive provided the capacities users desired but lacked speedy accessibility. Even with the 6Gb SATA protocol, sequential data access at best performed at approximately 150MB per second (or MB/s) for both read and write operations, while random access varied between 2–5MB/s as the seeking across multiple sectors laid out in multiple tracks across multiple spinning platters proved to be an extremely disruptive bottleneck. The solid-state drive (SSD) with no movable components significantly decreased these access latencies, thus rendering this bottleneck almost nonexistent.

Even today, the consumer SSD cannot compare to the capacities provided by the magnetic hard disk drive (or HDD), which is why in this article I intend to introduce readers to proven methods for obtaining near SSD performance with the traditional HDD. Multiple open-source projects exist that can achieve this, all but one of which utilizes an SSD as a caching node, and the other caches to RAM. The device drivers I cover here are dm-cache, FlashCache and the RapidDisk/RapidCache suite; I also briefly discuss bcache and EnhanceIO.

Note:

To build the kernel modules shown in this article, you need to have either the full kernel source or the kernel headers installed for your current kernel image revision.

In my examples, I am using a commercial SATA III (6Gbps) SSD with an average performance of the following:

  • Sequential read: 231MB/s
  • Sequential write: 74MB/s
  • Random read: 230MB/s
  • Random write: 72MB/s

This SSD provides the caching layer for a slower mechanical SATA III HDD that performs at the following:

  • Sequential read: 115MB/s
  • Sequential write: 72MB/s
  • Random read: 2MB/s
  • Random write: 2MB/s

In my environment, the SSD is labeled as /dev/sdb, and the HDD is /dev/sda3. These are non-intrusive transparent caching solutions intended to achieve the performance benefits of SSDs. They can be added to and removed from existing storage targets without issue or data loss (assuming that all cached data has been flushed to disk successfully). Also, all the examples here showcase a write-back caching scheme with the exception of RapidCache, which instead will be used in write-through mode. In write-back mode, newly written data is cached but not immediately written to the destination target. Write-through mode always will write new data to the target while still maintaining it in cache for future reads.

Note:

The benchmarks shown here were obtained by using FIO, a file I/O benchmarking and test tool designed for data storage technologies. It is maintained by Linux kernel developer Jens Axboe. Unless noted otherwise, all captured I/O is written at the typical 4KB page size, asynchronously to the storage target, 32 transfers at a time (that is, a queue depth of 32).
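As a concrete example of those parameters, one plausible random-read invocation of FIO (launched here from Python via subprocess) might look like the following; the target device and runtime are hypothetical:

# One plausible fio run matching the note above: 4KB blocks, async I/O, queue depth 32.
# Point --filename at the device under test.
import subprocess

subprocess.run([
    "fio",
    "--name=randread",
    "--filename=/dev/mapper/cache",   # hypothetical target
    "--rw=randread",                  # also try randwrite, read and write
    "--bs=4k",
    "--iodepth=32",
    "--ioengine=libaio",
    "--direct=1",
    "--runtime=60",
    "--time_based",
], check=True)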

dm-cache

dm-cache has been around for quite some time—at least since 2006. It originally made its debut as a research project developed by Dr Ming Zhao through his summer internship at IBM research. The dm-cache module just recently was integrated into the Linux kernel tree as of version 3.9. Whether you choose to enable it in a recently downloaded kernel or compile it from the official project site, the results will be the same. To load the module, you need to invoke modprobe or insmod:

$ sudo modprobe dm-cache

Now that the module is loaded, you need to inform that module about which drive to point to for the cache and which to point to for the destination. The dm-cache project site provides a Perl script to simplify this process called dmc-setup.pl. For example, if I wanted to use the entire SSD in write-back caching mode with a 4KB block size, I would type:

$ sudo perl dmc-setup.pl -o /dev/sda3 -c /dev/sdb -n cache -b 8 -w 

This script is a wrapper to the equivalent dmsetup command below:

$ echo 0 20971520 cache /dev/sda3 /dev/sdb 0 8 65536 16 1 | 
 ↪dmsetup create cache

The dm-cache documentation hosted on the project site provides details on each parameter field, so I don’t cover them here.

You may notice that in both examples, I named the mapping to both drives “cache”. So, when I need to access the drive mapping, I must refer to it as “cache”.

The following mapping passes all data requests to the caching driver, which in turn performs the necessary magic to process the requests either by handling it entirely out of cache or both the cache and the slower device:

$ ls -l /dev/mapper
total 0
lrwxrwxrwx 1 root root       7 Jun 30 12:10 cache -> ../dm-0
crw------- 1 root root 10, 236 Jun 30 11:52 control

Just like with any other device-mapper-enabled target, I also can pull up detailed mapping data:

$ sudo dmsetup status cache
0 20971520 cache stats: reads(83), writes(0), 
 ↪cache hits(0, 0.0),replacement(0), replaced dirty blocks(0)

$ sudo dmsetup table cache
0 20971520 cache conf: capacity(256M), associativity(16), 
 ↪block size(4K), write-back

If the target drive already is formatted with data on it, you just need to mount it; otherwise, format it to your specified filesystem:

$ sudo mke2fs -F /dev/mapper/cache 

Remember, these solutions are non-intrusive, so if you have existing data that needs to remain on that disk drive, skip the above step and go straight to mounting it for data accessibility:

$ sudo mount /dev/mapper/cache /mnt/cache
$ df | grep cache
/dev/mapper/cache  10321208 1072632   8724288  11% /mnt/cache

Using a benchmarking utility, the numbers will vary. On read operations, it is wholly dependent on whether the desired data resides in cache or whether the module needs to retrieve it from the slower disk. On write operations, it depends on the Flash technology itself, and whether it needs to go through a typical program/erase (P/E) cycle to write the new data. Regardless of this, the random read/write access to the slower drive has been increased significantly:
  • Sequential read: 105MB/s
  • Sequential write: 50MB/s
  • Random read: 67MB/s
  • Random write: 51MB/s

You can continue monitoring the cache status by typing:

$ sudo dmsetup status cache 
0 20971520 cache stats: reads(301319), writes(353216), 
 ↪cache hits(24485, 0.3),replacement(345972), 
 ↪replaced dirty blocks(92857)

To remove the cache mapping, unmount the drive and invoke dmsetup:

$ sudo umount /mnt/cache
$ sudo dmsetup remove cache

FlashCache

FlashCache is a project developed and maintained by Facebook. It was inspired by dm-cache. Much like dm-cache, it too is built from the device-mapper framework. It currently is hosted on GitHub and can be cloned from there. The repository encompasses the kernel module and administration utilities. Once built and installed, load the kernel module and in a similar fashion to the previous examples, create a mapping of the SSD and HDD:

$ sudo modprobe flashcache
$ sudo flashcache_create -p back -b 8 cache /dev/sdb /dev/sda3
cachedev cache, ssd_devname /dev/sdb, disk_devname /dev/sda3 
 ↪cache mode WRITE_BACK block_size 8, md_block_size 8, 
 ↪cache_size 0
FlashCache metadata will use 223MB of your 3944MB main memory

The flashcache_create administration utility is similar to the dmc-setup.pl Perl script used for dm-cache. It is a wrapper utility designed to simplify the dmsetup process. As with the dm-cache module, once the mapping has been created, you can view mapping details by typing:

$ sudo dmsetup table cache
0 20971520 flashcache conf:
    ssd dev (/dev/sdb), disk dev (/dev/sda3) cache mode(WRITE_BACK)
    capacity(57018M), associativity(512), data block size(4K) 
     ↪metadata block size(4096b)
    skip sequential thresh(0K)
    total blocks(14596608), cached blocks(83), cache percent(0)
    dirty blocks(0), dirty percent(0)
    nr_queued(0)
Size Hist: 4096:83 
$ sudo dmsetup status cache
0 20971520 flashcache stats: 
    reads(83), writes(0)
    read hits(0), read hit percent(0)
    write hits(0) write hit percent(0)
    dirty write hits(0) dirty write hit percent(0)
    replacement(0), write replacement(0)
    write invalidates(0), read invalidates(0)
    pending enqueues(0), pending inval(0)
    metadata dirties(0), metadata cleans(0)
    metadata batch(0) metadata ssd writes(0)
    cleanings(0) fallow cleanings(0)
    no room(0) front merge(0) back merge(0)
    disk reads(83), disk writes(0) ssd reads(0) ssd writes(83)
    uncached reads(0), uncached writes(0), uncached IO requeue(0)
    disk read errors(0), disk write errors(0) ssd read errors(0) 
     ↪ssd write errors(0)
    uncached sequential reads(0), uncached sequential writes(0)
    pid_adds(0), pid_dels(0), pid_drops(0) pid_expiry(0)

Mount the mapping for file accessibility:

$ sudo mount /dev/mapper/cache /mnt/cache

Using the same benchmarking utility, observe the differences between FlashCache and the previous module:

  • Sequential read: 284MB/s
  • Sequential write: 72MB/s
  • Random read: 284MB/s
  • Random write: 71MB/s

The numbers look more like the native SSD performance. However, I want to note that this article is not intended to prove that one solution performs better than the other, but instead to enlighten readers of the many methods you can use to accelerate data access to existing and slower configurations.

To unmount and remove the drive mapping, type the following in the terminal:

$ sudo umount /mnt/cache
$ sudo dmsetup remove /dev/mapper/cache

RapidDisk and RapidCache

Currently at version 2.9, RapidDisk is an advanced Linux RAM disk whose features include the capabilities to allocate RAM dynamically as a block device, use it as standalone disk drives, or even map it as caching nodes to slower local disk drives via RapidCache (the latter of which was inspired by FlashCache and uses the device-mapper framework). RAM is being accessed to handle the data storage by allocating memory pages as they are needed. It is a volatile form of storage, so if power is removed or if the computer is rebooted, all data stored within RAM will not be preserved. This is why the RapidCache module was designed to handle only read-through/write-through caching, which means that whatever is intended to be written to the slower storage device will be cached to RapidCache and written immediately to the hard drive. And, if data is being requested from the hard drive and it does not pre-exist in the RapidCache node, it will read the data from the slower device and then cache it to the RapidCache node. This method will retain the same write performance as the hard drive, but significantly increase sequential and random access read performance to cached data.

Once the package, which consists of two kernel modules and an administration utility, is built and installed, you need to insert the modules by typing the following on the command line:

$ sudo modprobe rxdsk
$ sudo modprobe rxcache

Let’s assume that you’re running on a computer that contains 4GB of RAM, and you confidently can say that at least 1GB of that RAM is never used by the operating system and its applications. Using RapidDisk to create a RAM drive of 1GB in size, you would type:

$ sudo rxadm --attach 1024

Remember, RapidDisk will not pre-allocate this storage. It will allocate RAM only as it is used.

A quick benchmark test of just the RAM drive produces some overwhelmingly fast results with 4KB I/O transfers:

  • Sequential read: 1.6GB/s
  • Sequential write: 1.6GB/s
  • Random read: 1.3GB/s
  • Random write: 1.1GB/s

It produces the following with 1MB I/O transfers:

  • Sequential read: 4.9GB/s
  • Sequential write: 4.3GB/s
  • Random read: 4.9GB/s
  • Random write: 4.0GB/s

Impressive, right? To utilize such a speedy RAM drive as a caching node to a slower drive, a mapping must be created, where /dev/rxd0 is the node used to access the RAM drive, and /dev/mapper/rxc0 is the node used to access the mapping of the two drives:

$ sudo rxadm --rxc-map rxd0 /dev/sda3 4

You can get a list of attached devices and mappings by typing:

$ sudo rxadm --list
rxadm 2.9
Copyright 2011-2013 Petros Koutoupis

List of rxdsk device(s):

 RapidDisk Device 1: rxd0
    Size: 1073741824

List of rxcache mapping(s):

 RapidCache Target 1: rxc0
0 20971519 rxcache conf:
    rxd dev (/dev/rxd0), disk dev (/dev/sda3) mode (WRITETHROUGH)
    capacity(1024M), associativity(512), block size(4K)
    total blocks(262144), cached blocks(0)
 Size Hist: 512:663 

As with the previous device-mapper-based solutions, you even can list detailed information of the mapping by typing:

$ sudo dmsetup table rxc0
0 20971519 rxcache conf:
    rxd dev (/dev/rxd0), disk dev (/dev/sda3) mode (WRITETHROUGH)
    capacity(1024M), associativity(512), block size(4K)
    total blocks(262144), cached blocks(0)
 Size Hist: 512:663 

$ sudo dmsetup status rxc0
0 20971519 rxcache stats: 
    reads(663), writes(0)
    cache hits(0) replacement(0), write replacement(0)
    read invalidates(0), write invalidates(0)
    uncached reads(663), uncached writes(0)
    disk reads(663), disk writes(0)
    cache reads(0), cache writes(0)

Format the mapping if needed and mount it:

$ sudo mount /dev/mapper/rxc0 /mnt/cache

A benchmark test produces the following results:

  • Sequential read: 794MB/s
  • Sequential write: 70MB/s
  • Random read: 901MB/s
  • Random write: 2MB/s

Notice that the write performance is not very great, and that’s because it is not meant to be. Write-through mode promises only faster read performance of cached data and consistent write performance to the original drive. The read performance, however, shows significant improvement when accessing cached data.

To remove the mapping and detach the RAM drive, type the following:

$ sudo umount /mnt/cache
$ sudo rxadm --rxc-unmap rxc0
$ sudo rxadm --detach rxd0

Other Solutions Worth Mentioning

bcache:

bcache is relatively new to the hard drive caching scene. It offers all the same features and functionalities as the previous solutions with the exception of its capability to map one or more SSDs as the cache for one or more HDDs instead of one volume to one volume. The project’s maintainer does, however, tout its superiority over the other solutions when it comes to data access performance from the cache. From what I can tell, bcache is unlike the previous solutions in that it does not rely on the device-mapper framework and is instead a standalone module. At the time of this writing, it is set to be integrated into release 3.10 of the Linux kernel tree. Unfortunately, I haven’t had the opportunity or the appropriate setup to test bcache. As a result, I haven’t been able to dive any deeper into this solution and benchmark its performance.

EnhanceIO:

EnhanceIO is an SSD caching solution produced by STEC, Inc., and hosted on GitHub. It was greatly inspired by the work done by Facebook for FlashCache, and although it’s open-source, a commercial version is offered by the company for those seeking additional support. STEC did not simply modify a few lines of code of FlashCache and republish it. Instead, STEC rewrote the write-back caching logic while also improving other areas, such as memory footprint, failure handling and more. As with bcache, I haven’t had the opportunity to install and test EnhanceIO.

Summary

These solutions are intended to provide users with near SSD speeds and HDD capacities at a significantly reduced cost. From the data center to your home office, these solutions can be deployed almost anywhere. They also can be tuned to operate more appropriately in their intended environments. Some of them even offer a variety of caching algorithm options, such as Least Recently Used (LRU), Most Recently Used (MRU), hybrids of the two or just a simple first-in first-out (FIFO) caching scheme. The first three options can be expensive regarding performance, as they require the tracking of cached data sets for what has been accessed and how recently in order to determine whether to discard it. FIFO, however, functions as a circular buffer in which the oldest cached data set will be discarded first. With the exception of RapidCache, the SSD-focused modules also preserve metadata of the cache to ensure that any disruptions, including power cycles/outages, don’t compromise the integrity of the data.
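To make the difference between those eviction policies concrete, here is a small, purely illustrative Python sketch of LRU versus FIFO eviction; real caching modules track device blocks in kernel space, not Python objects:

# Illustrative LRU vs FIFO eviction using an ordered dictionary.
from collections import OrderedDict

class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.data = OrderedDict()

    def get(self, key):
        if key not in self.data:
            return None
        self.data.move_to_end(key)           # touching an entry makes it "recent"
        return self.data[key]

    def put(self, key, value):
        if key in self.data:
            self.data.move_to_end(key)
        self.data[key] = value
        if len(self.data) > self.capacity:   # evict the least recently used entry
            self.data.popitem(last=False)

class FIFOCache(LRUCache):
    def get(self, key):
        # FIFO never reorders on access, so the oldest entry is always evicted first.
        return self.data.get(key)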

Resources

dm-cache: http://visa.cs.fiu.edu/tiki/dm-cache

FlashCache: https://github.com/facebook/flashcache

EnhanceIO: https://github.com/stec-inc/EnhanceIO

bcache: http://bcache.evilpiepirate.org

RapidDisk: http://www.rapiddisk.org

FIO Git Repository: http://git.kernel.dk/?p=fio.git;a=summary

Wikipedia Page on Caching Algorithms: http://en.wikipedia.org/wiki/Cache_algorithms

 (source: LinuxJournal.com)

Rocksdb Architecture – An open source Key-Value DB built on LevelDB

1. Introduction

The rocksdb project started at Facebook as an experiment to develop efficient database software that can realize the full potential of storing data on fast storage (especially flash storage) for server workloads. It is a C++ library and can be used to store keys and values, where keys and values are arbitrary-size byte streams. It has support for atomic reads and atomic writes. It has highly flexible configuration settings that can be tuned to run in a variety of production environments: it can be configured to store data purely in memory, on flash, on hard disks or on HDFS. It has support for various compression algorithms and good tools for production support and debugging.

Rocksdb borrows significant code from the open source leveldb project as well as significant ideas from Apache HBase. The initial code was forked from open source leveldb 1.5. It also builds upon code and ideas that were built at Facebook before the birth of rocksdb.

2. Assumptions and Goals

Performance:

The primary design point for rocksdb is that it should be performant for fast storage and for server workloads. It should be able to exploit the full potential of high read/write rates offered by flash or RAM-memory subsystems. It should support efficient point lookups as well as range scans. It should be configurable to support high random-read workloads, high update workloads or a combination of both. Its architecture should support easy tuning of Read Amplification, Write Amplification and Space Amplification.

Production Support:

Rocksdb should be designed in such a way that it has built-in support for tools and utilities that help deployment and debugging in production environments. Most major parameters should be fully tunable so that it can be used by different applications on different hardware.

Backward Compatibility:

Newer versions of this software should be backward compatible, so that existing applications do not need to change when upgrading to newer releases of rocksdb.

3. High Level Architecture

Rocksdb is a key-value store where keys and values are arbitrary byte streams. rocksdb organizes all data in sorted order and the common operations are Get(key), Put(key), Delete(key) and Scan(key).

The three basic constructs of rocksdb are the memtable, the sstfile and the logfile. The memtable is an in-memory data structure; new writes are inserted into the memtable and are optionally written to the logfile. The logfile is a sequentially written file on storage. When the memtable fills up, it is flushed to an sstfile on storage and the corresponding logfile can be safely deleted. The data in an sstfile is kept sorted to facilitate easy lookup of keys.

The format of an sstfile is described in more detail here.

4. Features

Gets, Iterators and Snapshots

Rocksdb is a data store that stores arbitrary keys and values. The keys and values are treated as pure byte streams. There is no limit to the size of a key or a value. There is a Get api that allows an application to fetch a single key-value from the database. A MultiGet api allows an application to retrieve a bunch of keys from the database; all the keys-values returned via a MultiGet call are consistent with one another.

All data in the database is logically arranged in sorted order. An application can specify a Key-Comparison method that specifies a total ordering of keys. An Iterator api allows an application to do a RangeScan on the database. The Iterator can seek to a specified key and then the application can start scanning one key at a time from that point. The Iterator api can also be used to do a reverse iteration of the keys in the database. A consistent-point-in-time view of the database is created when the Iterator is created. Thus, all keys returned via the Iterator are from a consistent view of the database.

Snapshot api allows an application to create a point-in-time view of a database. The Get and Iterator apis can be used to read data from a specified snapshot. In some sense, a Snapshot and an Iterator both provide a point-in-time view of the database, but their implementations are different. Short-lived scans are best done via an iterator, while long-running scans are better done via a snapshot. An iterator keeps a reference count on all underlying files that correspond to that point-in-time view of the database; these files will not get deleted until the Iterator is released. A snapshot, on the other hand, does not prevent file deletions; instead, the compaction process understands the existence of snapshots and promises never to delete a key that is visible in any existing snapshot.

Snapshots are not persisted across database restarts: a reload of the rocksdb library (via a server restart) would release all pre-existing snapshots.

Prefix Iterators

Most LSM engines cannot support an efficient RangeScan api because it needs to look into every data file. But most applications do not do pure-random scans of key ranges in the database; instead applications typically scan within a key-prefix. Rocksdb uses this to its advantage. Applications can configure a prefix_extractor to specify a key-prefix. Rocksdb uses this to store blooms for every key-prefix. An iterator that specifies a prefix (via ReadOptions) will use these bloom bits to avoid looking into data files that do not contain keys with the specified key-prefix.
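The effect of those per-prefix blooms can be sketched in a few lines. The following is not rocksdb code; a plain Python set stands in for the bloom filter, and the fixed-length prefix extractor is an assumption:

# Illustrative sketch: skip data files whose prefix "bloom" cannot contain the prefix.
def prefix_extractor(key):
    return key[:8]                       # assumed fixed-length prefix, e.g. a user id

class SstFile:
    def __init__(self, entries):
        self.entries = dict(entries)     # key -> value
        self.prefix_bloom = {prefix_extractor(k) for k in self.entries}

    def may_contain_prefix(self, prefix):
        return prefix in self.prefix_bloom

def prefix_scan(files, prefix):
    for f in files:
        if not f.may_contain_prefix(prefix):
            continue                     # avoid opening files that cannot match
        for k in sorted(f.entries):
            if k.startswith(prefix):
                yield k, f.entries[k]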

Updates

There is a Put api that can insert a single key-value to the database. A Write api allows multiple keys-values to be inserted atomically into the database. The database guarantees that the keys-values in a single Write call will either all be inserted into the database or none of them will be inserted into the database.
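As a quick illustration of Put, Get and atomic batched writes, here is a sketch using the third-party python-rocksdb bindings (an assumption; rocksdb itself is a C++ library), with a hypothetical path and keys:

# Sketch of single-key and atomic multi-key updates via python-rocksdb bindings.
import rocksdb

opts = rocksdb.Options(create_if_missing=True)
db = rocksdb.DB("example.db", opts)

# Single key-value Put and Get; keys and values are arbitrary byte strings.
db.put(b"user:42:name", b"alice")
print(db.get(b"user:42:name"))

# Atomic batch: either every operation below is applied, or none of them is.
batch = rocksdb.WriteBatch()
batch.put(b"user:42:email", b"alice@example.com")
batch.delete(b"user:42:tmp")
db.write(batch)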

Persistency

Rocksdb has a transaction log. All Puts are stored in an in-memory buffer called the memtable as well as optionally inserted into a transaction log. Each Put has a set of flags specified via WriteOptions. The WriteOptions can specify whether the Put should be inserted into the transaction log. Also, the WriteOptions can specify whether a sync call is issued to the transaction log before a Put is declared to be committed.

Internally, rocksdb uses a batch-commit mechanism to batch transactions into the transaction log so that it can potentially commit multiple transactions using a single sync call.

Fault Tolerance

Rocksdb uses a checksum to detect corruptions in the storage. These checksums are for each block; a block is typically anywhere between 4K and 128K in size. A block, once written to storage, is never modified. Rocksdb will dynamically detect hardware support for checksum computations and use that support wherever available.

Multi-Threaded Compactions

Compactions are needed to remove multiple copies of the same key that can occur if an application overwrites an existing key. Compactions also process deletions of keys. Compactions can occur in multiple threads if configured appropriately. The overall write throughput of an LSM database is directly dependent on the speed at which compactions can occur, especially when the data is stored in fast storage like SSD or RAM. Rocksdb can be configured to issue compaction requests from multiple threads concurrently. It is observed that sustained write rates can increase by up to a factor of 10 with multi-threaded compaction when the database is on SSDs, compared to single-threaded compactions.

The entire database is stored in a set of sstfiles. When a memtable is full, its content is written out to a file in Level-0 (L0). Rocksdb removes duplicates and overwritten keys in the memtable when it is flushed to a file in L0. Periodically, some files are read in and merged to form larger files. This is called Compaction.

Rocksdb supports two different styles of compaction. The Universal Style Compaction stores all files in L0 and all files are arranged in time order. A compaction picks up a few files that are chronologically adjacent to one another and merges them back into a new file in L0. All files can have overlapping keys.

The Level Style Compaction stores data in multiple levels in the database. The more recent data is in L0 and the earliest data is in Lmax. Files in L0 can have overlapping keys but files in other layers do not have overlapping keys. A compaction process picks one file in Ln and all its overlapping files in Ln+1 and replaces them with new files in Ln+1. The Universal Style Compaction typically results in lower write amplification but higher space amplification than Level Style Compaction.

There is a MANIFEST file in the database that records the state of the database. The Compaction process adds new files and deletes existing files from the database, and it makes these operations persistent by recording them in the MANIFEST file. Transactions to be recorded in the MANIFEST file use a batch-commit algorithm to amortize the cost of repeated syncs to the MANIFEST file.

Avoiding Stalls

Background compaction threads are also used to flush memtable contents to a file on storage. If all background compaction threads are busy doing long-running compactions, then a sudden burst of writes can fill up the memtable(s) quickly, thus stalling new writes. This situation can be avoided by configuring rocksdb to keep a small set of threads explicitly reserved for the sole purpose of flushing memtables to storage.
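
A sketch of that configuration (the counts are illustrative; the dedicated flush pool runs at HIGH priority so compactions cannot starve it):

#include "rocksdb/db.h"
#include "rocksdb/env.h"

void ReserveFlushThreads(rocksdb::Options* options) {
  // Keep a small HIGH-priority pool dedicated to memtable flushes so that
  // long-running compactions cannot delay them and stall writers.
  options->max_background_flushes = 2;
  options->env->SetBackgroundThreads(2, rocksdb::Env::Priority::HIGH);
}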

Compaction Filter

There are times when an application would like to process keys at compaction time. For example, a database that has inherent support for Time-To-Live (ttl) can optionally remove keys that are expired. This can be done via an application-defined Compaction Filter. If the application wants to continuously delete data older than a specific time, it can use the compaction filter to drop records that have expired. The rocksdb Compaction Filter gives control to the application to modify the value of a key or to entirely drop a key as part of the compaction process. For example, an application can continuously run a data sanitizer as part of the compaction.
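
A minimal sketch of such a filter that drops expired records. The assumption that values begin with an 8-byte expiry timestamp is purely for this example; the filter object must outlive the database:

#include <cstring>
#include <ctime>
#include "rocksdb/compaction_filter.h"

class ExpiryFilter : public rocksdb::CompactionFilter {
 public:
  // Return true to drop the record during compaction, false to keep it.
  bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
              const rocksdb::Slice& value, std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    if (value.size() < sizeof(int64_t)) return false;
    int64_t expires_at;
    memcpy(&expires_at, value.data(), sizeof(expires_at));
    return expires_at < static_cast<int64_t>(time(nullptr));
  }
  const char* Name() const override { return "ExpiryFilter"; }
};

// Registration (illustrative): options.compaction_filter = new ExpiryFilter();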

ReadOnly Mode

There are times when an application wants to open a database for reading only. It can open the database in ReadOnly mode; the database guarantees that the application will not be able to modify anything in the database. This results in much higher read performance because oft-traversed code paths can avoid locks completely.
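
For example (a minimal sketch; the path is a placeholder):

#include <cassert>
#include <string>
#include "rocksdb/db.h"

int main() {
  rocksdb::Options options;
  rocksdb::DB* db;
  // Open an existing database for reads only; all write apis will fail.
  assert(rocksdb::DB::OpenForReadOnly(options, "/tmp/testdb", &db).ok());

  std::string value;
  db->Get(rocksdb::ReadOptions(), "key1", &value);
  delete db;
}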

Database Debug Logs

The rocksdb database software writes detailed logs to a file named LOG*. These are mostly used for debugging and analyzing a running system. There is a configuration to allow rolling this LOG at a specified periodicity.

Data Compression

Rocksdb supports snappy, zlib and bzip2 compression. Rocksdb can be configured to use different compression algorithms for data in different levels. Typically, 90% of the data resides in the Lmax level. A typical installation might configure no compression for levels L0-L2, snappy compression for the mid levels and zlib compression for Lmax.
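
A sketch of such a per-level configuration using the compression_per_level option (the level-to-algorithm mapping below is illustrative and assumes the default seven levels):

#include "rocksdb/options.h"

void ConfigureCompression(rocksdb::Options* options) {
  // No compression for the small, hot levels; cheap compression for the mid
  // levels; strongest compression for the largest, coldest level.
  options->compression_per_level = {
      rocksdb::kNoCompression,      // L0
      rocksdb::kNoCompression,      // L1
      rocksdb::kNoCompression,      // L2
      rocksdb::kSnappyCompression,  // L3
      rocksdb::kSnappyCompression,  // L4
      rocksdb::kSnappyCompression,  // L5
      rocksdb::kZlibCompression     // L6 (Lmax)
  };
}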

Transaction Logs

Rocksdb stores transactions into a logfile to protect against system crashes. On restart, it re-processes all the transactions that were recorded in the logfile. The logfile can be configured to be stored in a directory different from the directory where the sstfiles are stored. This is useful when you want to keep all data files on fast but non-persistent storage while ensuring no data loss by putting the transaction logs on slower but persistent storage.
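
That split is configured with the wal_dir option (the path below is illustrative):

#include "rocksdb/options.h"

void SeparateWalDirectory(rocksdb::Options* options) {
  // sstfiles stay in the database directory (e.g. fast, non-persistent
  // storage); transaction logs go to a slower but persistent device.
  options->wal_dir = "/persistent_disk/rocksdb_wal";
}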

Full Backups, Incremental Backups and Replication

Rocksdb has support for full backups and incremental backups. Rocksdb is an LSM database engine, so data files once created are never overwritten, and this makes it easy to extract a list of file-names that correspond to a point-in-time snapshot of the database contents. The api DisableFileDeletions instructs rocksdb to not delete data files. Compactions will continue to occur, but files that are not needed by the database will not be deleted. A backup application can then invoke the api GetLiveFiles/GetSortedWalFiles to retrieve the list of live files in the database and copy them to a backup location. Once the backup is complete, the application can invoke EnableFileDeletions; the database is now free to reclaim all the files that are not needed any more.
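
A sketch of that backup flow, assuming a release where GetLiveFiles takes a manifest-size out-parameter and a flush flag; CopyToBackup is a placeholder for whatever copy mechanism the application uses:

#include <cstdint>
#include <string>
#include <vector>
#include "rocksdb/db.h"

void BackupDatabase(rocksdb::DB* db,
                    void (*CopyToBackup)(const std::string&)) {
  db->DisableFileDeletions();  // compactions continue, deletions are deferred

  std::vector<std::string> live_files;
  uint64_t manifest_size = 0;
  db->GetLiveFiles(live_files, &manifest_size, /*flush_memtable=*/true);
  for (const std::string& f : live_files) {
    CopyToBackup(f);  // file names are relative to the database directory
  }

  db->EnableFileDeletions();  // database may now reclaim obsolete files
}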

Incremental Backups and Replication need to be able to find and tail all the recent changes to the database. The api GetUpdatesSince allows an application to tail the rocksdb transaction log. It can continuously fetch transactions from the rocksdb transaction log and apply them to a remote replica or a remote backup.

A replication system typically wants to annotate each Put with some arbitrary metadata. This metadata may be used to detect loops in the replication pipeline. It can also be used to timestamp and sequence transactions. For this purpose, rocksdb supports an api called PutLogData that an application can use to annotate each Put with metadata. This metadata is stored only in the transaction log and is not stored in the data files. The metadata inserted via PutLogData can be retrieved via the GetUpdatesSince api.
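
A sketch of both sides of that pipeline, assuming PutLogData is invoked on a WriteBatch and GetUpdatesSince yields a TransactionLogIterator (the metadata string and sequence handling are illustrative):

#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/transaction_log.h"
#include "rocksdb/write_batch.h"

// Annotate a write with replication metadata. The blob goes only to the
// transaction log, not to the data files.
void AnnotatedPut(rocksdb::DB* db) {
  rocksdb::WriteBatch batch;
  batch.Put("key1", "value1");
  batch.PutLogData("replica-id=7;ts=1700000000");
  db->Write(rocksdb::WriteOptions(), &batch);
}

// Tail the transaction log from a known sequence number and ship each batch
// (including its log metadata) to a remote replica or backup.
void TailLog(rocksdb::DB* db, rocksdb::SequenceNumber from) {
  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
  if (!db->GetUpdatesSince(from, &iter).ok()) return;
  for (; iter->Valid(); iter->Next()) {
    rocksdb::BatchResult batch = iter->GetBatch();
    // ship batch.writeBatchPtr / batch.sequence to the remote side
  }
}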

Rocksdb transaction logs are created in the database directory. When a log file is no longer needed, it is moved to the archive directory. The reason for the existence of the archive directory is because a replication stream that is falling behind might need to retrieve transactions from a log file that is way in the past. The api GetSortedWalFiles returns a list of all transaction log files.

Support for Multiple Embedded Databases in the same process

A common use-case for applications that use rocksdb is that they inherently partition their data set into multiple logical partitions or shards. This technique is helpful for application load balancing and fast recovery from faults. This means that a single server process should be able to operate multiple rocksdb databases simultaneously. This is done via an environment object named Env. Among other things, a thread pool is associated with an Env. If applications want to share a common thread pool (for background compactions) among multiple database instances, then they should use the same Env object when opening those databases.

Similarly, there is support for multiple database instances to share the same block cache.
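
A sketch of two shards sharing one thread pool and one block cache. This assumes a release where the block cache is set via BlockBasedTableOptions (older releases exposed it directly on Options); the paths and sizes are illustrative:

#include <memory>
#include "rocksdb/cache.h"
#include "rocksdb/db.h"
#include "rocksdb/env.h"
#include "rocksdb/table.h"

void OpenShards(rocksdb::DB** shard_a, rocksdb::DB** shard_b) {
  rocksdb::Env* env = rocksdb::Env::Default();
  env->SetBackgroundThreads(4, rocksdb::Env::Priority::LOW);

  std::shared_ptr<rocksdb::Cache> cache = rocksdb::NewLRUCache(512 << 20);
  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = cache;

  rocksdb::Options options;
  options.create_if_missing = true;
  options.env = env;  // same Env => same background thread pools
  options.table_factory.reset(
      rocksdb::NewBlockBasedTableFactory(table_options));

  rocksdb::DB::Open(options, "/data/shard_a", shard_a);
  rocksdb::DB::Open(options, "/data/shard_b", shard_b);
}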

Block Cache — Compressed and Uncompressed Data

Rocksdb uses an LRU cache for blocks to serve reads. The block cache is partitioned into two individual caches: the first one caches uncompressed blocks and the second one caches compressed blocks in RAM. If a compressed block cache is configured, then the database intelligently avoids caching data in the OS buffers.

Table cache

The Table Cache is a construct that caches open file descriptors. These file descriptors are for sstfiles. An application can specify the maximum size of the Table Cache via the max_open_files option.

External Compaction Algorithms

The performance of an LSM database has a significant dependency on the compaction algorithm and implementation. rocksdb has two supported compaction algorithms: LevelStyle and UniversalStyle. But we would like to enable the large community of developers to develop and experiment with other compaction policies. For this reason, rocksdb has appropriate hooks to switch off the inbuilt compaction algorithm and has other apis to allow an application to run its own compaction algorithm. Options.disable_auto_compactions, if set, disables the inbuilt compaction algorithm. The GetLiveFilesMetaData api allows an external component to look at every data file in the database and decide on which data files to merge and compact, and the DeleteFile api allows it to delete data files that are deemed obsolete.
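
A sketch of those hooks, assuming a release that declares LiveFileMetaData in rocksdb/metadata.h; the decision about which files are obsolete is left to the external policy and is only stubbed here:

#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/metadata.h"

void InspectAndTrim(rocksdb::DB* db) {
  std::vector<rocksdb::LiveFileMetaData> files;
  db->GetLiveFilesMetaData(&files);  // every data file, with level/size/keys
  for (const auto& f : files) {
    bool obsolete = false;  // application-specific decision goes here
    if (obsolete) {
      db->DeleteFile(f.name);  // drop a file the external policy merged away
    }
  }
}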

Non-Blocking Database Access

There are certain applications that are architected in such a way that they would like to retrieve data from the database only if that data retrieval call is non-blocking, i.e. the data retrieval call does not have to read in data from storage. Rocksdb caches a portion of the database in the block cache and these applications would like to retrieve the data only if it is found in this block cache. If this call does not find the data in the block cache then rocksdb returns an appropriate error code to the application. The application can then schedule a normal Get/Next operation, understanding that this data retrieval call could potentially block for IO from the storage (maybe in a different thread context).
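
A sketch of such a cache-only read using the read_tier option (the helper name is ours, not part of the library):

#include <string>
#include "rocksdb/db.h"

// Attempt a read that never touches storage: only the memtable and block
// cache are consulted. If the data is not cached, rocksdb returns an
// Incomplete status instead of blocking on IO.
bool TryCachedGet(rocksdb::DB* db, const std::string& key,
                  std::string* value) {
  rocksdb::ReadOptions ro;
  ro.read_tier = rocksdb::kBlockCacheTier;
  rocksdb::Status s = db->Get(ro, key, value);
  if (s.IsIncomplete()) {
    // Not in cache: schedule a normal, possibly blocking Get elsewhere.
    return false;
  }
  return s.ok();
}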

Stackable DB

Rocksdb has a built-in wrapper mechanism to layer additional functionality above the core database kernel. This functionality is encapsulated by an api called StackableDB. For example, the time-to-live functionality is implemented by a StackableDB and is not part of the core rocksdb api. This approach keeps the code modularized and clean.
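
For example, recent releases expose the TTL wrapper as rocksdb::DBWithTTL in rocksdb/utilities/db_ttl.h (earlier releases used a slightly different utility header, so treat this sketch as illustrative):

#include <cassert>
#include "rocksdb/utilities/db_ttl.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;

  // A StackableDB layered on the core engine: records older than one hour
  // become eligible for deletion during compaction.
  rocksdb::DBWithTTL* db;
  assert(rocksdb::DBWithTTL::Open(options, "/tmp/ttldb", &db,
                                  /*ttl=*/3600).ok());

  db->Put(rocksdb::WriteOptions(), "session:42", "payload");
  delete db;
}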

Memtables

Pluggable Memtables

The default implementation of the memtable for rocksdb is a skiplist. The skiplist is a sorted set. The sorted set is a necessary construct when the workload interleaves writes with range-scans. But there are some applications that do not interleave their writes and scans, and there are other applications that do not do range-scans at all. For these applications, a sorted set might not provide the optimal performance. For this reason, rocksdb supports a pluggable api that allows an application to provide its own implementation of a memtable. There are three memtables that are part of the library: a skiplist memtable, a vector memtable and a prefix-hash memtable. A vector memtable is appropriate for bulk-loading data into the database. Every write inserts a new element at the end of the vector; when it is time to flush the memtable to storage, the elements in the vector are sorted and written out to a file in L0. A prefix-hash memtable allows efficient processing of gets, puts and scans-within-a-key-prefix.
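
A sketch of selecting a memtable implementation through the memtable_factory option (the choices below are illustrative; the prefix-hash variant also requires a prefix extractor):

#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

void ChooseMemtable(rocksdb::Options* options) {
  // Bulk load: appends to a vector, sorted once at flush time.
  options->memtable_factory.reset(new rocksdb::VectorRepFactory());

  // Alternatively, for point lookups and scans within a key prefix:
  // options->prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(8));
  // options->memtable_factory.reset(rocksdb::NewHashSkipListRepFactory());
}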

Memtable Pipelining

Rocksdb supports configuring an arbitrary number of memtables for a database. When a memtable is full it becomes an immutable memtable and a background thread starts flushing its contents to storage. Meanwhile, new writes continue to accumulate to a newly allocated memtable. If the newly allocated memtable is filled up to its limit, it too is converted to an immutable memtable and inserted into the flush pipeline. The background thread continues to flush all the pipelined immutable memtables to storage. This pipelining increases write throughput of rocksdb especially when it is operating on slow storage devices.
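
The depth of that pipeline is controlled by the write-buffer options (the numbers here are illustrative):

#include "rocksdb/options.h"

void ConfigureMemtablePipeline(rocksdb::Options* options) {
  // Allow up to 4 memtables: one active plus a pipeline of immutable
  // memtables waiting to be flushed by the background thread.
  options->max_write_buffer_number = 4;
  options->write_buffer_size = 64 << 20;  // 64 MB per memtable
}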

Memtable Compaction

When a memtable is being flushed to storage, an inline-compaction process removes duplicate records from the output stream. Similarly, if an earlier put is hidden by a later delete, then the put is not written to the output file at all. This feature greatly reduces the size of data on storage and write amplification. This is an essential feature when rocksdb is used as a producer-consumer queue, especially when the lifetime of an element in the queue is very short.

Merge Operator

Rocksdb natively supports three types of records, a Put record, a Delete record and a Merge record. When a compaction process encounters a Merge record, it invokes an application-specified method called the Merge Operator. The Merge Operator can combine multiple Put and Merge records into a single one. This is a powerful feature and allows applications that typically do read-modify-writes to completely avoid the reads. It allows an application to record the intent-of-the-operation as a Merge record, and the rocksdb compaction process lazily applies that intent to the original value. This feature is described in detail in Merge Operator.
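
A minimal sketch of a read-free counter built on an AssociativeMergeOperator (the string encoding of the counter is an assumption made for readability):

#include <cstdint>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"

class UInt64Add : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/, const rocksdb::Slice* existing,
             const rocksdb::Slice& value, std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    uint64_t current = existing ? std::stoull(existing->ToString()) : 0;
    uint64_t delta = std::stoull(value.ToString());
    *new_value = std::to_string(current + delta);
    return true;
  }
  const char* Name() const override { return "UInt64Add"; }
};

// Usage (illustrative):
//   options.merge_operator = std::make_shared<UInt64Add>();
//   db->Merge(rocksdb::WriteOptions(), "hits", "1");  // no read needed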

5. Tools

There are a number of interesting tools that are used to support a database in production. The sst_dump utility dumps all the keys-values in an sst file. The ldb tool can put, get, and scan the contents of a database. ldb can also dump the contents of the MANIFEST, change the number of configured levels of the database, and manually compact a database.

6. Tests

There are a bunch of unit tests that test specific features of the database. A make check command runs all unit tests. The unit tests trigger specific features of rocksdb and are not designed to test data correctness at scale. The db_stress test is used to validate data correctness at scale.

7. Performance

a. Setup

All of the benchmarks are run on the same machine. Here are the details of the test setup:

  • 12 CPUs, HT enabled -> 24 vCPUs reported, 2 sockets X 6 cores/socket with X5650 @ 2.67GHz
  • 2 FusionIO devices in SW RAID 0 that can do ~200k 4kb read/second at peak
  • Fusion IO devices were about 50% to 70% full for each of the benchmark runs
  • Machine has 144 GB of RAM
  • Operating System Linux 2.6.38.4
  • 1G rocksdb block cache
  • 1 Billion keys; each key is of size 10 bytes, each value is of size 800 bytes
  • total database size is 800GB

The following benchmark results compare the performance of rocksdb to leveldb. This is an IO-bound workload: the database is 800GB while the machine has only 144GB of RAM.

b. Bulk Load of keys in Random Order

Measure performance to load 1B keys into the database. The keys are inserted in random order. The database is empty at the beginning of this benchmark run and gradually fills up. No data is being read when the data load is in progress.

rocksdb:   103 minutes, 80 MB/sec (total data size 481 GB, 1 billion key-values)
leveldb:   many many days (in 20 hours it inserted only 200 million key-values) 

Rocksdb was configured to first load all the data in L0 with compactions switched off and using an unsorted vector memtable. Then it made a second pass over the data to merge-sort all the files in L0 into sorted files in L1. Leveldb is very slow because of high write amplification. Here are the command(s) for loading the data into rocksdb:

echo "Bulk load database into L0...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=2;ctrig=10000000; delay=10000000; stop=10000000; wbn=30; mbc=20; mb=1073741824;wbs=268435456; dds=1; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=fillrandom --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --memtablerep=vector --use_existing_db=0 --disable_auto_compactions=1 --source_compaction_factor=10000000
echo "Running manual compaction to do a global sort map-reduce style...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=2;ctrig=10000000; delay=10000000; stop=10000000; wbn=30; mbc=20; mb=1073741824;wbs=268435456; dds=1; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=compact --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --memtablerep=vector --use_existing_db=1 --disable_auto_compactions=1 --source_compaction_factor=10000000
du -s -k test
504730832   test

Here are the command(s) for loading the data into leveldb:

echo "Bulk load database ...."
wbs=268435456; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=fillrandom --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=0

c. Bulk Load of keys in Sequential Order

Measure performance to load 1B keys into the database. The keys are inserted in sequential order. The database is empty at the beginning of this benchmark run and gradually fills up. No data is being read when the data load is in progress.

rocksdb:   36 minutes, 370 MB/sec (total data size 760 GB)
leveldb:   91 minutes, 146 MB/sec (total data size 760 GB)

Rocksdb was configured to use multi-threaded compactions so that multiple threads could be simultaneously compacting (via file-renames) non-overlapping key ranges in multiple levels. This is the primary reason why rocksdb is much faster than leveldb for this workload. Here are the command(s) for loading the database into rocksdb:

echo "Load 1B keys sequentially into database....."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=0; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=fillseq --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=0

Here are the command(s) for loading the data into leveldb:

echo "Load 1B keys sequentially into database....."
wbs=134217728; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=fillseq --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=0

d. Write Performance

Measure performance to randomly overwrite 1B keys in the database. The database was first created by sequentially inserting all the 1B keys. The results here do not measure the sequential-insertion phase; they measure only the second part of the test, which overwrites 1B keys in random order. The test was run with the Write-Ahead-Log (WAL) enabled, but fsync on commit was not done to the WAL.

rocksdb: 15 hours 38 min;  56.295 micros/op, 17K ops/sec,  13.8 MB/sec
leveldb: many many days;  600 micros/op,     1.6K ops/sec, 1.3 MB/sec
          (in 5 days it overwrote only 662 million out of 1 billion keys, after which I killed the test)

Rocksdb was configured with 20 compaction threads. These threads can simultaneously compact non-overlapping key ranges in the same or different levels. Rocksdb was also configured for a 1TB database by setting the number of levels to 6 so that write amplification is reduced. L0-L1 compactions were given priority to reduce stalls. zlib compression was enabled only for levels 2 and higher so that L0 compactions can occur faster. Files were configured to be 64 MB in size so that frequent fsyncs after creation of newly compacted files are reduced. Here are the commands to overwrite 1 B keys in rocksdb:

echo "Overwriting the 1B keys in database in random order...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=0; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=overwrite --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=1

Here are the commands to overwrite 1 B keys in leveldb:

echo "Overwriting the 1B keys in database in random order...."
wbs=268435456; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=overwrite --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=1

g. Read performance

Measure random read performance of a database with 1 Billion keys, each key is 10 bytes and value is 800 bytes. Rocksdb and leveldb were both configured with a block size of 4 KB. Data compression is not enabled. There were 32 threads in the benchmark application issuing random reads to the database. rocksdb is configured to verify checksums on every read while leveldb has checksum verification switched off.

rocksdb:  70 hours,  8 micros/op, 126K ops/sec (checksum verification)
leveldb: 102 hours, 12 micros/op,  83K ops/sec (no checksum verification)

Data was first loaded into the database by sequentially writing all the 1B keys to the database. Once the load was complete, the benchmark randomly picked a key and issued a read request. The above measurement does not include the data-loading part; it measures only the part that issues random reads to the database. One reason rocksdb is faster is that it does not use mmapped IO, because mmapped IO on some Linux platforms is known to be slow. Also, rocksdb shards the block cache into 64 parts to reduce lock contention. rocksdb is configured to avoid compactions triggered by seeks, whereas leveldb does seek-compaction for this workload.

Here are the commands used to run the benchmark with rocksdb:

echo "Load 1B keys sequentially into database....."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=1; sync=0; r=1000000000; t=1; vs=800; bs=4096; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=fillseq --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=6 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=none --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=0
echo "Reading 1B keys in database in random order...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=0; sync=0; r=1000000000; t=32; vs=800; bs=4096; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=readrandom --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=6 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=none --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=1

Here are the commands used to run the test on leveldb:

echo "Load 1B keys sequentially into database....."
wbs=134217728; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=fillseq --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=0
echo "Reading the 1B keys in database in random order...."
wbs=268435456; r=1000000000; t=32; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=readrandom --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=1