MongoDB 3.0 with a new storage engine

A lot has happened in MongoDB technology over the past year. For starters:

  • The big news in MongoDB 3.0* is the WiredTiger storage engine. The top-level claims for that are that one should “typically” expect (individual cases can of course vary greatly):
    • 7-10X improvement in write performance.
    • No change in read performance (which however was boosted in MongoDB 2.6).
    • ~70% reduction in data size due to compression (disk only).
    • ~50% reduction in index size due to compression (disk and memory both).
  • MongoDB has been adding administration modules.
    • A remote/cloud version came out with, if I understand correctly, MongoDB 2.6.
    • An on-premise version came out with 3.0.
    • They have similar features, but are expected to grow apart from each other over time. They have different names.

*Newly-released MongoDB 3.0 is what was previously going to be MongoDB 2.8. My clients at MongoDB finally decided to give a “bigger” release a new first-digit version number.

To forestall confusion, let me quickly add:

  • MongoDB acquired the WiredTiger product and company, and continues to sell the product on a standalone basis, as well as bundling a version into MongoDB. This could cause confusion because …
  • … the standalone version of WiredTiger has numerous capabilities that are not in the bundled MongoDB storage engine.
  • There’s some ambiguity as to when MongoDB first “ships” a feature, in that …
  • … code goes to open source with an earlier version number than it goes into the packaged product.

I should also clarify that the addition of WiredTiger is really two different events:

  • MongoDB added the ability to have multiple plug-compatible storage engines. Depending on how one counts, MongoDB now ships two or three engines:
    • Its legacy engine, now called MMAP v1 (for “Memory Map”). MMAP continues to be enhanced.
    • The WiredTiger engine.
    • A “please don’t put this immature thing into production yet” memory-only engine.
  • WiredTiger is now the particular storage engine MongoDB recommends for most use cases.

I’m not aware of any other storage engines using this architecture at this time. In particular, last I heard TokuMX was not an example. (Edit: Actually, see Tim Callaghan’s comment below.)

Most of the issues in MongoDB write performance have revolved aroundlocking, the story on which is approximately:

  • Until MongoDB 2.2, locks were held at the process level. (One MongoDB process can control multiple databases.)
  • As of MongoDB 2.2, locks were held at the database level, and some sanity was added as to how long they would last.
  • As of MongoDB 3.0, MMAP locks are held at the collection level.
  • WiredTiger locks are held at the document level. Thus MongoDB 3.0 with WiredTiger breaks what was previously a huge write performance bottleneck.

In understanding that, I found it helpful to do a partial review of what “documents” and so on in MongoDB really are.

  • A MongoDB document is somewhat like a record, except that it can be more like what in a relational database would be all the records that define a business object, across dozens or hundreds of tables.*
  • A MongoDB collection is somewhat like a table, although the documents that comprise it do not need to each have the same structure.
  • MongoDB documents want to be capped at 16 MB in size. If you need one bigger, there’s a special capability called GridFS to break it into lots of little pieces (default = 1KB) while treating it as a single document logically.

*One consequence — MongoDB’s single-document ACID guarantees aren’t quite as lame as single-record ACID guarantees would be in an RDBMS.

By the way:

  • Row-level locking was a hugely important feature in RDBMS about 20 years ago. Sybase’s lack of it is a big part of what doomed them to second-tier status.
  • Going forward, MongoDB has made the unsurprising marketing decision to talk about “locks” as little as possible, relying instead on alternate terms such as “concurrency control”.

Since its replication mechanism is transparent to the storage engine, MongoDB allows one to use different storage engines for different replicas of data. Reasons one might want to do this include:

  • Fastest persistent writes (WiredTiger engine).
  • Fastest reads (wholly in-memory engine).
  • Migration from one engine to another.
  • Integration with some other data store. (Imagine, for example, a future storage engine that works over HDFS. It probably wouldn’t have top performance, but it might make Hadoop integration easier.)

In theory one can even do a bit of information lifecycle management (ILM), by using different storage engines for different subsets of database, by:

  • Pinning specific shards of data to specific servers.
  • Using different storage engines on those different servers.

That said, similar stories have long been told about MySQL, and I’m not aware of many users who run multiple storage engines side by side.

The MongoDB WiredTiger option is shipping with a couple of options for block-level compression (plus prefix compression that is being used for indexes only). The full WiredTiger product also has some forms of columnar compression for data.

One other feature in MongoDB 3.0 is the ability to have 50 replicas of data (the previous figure was 12). MongoDB can’t think of a great reason to have more than 3 replicas per data center or more than 2 replicas per metropolitan area, but some customers want to replicate data to numerous locations around the world.

ScaleDB: A NewSQL Alternative to Amazon RDS and Oracle RAC with 10x Performance Gain

In the world of relational databases, MySQL and Oracle RAC (Real Application Cluster) own a significant segment of the market. Oracle RAC owns the majority of the enterprise market, while MySQL gained significant popularity amongst many of the Web 2.0 sites.

Today, both of those databases are owned by Oracle (MySQL was acquired by Sun which was later acquired by Oracle).

The following diagrams show the enterprise database marketshare covered by Gartner and Cloud Database market share covered by Jalistic – a Java PaaS provider.

The acquisition of MySQL by Oracle raised concern over the future of the project due to the inherent conflict of interest between its two database products. Oracle RAC is the company’s main “cash cow” product, while MySQL competes for the same audience.

Shortly after Oracle’s acquisition of MySQL, the open source database was forked by one of its original founders into a new project named MariaDB. MariaDB was established to provide an alternative development and support option to MySQL and is now becoming the default database of choice of RedHat.

MySQL vs Oracle RAC Clustering Models

The two databases take a fairly different approach to scalability.

Oracle RAC is based on a shared storage model. With Oracle, the data is broken into strips that are spread across multiple devices, and multiple database servers operate concurrently and in sync over the (shared) data.

MySQL, on the other hand, does not use a shared data model. With MySQL, a given data set is managed by a single server (a model called shared nothing). With MySQL, scaling and High Availability (HA) is achieved by managing copies of the data. As only a single machine can update the data, this mode can only scale-up by adding more capacity to the machine that owns the database. As machines have limits to capacity yet must keep up with large amounts of data or many users, the database needs to be broken into several independent databases (a process called sharding). However, sharding is a complex process, is not dynamic and requires assumptions on data distribution, data growth and the queries. For these reasons, sharding is not possible with many applications. In particular, a cloud deployment is expected to be elastic to dynamically support changing needs and user behaviors.

For High Availability, the MySQL shared nothing approach uses Primary/Backup model with a single master and a backup node (called slave) that manage a copy of the data. Each update to the master requires that the same update will be executed on the slave. The primary and slave nodes require some handshake protocol to determine who is the master and sync the changes of the data. The master node performs updates/writes to the persistent file-system and the level of High Availability is set by the DBA, who decides if the slave can lag behind or needs to confirm the duplicate updates within each transaction.

For scaling, this model can use the slave as a read replica (or make additional copies of the data), a method called horizontal scaling in which read requests are spread across the different copies of the data. (However, all the writes need to go to the master and then be reflected on the replicas/slaves.)

Relational Databases on the Cloud

The high-end position of Oracle RAC, the low-cost and open source nature of MySQL, along with the adoption of the cloud as the infrastructure platform led to a vastly different method of deployment of databases in the cloud.

Oracle RAC took an approach similar to the old mainframe: to produce a pre-engineered appliance (called Exadata) that comes with the software and hardware integrated. That approach was specifically aimed at existing customers of Oracle RAC who needed a quick resolution to their scalability needs without redesigning their applications. Plugging a more optimized stack helped to push the scalability bar without changing the existing applications that rely on the database.

Amazon launched RDS, which is basically an on-demand version of the MySQL database. This approach fits nicely with the existing users of MySQL who are looking for a more affordable way to run their database in the cloud.

The main limitation of the Amazon RDS approach is that it inherits the limitations of MySQL and thus provides a limited and relatively complex read-scalability and doesn’t provide a good solution for write-scalability other than the scale-up approach.

ScaleDB – a NewSQL Alternative to Amazon RDS and Oracle RAC

ScaleDB is a NewSQL database that takes an approach similar to the read/write scalability model of Oracle RAC and implements it as a platform that transparently supports the MySQL (or MariaDB) database. As a result, existing MySQL/MariaDB applications can leverage the platform without any changes – they use MySQL or MariaDB as the front end engine which connects to the ScaleDB platform that provides a new and more concurrent database and storage services. This approach makes it possible to run multiple MySQL or MariaDB server instances against a distributed storage in a shared data model (similar to Oracle RAC).

The diagram below shows in high level how ScaleDB works.

Each Database Node runs a standard MariaDB database instance with ScaleDB plugged as the database engine and as an alternative storage and index device.

Scaling of the data and the index is done by processing requests concurrently from multiple database instances and leveraging the ScaleDB distributed storage tier (at the storage/file-system level) where data and index elements are spread evenly across multiple storage nodes of the cluster. Read more on how ScaleDB works.

ScaleDB vs Other NewSQL Databases

Most of the NewSQL databases are based on MySQL as the engine behind their implementation. The main difference between many NewSQLs and ScaleDB is that NewSQL databases brings the NoSQL approach into the SQL world, where ScaleDB takes the Oracle RAC shared storage approach to scale the database.

ScaleDB can deliver write/read scale while offering close to 100% compatibility, whereas in many of the alternative NewSQL approaches, scaling would often involve significant changes in the data model and queries semantics.

Benchmark – ScaleDB vs Amazon RDS

To demonstrate the difference between a standard cloud deployment of MySQL and a ScaleDB deployment and in order to find whether the ScaleDB approach can live up to its promise we conducted a benchmark comparing Amazon RDS scalability for write/read workloads with that of ScaleDB. We tested a dataset that does not fit to the memory (RAM) of a single machine and used  the largest machines offered by Amazon. We required that scaling would be dynamic and that all types of queries would be supported. These requirements made sharding a no-go option.

The benchmark is based on the popular YCSB – Yahoo Benchmark as the benchmarking framework.

The results of that benchmark are shown in the graphs below.

Both illustrate a relatively flat scalability with Amazon RDS and a close to linear scalability on the ScaleDB front.

Benchmark environment:

  • Benchmark Framework – YCSB – Yahoo Benchmark
  • Cloud environment: Amazon
  • Machine Type: Extra Large
  • Data Element Size (per row) – 1k
  • Data Capacity: 50GB
  • Zones – Single zone.
  • RDS was set with 1000 provisional ios
  • ScaleDB cluster setup – 2 database nodes, 2 data volumes (4 machines -data is striped over 2 machines and each machine had a mirror).
  • ScaleDB MySQL engine – MariaDB

Running ScaleDB on OpenStack and Other Clouds with Cloudify

The thing that got me excited about the project is that it serves as a perfect fit for many of our enterprise customers who are using Cloudify for moving their existing applications to their private cloud without code change. Those customers are looking for a simple way to scale their applications and many of them run today on Oracle RAC for that purpose.

The move of enterprises to a cloud-based environment also drives the demand for a more affordable approach to handle their database scaling, which is provided by ScaleDB.

On the other hand, setting up a database cluster can be a fairly complex task.

By creating a Cloudify recipe for ScaleDB, we could remove a large part of that complexity and set up an entire ScaleDB database and storage environment through a single click.

In this way we can run ScaleDB on demand as we would with RDS and scale on both read and write as with Oracle Exadata, only in a more affordable fashion.


1. Cloudify Recipe for running Scale DB

2. Yahoo Benchmark

3. ScaleDB Architecture white paper

4. ScaleDB user manual

(Via Nati Shalom’s Blog)


Under the hood: MySQL Pool Scanner (MPS)

Facebook has one of the largest MySQL database clusters in the world. This cluster comprises many thousands of servers across multiple data centers on two continents.

Operating a cluster of this size with a small team is achieved by automating nearly everything a conventional MySQL Database Administrator (DBA) might do so that the cluster can almost run itself. One of the core components of this automation is a system we call MPS, short for “MySQL Pool Scanner.”

MPS is a sophisticated state machine written mostly in Python. It replaces a DBA for many routine tasks and enables us to perform maintenance operations in bulk with little or no human intervention.

A closer look at a single database node
Every one of the thousands of database servers we have can house a certain number of MySQL instances. An instance is a separate MySQL process, listening on a separate port with its own data set. For simplicity, we’ll assume exactly two instances per server in our diagrams and examples.

The entire data set is split into thousands of shards, and every instance holds a group of such shards, each in its own database schema. A Facebook user’s profile is assigned to a shard when the profile is created and every shard holds data relating to thousands of users.

It’s more easily explained by a diagram of a single database server:


Every instance has a few copies of its data on other instances that are hosted on different servers, and usually in a different data center. This is done to achieve two main goals:

1.    High Availability – If a server goes down, we have the data available elsewhere, ready to be served.
2.    Performance – Different geographical regions have their own replicas so that reads are served                  locally.
The way we achieve this is through simple MySQL master/slave replication. Every instance is part of a replica set. A replica set has a master and multiple slaves. All writes to a replica set must occur on the master. The slaves subscribe to a stream of these write events, and the events are replayed on them as soon as they arrive. Since the master and the slaves have nearly identical data, a read can occur on any one of the instances in the replica set.

Here is a diagram of a simple replica set, where each server hosts only one instance, and the other instance is empty (we call these spares):


A server is essentially a container for instances, so in reality things can get much more complicated.
For example, a single server hosting a master instance may also be hosting a slave instance for a different master, like so:


There are two important “building block” operations MPS relies on:

1.    Creating a copy/placing a server

The first building block is an operation that creates a copy of an instance on a different host. We use a modified version of Xtrabackup to perform most copy operations. A replacement is the same operation if we remove an instance after a copy successfully completes.

First, the system allocates a new spare instance for the operation. We choose one of the slaves or the master and copy its data to the freshly allocated spare instance. This diagram shows a replacement operation, where an instance is removed when the copy is complete:


2.    Promoting a master instance

The second building block is the action of promoting a different instance to be the master in a replica set.
During a promotion, we select a target for the promotion, stop writes to the replica set, change the slaves to replicate from the new master, and resume writes. In the diagram, we show a deletion operation in which the old master is discarded after the promotion is completed successfully. For simplicity, the replica set below consists of only three instances:


These two operations (which are usually complex procedures for most companies running MySQL) have been completely automated to a point where MPS can run them hundreds or thousands of times a day in a fast and safe manner, without any human intervention.

Host management and states
Now that we’ve got the basics out of the way, we can dive into more abstract concepts that utilize these building blocks.

MPS works with a repository that holds the current state and metadata for all our database hosts, and current and past MPS copy operations. This registry is managed by the database servers themselves so that it scales with the database cluster and MPS doesn’t need a complex application server installation. MPS itself is in fact stateless, running on its own pool of hosts and relying on the repository for state management. States are processed separately and in parallel.

When a server “wakes up” in a datacenter (for example, a fresh rack has been connected and provisioned), it will start running a local agent every few minutes. This agent performs the following steps:

  1. Collect data about itself. (Where am I? What hardware do I have? What software versions am I running?)
  2. Triage the host for problems. (Did I wake up in an active cluster? Are my disks OK? Are my flash cards healthy?)
  3. Make sure the server is registered and contains up-to-date metadata in the central repository.
  4. On the first run, place instances on the server in an initial “reimage” state if there is no current record of this server. This is where new servers start their lives in MPS.
  5. So every few minutes, every healthy server “checks in” to this central repository and updates how it’s doing, allowing things like data use and system health to be kept in sync.

The smallest unit MPS manages at the moment is an instance. Each instance can be in various states. The important states are as follows:

  • Production: Instance is serving production traffic.
  • Spare: Instance is ready to be copied to or allocated to some other task.
  • Spare allocated: Instance has been chosen as the target for a copy, and a copy is in progress.
  • Spare deallocated: Temporary triaging state. Instance has been removed from production and is pending triaging and cleanup. No instances stay here for more than a few minutes.
  • Drained: The instance is not being used, and is being reserved for testing, data center maintenance, etc. An operator intervention is required to take a host out of this state.
  • Reimage: Servers with all instances in this state are being reimaged or are in the process of being repaired. Servers in this state are handed off and managed by a co-system called Windex, which was discussed in a previous post.

An instance may move between states due to MPS executing an action or an operator intervention. This state diagram shows the main states and the actions that may cause an instance to move between those states.


The diagram above describes only a small subset of possible paths an instance may take in MPS. The state changes described here are the ones that result from simple copy and maintenance operations. There are many other reasons for instances to change states, and hardcoding all the options and checks would create software that is difficult and complex to maintain. Meet “problems,” another fundamental concept in MPS.

A “problem” is a property that is attached to an instance. If all instances on a host have this problem, we consider it to be attached to the server itself. Another way to think of problems is like tags. MPS consults a decision matrix that helps it make decisions about instances with a specific problem. It is basically a map between tuples: (state, problem) – (action, state).

It is easier to understand with some examples:

  • (production, low-space) – (replace, spare deallocated): Replace an instance in production with limited space, moving it to a different server.
  • (spare de-allocated, old-kernel) – (move, reimage): If an instance happened to move through this state, it has no production data on it, so why not reimage it?
  • (production, master-in-fallback-location) – (promote, production): We should promote this master instance to the correct location, and leave the instance in the production state.

The various states and “problems” in MPS allow us to create a flexible and maintainable infrastructure to manage a server’s life cycle.

Examples of common failure resolution and maintenance operations
In a large data center, there are tens or hundreds of server failures a day. Here are a few examples of common day-to-day failures that MPS takes care of without human intervention:

  • Broken slave instances are detected and disabled until they are replaced in the background.
  • Broken master instances are demoted so that healthy replicas take the place of their fallen brethren and get replaced in the background.
  • Instances on servers that might run out of space due to growth are moved to underutilized servers.

With thousands of servers, site-wide maintenance tasks like upgrading to a new kernel, changing partition sizes, or upgrading firmware on controllers become very complex. The same goes for localized operations such as moving some racks or allocating test servers for our engineering teams. Here are some common maintenance operations an operator can ask MPS to perform with a single command:

  • Drain any number of database racks for maintenance and take them out of production. Most such operations complete in less than 24 hours.
  • Re-image thousands of machines (to perform kernel upgrades, for example) at a specified concurrency. MPS will replace each machine and then send it to Windex.
  • Allocate any number of spares to be used for a new project or testing. Want 200 servers to run tests? No problem.
  • Create a copy of the entire Facebook data set at a new data center at a specified concurrency–building out our new Lulea data center, for example!

Automating away the mundane tasks with MPS allow us to better scale the number of servers we manage, and frees up the MySQL Operations team to work on more exciting challenges.

(source: Facebook Engineering)

Need Help With Database Scalability? Understand I/O

This is a guest post by Zardosht Kasheff, Software Developer at Tokutek, a storage engine company that delivers 21st-Century capabilities to the leading open source data management platforms.

As software developers, we value abstraction. The simpler the API, the more attractive it becomes. Arguably, MongoDB’s greatest strengths are its elegant API and itsagility, which let developers simply code.

But when MongoDB runs into scalability problems on big data, developers need to peek underneath the covers to understand the underlying issues and how to fix them. Without understanding, one may end up with an inefficient solution that costs time and money. For example, one may shard prematurely, increasing hardware and management costs, when a simpler replication setup would do. Or, one may increase the size of a replica set when upgrading to SSDs would suffice.

This article shows how to reason about some big data scalability problems in an effort to find efficient solutions.

Defining the Issues

First, let’s define the application. We are discussing MongoDB applications. That means we are addressing a document-store database that supports secondary indexes and shardedclusters. In the context of other NoSQL technologies, such as Riak or Cassandra, we may discuss these I/O bottlenecks differently, but this article just focuses on the properties ofMongoDB.

Second, what does the application do? Are we processing transactions on-line (OLTP) or are we doing analytical processing (OLAP)? For this article, we are discussing OLTP applications. OLAP applications have big data challenges that MongoDB may or may not be able to address, but this article focuses on OLTP applications.

Third, what’s big data? By big data, we mean that we are accessing and using more data than we can fit in RAM on a single machine. As a result, if the data resides on one server, then most of it must reside on disk, and require I/O to access. Note that we are not discussing scenarios where the database is large, but the data accessed or used (sometimes called the “working set”) is small. An example would be storing years of data, but the application only frequently accesses the last day’s worth.

Fourth, what are the limiting factors in OLTP applications with big data? In short: I/O. Hard disk drives do at most hundreds of I/O’s per second. RAM, on the other hand, accesses data millions of times per second. The disparity in these limits causes I/O a bottleneck for big data applications.

Lastly, how do we solve our I/O bottlenecks? With analytical thinking. Formulas and direct instructions can get us most of the way there, but a lasting solution requires understanding. Users must look at the I/O characteristics of their application and make design decisions to best fit those characteristics.

Cost Model

To solve I/O bottlenecks, we first need to understand what database operations induce I/O.

One can argue that MongoDB, and many other databases, underneath all of the bells and whistles, perform three basic operations:

● Point Query: Find a single document. Given the location of a document somewhere either on disk or in memory, retrieve the document. On big data, where the document is likely not in memory, this operation probably causes an I/O.

● Range Query: Find some number of documents in succession in an index. This is generally a MUCH more effective operation than a point query, because the data we are looking for is packed together on disk and brought into memory with very few I/Os. A range query used to retrieve 100 documents may induce 1 I/O, whereas 100 point queries to retrieve 100 documents may induce 100 I/O’s.

● Write: Write a document to the database. For traditional databases such asMongoDB, this may cause I/O. For databases with write-optimized data structures, such as TokuMX, this induces very little I/O. Unlike traditional databases, write-optimized data structures are able to amortize the I/O performed against many inserts.

Understanding the I/O implications of these three basic operations leads to understanding the I/O used by MongoDB statements made against a database. MongoDB takes these three basic operations and builds four basic user level operations:

● Inserts. This writes a new document to the database.

● Queries. Using an index on a collection, this does a combination of range queries and point queries. If the index is a covering index or a clustering index (conceptually the same as TokuDB for MySQL’s clustering index), then the query is likely doing just range queries. If not, then a combination of range queries and point queries are used. I explain these concepts in an indexing talk. The talk uses MySQL, but all of the concepts apply to MongoDB and TokuMX as well.

● Updates and Deletes. These are a combination of queries and writes. A query is used to find the documents to be updated or deleted, and then writes are used update or remove the found documents.

Now that we understand the cost model, to resolve I/O bottlenecks, the user needs tounderstand where the application induces I/O. This is where we need to break some abstraction and peek at how the database behaves. Does the I/O come from queries? If so, how are the queries behaving that is causing the I/O? Does it come from updates? If it comes from updates, is it coming from the query used in the update or the insert used in the update? Once the user understands what is inducing the I/O, steps can be taken to resolve the bottleneck.

Assuming we understand the I/O characteristics of the application, we can talk about several approaches to addressing them. The approach I like to take is this: first attack the problem with software, and when that is not enough, then attack the problem with hardware. Software is cheaper and easier to maintain.

Possible Software Solutions

A possible software solution is one where we change the software or the application to reduce I/O usage. Here are possible solutions for different bottlenecks

Problem: Inserts Causing Too Much I/O.

Possible Solution: Use a write optimized database, such as TokuMX. One of TokuMX’sbenefits is drastically reducing the I/O requirements of writes in databases by using Fractal Trees indexes.

Problem: Queries Causing Too Much I/O.

Possible Solutions: Use better indexing. Reduce point queries by using range queries instead.

In my talk, “Understanding Indexing”, I explain how to reason about indexes to reduce I/O for queries. It’s difficult to summarize the talk in one paragraph, but the gist is as follows. One can reduce the I/O of the application by avoiding doing individual point queries to retrieve each document. To do this, we use covering or clustering indexes that smartly filter the documents analyzed by the query, and can report results using range queries.

Better indexing may not be sufficient. If you have an OLTP application and your queries are essentially point queries (because they retrieve very few documents), then even with proper indexes, you may still have an I/O bottleneck. In such cases, a hardware solution is probably necessary.

Also, additional indexes increase the cost of insertions, as each insertion must keep the indexes up to date as well, but write-optimized databases mitigate that cost. This is where we need to approach the problem with an understanding of our application. For some applications, this is practical, and for others it is not.

Problem: Updates/Deletes Cause Too Much I/O

Solution: Combine the solutions above.

Updates and deletes are tricky in that they are a combination of queries and inserts. Improving their performance requires a good understanding of the cost of the operation. Which part of the operation induces I/O? Is it the query? If so, one can try to improve the indexing. Is it the write? Is it both? Based on what part is inducing the I/O, we apply the solutions above.

One mistake many make is taking a write-optimized database such as TokuMX and expect it to eliminate I/O bottlenecks of updates and deletes without changing any of the indexing. A write-optimized database is not enough. The implicit query within an update/delete must be handled as well.

Possible Hardware Solutions

As mentioned above, when software solutions are not enough we look to hardware. Let’s look at these possibilities, and analyze their benefits and drawbacks:

● Buy more memory to hopefully get more, if not all, of your working set into memory.

● Increase your IOPS by moving to an SSD.

● Buy more machines and move to a scaled out solution. This can be:

○ Read scaling via replication

○ Sharding

Buying more memory

RAM is expensive, and there is only so much RAM one can put on a single machine. Simply put, if data is large enough, keeping it in RAM is not a feasible option. This solution may work for a wide range of applications, but our focus here is tackling applications that cannot do this.

Moving to SSDs

Making the storage device an SSD is a practical solution for improving throughput. If I/O is the limiting factor of your application, an increase in IOPS (I/Os per second) naturally leads to an increase in throughput. This may be simple and practical.

The cost model of SSDs is different than the cost model of hard disks. SSDs dramatically improve your I/O throughput, but are not cheap. They are smaller, cost more, and wear out faster. Therefore, the cost per GB of SSDs is quite higher than the cost per GB of hard disks. To keep costs down, data compression becomes a key.

So, the cost of the hardware increases, but the cost of managing the application does not.

Read scaling via replication

Read scaling with replication is effective for applications where queries are the bottleneck. The idea is as follows:

● Use replication to store multiple copies of your data on separate machines

● Distribute reads across the machines, thereby improving read throughput

By taking the reads that used to bottleneck on one machine and spreading them out, more resources are available for the application to either handle more concurrent queries with the same write workload or to increase the write workload on the application.

If inserts, updates, or deletes are your bottleneck, then replication may not be very effective, because the write work is duplicated on all servers that are added to the replica set. The machine that takes the data input (called the master in MySQL or primary in MongoDB) will still have the same bottleneck.


Sharding partitions your data across different replica sets based on a shard key. Different replica sets in the cluster are responsible for ranges of values in the shard key space. So, an application’s write throughput is increased by spreading the write workload across separate replica sets in a cluster. For high write workloads, sharding can be very effective.

By partitioning the data by ranges in the shard key space, queries that use the shard key can effectively do range queries on a few shards, making such queries very efficient. If one makes the shard key a hash, then all range queries must run on all shards in the cluster, but point queries on the shard key run on single shards.

Because MongoDB is schema-less and does not support joins, sharding is elegant and relatively easy to use. If the solutions above are not enough to handle your application and more resources are required, then one can argue that sharding is inevitable.

Nevertheless, sharding is arguably the most heavyweight solution and has a high cost. For starters, your entire hardware budget is multiplied. You do not just add machines to a shardedsetup, you add entire replica sets. You need to add and manage config servers. Due to these costs, one should really consider if sharding is truly necessary. Usually, any one of the solutions presented above are cheaper.

Another big challenge is selecting a shard key. A good shard key has the following properties:

● Many (if not all) of your queries use the shard key. Any query that does not use it must run on all shards. This can be a pain point.

● The shard key should do a good job of distributing writes to different replica sets in the cluster. If all writes are directed to the same replica set in the cluster, then that replica set becomes a bottleneck for writes, just as it was in a non-sharded setup. This makes something like a timestamp a very bad shard key.

These requirements are not always easy to fill. Sometimes, a good shard key does not exist, making sharding ineffective.

In conclusion, many solutions may work, but none is always guaranteed, not even sharding. This is why understanding the characteristics of the application is crucial. These solutions are tools. How best to use the tools, is up to the user.


More Database Power – 20,000 IOPS for MySQL With the CR1 Instance

If you are a regular reader of this blog, you know that I am a huge fan of the Amazon Relational Database Service (RDS). Over the course of the last couple of years, I have seen that my audiences really appreciate the way that RDS obviates a number of tedious yet mission-critical tasks that are an inherent responsibility when running a relational database. There’s no joy to be found in keeping operating systems and database engines current, creating and restoring backups, scaling hardware up and down, or creating an architecture that provides high availability.

Today we are making RDS even more powerful by adding a new high-end database instance class. The new db.cr1.8xlarge instance type gives you plenty of memory, CPU power, and network throughput to allow your MySQL 5.6 applications to perform at up to 20,000 IOPS. This is a 60% improvement over the previous high-water mark of 12,500 IOPS and opens the door to database-driven applications that are even more demanding than before. Here are the specs:

  • 64-bit platform
  • 244 GB of RAM
  • 88 ECU (16 hyperthreaded virtual cores each delivering 2.75 ECU)
  • High-performance networking

This new instance type is available in the US East (Northern Virginia), US West (Oregon), EU (Ireland), and Asia Pacific (Tokyo) Regions and you can start using it today!

(source: Amazon Web Services blog)

Jepsen: Testing the Partition Tolerance of PostgreSQL, Redis, MongoDB and Riak

Distributed systems are characterized by exchanging state over high-latency or unreliable links. The system must be robust to both node and network failure if it is to operate reliably–however, not all systems satisfy the safety invariants we’d like. In this article, we’ll explore some of the design considerations of distributed databases, and how they respond to network partitions.

IP networks may arbitrarily drop, delay, reorder, or duplicate messages send between nodes–so many distributed systems use TCP to prevent reordered and duplicate messages. However, TCP/IP is still fundamentally asynchronous: the network may arbitrarily delay messages, and connections may drop at any time. Moreover, failure detection is unreliable: it may be impossible to determine whether a node has died, the network connection has dropped, or things are just slower than expected.

This type of failure – where messages are arbitrarily delayed or dropped–is called a network partition. Partitions can occur in production networks for a variety of reasons: GC pressure, NIC failure, switch firmware bugs, misconfiguration, congestion, or backhoes, to name a few. Given that partitions occur, the CAP theorem restricts the maximally achievable guarantees of distributed systems. When messages are dropped, “consistent” (CP) systems preserve linearizability by rejecting some requests on some nodes. “Available” (AP) systems can handle requests on all nodes, but must sacrifice linearizability: different nodes can disagree about the order in which operations took place. Systems can be both consistent and available when the network is healthy, but since real networks partition, there are no fully CA systems.

It’s also worth noting that CAP doesn’t just apply to the database as a whole–but also to subsystems like tables, keys, columns, and even distinct operations. For example, a database can offer linearizability for each key independently, but not between keys. Making that tradeoff allows the system to handle a larger space of requests during a partition. Many databases offer tunable consistency levels for individual reads and writes, with commensurate performance and correctness tradeoffs.

Testing partitions

Theory bounds a design space, but real software may not achieve those bounds. We need to test a system’s behavior to really understand how it behaves.

First, you’ll need a collection of nodes to test. I’m using five LXC nodes on a Linux computer, but you could use Solaris zones, VMs, EC2 nodes, physical hardware, etc. You’ll want the nodes to share a network of some kind–in my case, a single virtual bridge interface. I’ve named my nodes n1, n2, … n5, and set up DNS between them and the host OS.

To cause a partition, you’ll need a way to drop or delay messages: for instance, with firewall rules. On Linux you can use iptables -A INPUT -s some-peer -j DROP to cause a unidirectional partition, dropping messages from some-peer to the local node. By applying these rules on several hosts, you can build up arbitrary patterns of network loss.

Running these commands repeatably on several hosts takes a little bit of work. I’m using a tool I wrote called Salticid, but you could use CSSH or any other cluster automation system. The key factor is latency–you want to be able to initiate and end the partition quickly, so Chef or other slow-converging systems are probably less useful.

Then, you’ll need to set up the distributed system on those nodes, and design an application to test it. I’ve written a simple test: a Clojure program, running outside the cluster, with threads simulating five isolated clients. The clients concurrently add N integers to a set in the distributed system: one writes 0, 5, 10, …; another writes 1, 6, 11, …; and so on. Each client records a log of its writes, and whether they succeeded or failed. When all writes are complete, it waits for the cluster to converge, and check whether the client logs agree with the actual state of the database. This is a simple kind of consistency check, but can be adapted to test a variety of data models.

The client and config automation, including scripts for simulating partitions and setting up databases, is freely available. For instructions and code, click here.


A single-node PostgreSQL instance is a CP system; it can provide serializable consistency for transactions, at the cost of becoming unavailable when the node fails. However, the distributed system comprised of the server and a client may not be consistent.

Postgres’ commit protocol is a special case of two-phase commit. In the first phase, the client votes to commit (or abort) the current transaction, and sends that message to the server. The server checks to see whether its consistency constraints allow the transaction to proceed, and if so, it votes to commit. It writes the transaction to storage and informs the client that the commit has taken place (or failed, as the case may be.) Now both the client and server agree on the outcome of the transaction.

What happens if the message acknowledging the commit is dropped before the client receives it? Then the client does’t know whether the commit succeeded or not! The 2PC protocol says nodes must wait for the acknowledgement message to arrive in order to decide the outcome. If it doesn’t arrive, 2PC fails to terminate. It’s not a partition-tolerant protocol. Real systems can’t wait forever, so at some point the client times out, leaving the commit protocol in an indeterminate state.

If I cause this type of partition, the JDBC Postgres client throws exceptions like

217 An I/O error occurred while sending to the backend.   
Failure to execute query with SQL:   
INSERT INTO "set_app" ("element") VALUES (?)  ::  [219]  
Message: An I/O error occured while sending to the backend.  
SQLState: 08006   
Error Code: 0  
218 An I/O error occured while sending to the backend.

… which we might interpret as “the transactions for writes 217 and 218 failed”.  However, when the test app queries the database to find which write transactions were successful, it finds that two “failed” writes were actually present:

1000 total
950 acknowledged 
952 survivors 
2 unacknowledged writes found! ヽ(´ー`)ノ 
(215 218) 
0.95 ack rate 
0.0 loss rate 
0.002105263 unacknowledged but successful rate

Out of 1000 attempted writes, 950 were successfully acknowledged, and all 950 of those writes were present in the result set. However, two writes (215 and 218) succeeded, even though they threw an exception! Note that this exception doesn’t guarantee that the write succeeded or failed: 217 also threw an I/O error while sending, but because the connection dropped before the client’s commit message arrived at the server, the transaction never took place.

There is no way to reliably distinguish these cases from the client. A network partition–and indeed, most network errors–doesn’t mean a failure. It means the *absence* of information. Without a partition-tolerant commit protocol, like extended three-phase commit, we cannot assert the state of these writes.

You can handle this indeterminacy by making your operations idempotent and retrying them blindly, or by writing the transaction ID as a part of the transaction itself, and querying for it after the partition heals.


Redis is a data structure server, typically deployed as a shared heap. Since it runs on one single-threaded server, it offers linearizable consistency by default: all operations happen in a single, well-defined order.

Redis also offers asynchronous primary->secondary replication. A single server is chosen as the primary, which can accept writes. It relays its state changes to secondary servers, which follow along. Asynchronous, in this context, means that clients do not block while the primary replicates a given operation – the write will “eventually” arrive on the secondaries.

To handle discovery, leader election, and failover, Redis includes a companion system: Redis Sentinel. Sentinel nodes gossip the state of the Redis servers they can see, and attempt to promote and demote nodes to maintain a single authoritative primary. In this test, I’ve installed Redis and Redis Sentinel on all five nodes. Initially all five clients read from the primary on n1, and n2–n5 are secondaries. Then we partition n1 and n2 away from n3, n4, and n5.

If Redis were a CP system, n1 and n2 would become unavailable during the partition, and a new primary in the majority component (n3, n4, n5) would be elected. This is not the case. Instead, writes continue to successfully complete against n1. After a few seconds, the Sentinel nodes begin to detect the partition, and elect, say, n5 as a new primary.

For the duration of the partition, there are two primary nodes–one in each component of the system–and both accept writes independently. This is a classic split-brain scenario–and it violates the C in CP. Writes (and reads) in this state are not linearizable, because clients will see a different state of the database depending on which node they’re talking to.

What happens when the partition resolves? Redis used to leave both primaries running indefinitely. Any partition resulting in a promotion would cause permanent split-brain. That changed in Redis 2.6.13, which was released April 30th, 2013.  Now, the sentinels will resolve the conflict by demoting the original primary, destroying a potentially unbounded set of writes in the process. For instance:

2000 total 
1998 acknowledged 
872 survivors 1126 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
50 51 52 53 54 55 ... 1671 1675 1676 1680 1681 1685 
0.999 ack rate 
0.5635636 loss rate 
0.0 unacknowledged but successful rate

Out of 2000 writes, Redis claimed that 1998 of them completed successfully. However, only 872 of those integers were present in the final set. Redis threw away 56% of the writes it claimed were successful.

There are two problems here. First, notice that all the clients lost writes at the beginning of the partition: (50, 51, 52, 53, …). That’s because they were all writing to n1 when the network dropped–and since n1 was demoted later, any writes made during that window were destroyed.

The second problem was caused by split-brain: both n1 and n5 were primaries up until the partition healed. Depending on which node they were talking to, some clients might have their writes survive, and others have their writes lost. The last few numbers in the set (mod 5) are all 0 and 1–corresponding to clients which kept using n1 as a primary in the minority component.

In any replication scheme with failover, Redis provides neither high availability nor consistency. Only use Redis as a “best-effort” cache and shared heap where arbitrary data loss or corruption is acceptable.


MongoDB is a document-oriented database with a similar distribution design to Redis. In a replica set, there exists a single primary node which accepts writes and asynchronously replicates a log of its operations (“oplog”) to N secondaries. However, there are a few key differences from Redis.

First, Mongo builds in its leader election and replicated state machine. There’s no separate system which tries to observe a replica set in order to make decisions about what it should do. The replica set decides among itself which node should be primary, when to step down, how to replicate, etc. This is operationally simpler and eliminates whole classes of topology problems.

Second, Mongo allows you to ask that the primary confirm successful replication of a write by its disk log, or by secondary nodes. At the cost of latency, we can get stronger guarantees about whether or not a write was successful.

To test this, I’ve set up a five-node replica set, with the primary on n1. Clients make their writes to a single document (the unit of MongoDB consistency) via compare-and-set atomic updates. I then partition n1 and n2 away from the rest of the cluster, forcing Mongo to elect a new primary on the majority component, and to demote n1. I let the system run in a partitioned state for a short time, then reconnect the nodes, allowing Mongo to reconverge before the end of the test.

There are several consistency levels for operations against MongoDB, termed “write concerns”. The defaults, up until recently, were to avoid checking for any type of failure at all. The Java driver calls this WriteConcern.UNACKNOWLEDGED. Naturally, this approach can lose any number of “successful” writes during a partition:

6000 total 
5700 acknowledged 
3319 survivors 
2381 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
469 474 479 484 489 494 ... 3166 3168 3171 3173 3178 3183 
0.95 ack rate 
0.4177193 loss rate 
0.0 unacknowledged but successful rate

In this trial, 42% of writes, from 469 through 3183, were thrown away.

However, WriteConcern.SAFE, which confirms that data is successfully committed to the primary, also loses a large number of writes:

6000 total 
5900 acknowledged 
3692 survivors 
2208 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
458 463 468 473 478 483 ... 3075 3080 3085 3090 3095 3100 0.98333335 ack rate 
0.3742373 loss rate 
0.0 unacknowledged but successful rate

Because the replication protocol is asynchronous, writes continued to succeed against n1, even though n1 couldn’t replicate those writes to the rest of the cluster. When n3 was later elected primary in the majority component, it came to power with an old version of history–causally disconnected from those writes on n1. The two evolved inconsistently for a time, before n1 realized it had to step down.

When the partition healed, Mongo tried to determine which node was authoritative. Of course, there is no authoritative node, because both accepted writes during the partition. Instead, MongoDB tries to find the node with the highest optime (a monotonic timestamp for each node’s oplog). Then it forces the older primary (n1) to roll back to the last common point between the two, and re-applies n3’s operations.

In a rollback, MongoDB dumps a snapshot of the current state of conflicting objects to disk, in a BSON file. An operator could later attempt to reconstruct the proper state of the document.

This system has several problems. First, there’s a bug in the leader election code: MongoDB may promote a node which does not have the highest optime in the reachable set. Second, there’s a bug in the rollback code. In my tests, rollback only worked roughly 10% of the time. In almost every case, MongoDB threw away the conflicting data altogether. Moreover, not all types of objects will be fully logged during a rollback: capped collections, for instance, throw away all conflicts by design. Third, even if these systems do work correctly, the rollback log is not sufficient to recover linearizability. Because the rollback version and the oplog do not share a well-defined causal order, only order-free merge functions (e.g. CRDTs) can reconstruct the correct state of the document in generality.

This lack of linearizability also applies to FSYNC_SAFE, JOURNAL_SAFE, and even REPLICAS_SAFE, which ensures that writes are acknowledged by two replicas before the request succeeds:

6000 total 
5695 acknowledged 
3768 survivors 
1927 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
712 717 722 727 732 737 ... 2794 2799 2804 2809 2814 2819 
0.94916666 ack rate 
0.338367 loss rate 
0.0 unacknowledged but successful rate

The only way to recover linearizability in MongoDB’s model is by waiting for a quorum of nodes to respond. However, WriteConcern.MAJORITY is still inconsistent, dropping acknowledged writes and recovering failed writes.

6000 total 
5700 acknowledged 
5701 survivors 
2 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
(596 598)
3 unacknowledged writes found! ヽ(´ー`)ノ
(562 653 3818)
0.95 ack rate 
1.754386E-4 loss rate 
5.2631577E-4 unacknowledged but successful rate

Where UNSAFE, SAFE, and REPLICAS_SAFE can lose any or all writes made during a partition, MAJORITY can only only lose writes which were in flight when the partition started. When the primary steps down it signs off on all WriteConcern requests, setting OK to TRUE on each reply regardless of whether the WriteConcern was satisfied.

Moreover, MongoDB can emit any number of false negatives. In this trial, 3 unacknowledged writes were actually recovered in the final dataset. At least in version 2.4.1 and earlier, there is no way to prevent data loss during partition, at any consistency level.

If you need linearizability in MongoDB, use WriteConcern.MAJORITY. It won’t actually be consistent, but it dramatically reduces the window of write loss.


As a Dynamo clone, Riak takes an AP approach to partition tolerance. Riak will detect causally divergent histories–whether due to a partition or normal concurrent writes–and present alldiverging copies of an object to the client, who must then choose how to merge them together.

The default merge function in Riak is last-write-wins. Each write includes a timestamp, and merging values together is done by preserving only the version with the highest timestamp. If clocks are perfectly synchronized, this ensures Riak picks the most recent value.

Even in the absence of partitions and clock skew, causally concurrent writes means that last-write-wins can cause successful writes to be silently dropped:

2000 total 
2000 acknowledged 
566 survivors 
1434 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
1 2 3 4 6 8 ... 1990 1991 1992 1995 1996 1997 
1.0 ack rate 
0.717 loss rate

In this case, a healthy cluster lost 71% of operations–because when two clients wrote to the value at roughly the same time, Riak simply picked the write with the higher timestamp and ignored the others–which might have added new numbers.

Often, people try to resolve this problem by adding a lock service to prevent concurrent. Since locks must be linearizable, the CAP theorem tells us that distributed lock systems cannot be fully available during a partition–but even if they were, it wouldn’t prevent write loss. Here’s a Riak cluster with R=W=QUORUM, where all clients perform their reads+writes atomically using a mutex. When the partition occurs, Riak loses 91% of successful writes:

2000 total 
1985 acknowledged 
176 survivors 
1815 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
85 90 95 100 105 106 ... 1994 1995 1996 1997 1998 1999 
6 unacknowledged writes found! ヽ(´ー`)ノ 
(203 204 218 234 262 277) 
0.9925 ack rate 
0.91435766 loss rate 
0.00302267 unacknowledged but successful rate

In fact, LWW can cause unbounded data loss, including the loss of information written before the partition occurred. This is possible because Dynamo (by design) allows for sloppy quorums, where fallback vnodes on both sides of the partition can satisfy R and W.

We can tell Riak to use a strict quorum using PR and PW–which succeeds only if a quorum of theoriginal vnodes acknowledges the operation. This can still cause unbounded data loss if a partition occurs:

2000 total 
1971 acknowledged 
170 survivors 
1807 acknowledged writes lost! (╯°□°)╯︵ ┻━┻ 
86 91 95 96 100 101 ... 1994 1995 1996 1997 1998 1999 
6 unacknowledged writes found! ヽ(´ー`)ノ 
(193 208 219 237 249 252) 
0.9855 ack rate 0.9167935 loss rate 
0.00304414 unacknowledged but successful rate

Dynamo is designed to preserve writes as much as possible. Even though a node might return “PW val unsatisfied” when it can’t replicate to the primary vnodes for a key, it may have been able to write to one primary vnode–or any number of fallback vnodes. Those values will still be exchanged during read-repair, considered as conflicts, and the timestamp used to discard the older value–meaning all writes from one side of the cluster.

This means the minority component’s “failed” writes can destroy all of the majority component’s successful writes.

It is possible to preserve data in an AP system by using CRDTs. If we use sets as the data structure, with union as the merge function, we can preserve all writes even in the face of arbitrary partitions:

2000 total 
1948 acknowledged 
2000 survivors All 
2000 writes succeeded. :-D

This is not linearizable consistency–and not all data structures can be expressed as CRDTs. It also does not prevent false negatives–Riak can still time out or report failures–but it does guarantee safe convergence for acknowledged writes.

If you’re using Riak, use CRDTs, or write as much of a merge function as you can. There are only a few cases (e.g. immutable data) where LWW is appropriate; avoid it everywhere else.

Measure assumptions

We’ve seen that distributed systems can behave in unexpected ways under partition–but the nature of that failure depends on many factors. The probability of data loss and inconsistency depends on your application, network, client topology, timing, the nature of the failure, and so on. Instead of telling you to choose a particular database, I encourage you to think carefully about the invariants you need, the risks you’re willing to accept, and to design your system accordingly.

A key part of building that design is measuring it. First, establish the boundaries for the system – places where it interacts with the user, the internet, or other services. Determine the guarantees which must hold at those boundaries.

Then, write a program which makes requests of the system from just outside that boundary, and measures the external consequences. Record whether HTTP requests returned 200 or 503s, or the list of comments on a post at each point. While running that program, cause a failure: kill a process, unmount a disk, or firewall nodes away from one another.

Finally, compare logs to verify the system’s guarantees. For instance, if acknowledged chat messages should be delivered at least once to their recipients, check to see whether the messages were actually delivered.

The results may be surprising; use them to investigate the system’s design, implementation, and dependencies. Consider designing the system to continuously measure its critical safety guarantees, much like you instrument performance.


Even if you don’t use MongoDB or Riak, there are some general lessons you can take away from these examples.

First, clients are an important part of the distributed system, not objective observers. Network errors mean “I don’t know,” not “It failed.” Make the difference between success, failure, and indeterminacy explicit in your code and APIs. Consider extending consistency algorithms through the boundaries of your systems: hand TCP clients ETags or vector clocks, or extend CRDTs to the browser.

Even well-known algorithms like two-phase commit have some caveats, like false negatives. SQL transactional consistency comes in several levels. If you use the stronger consistency levels, remember that conflict handling is essential.

Certain problems are hard to solve well, like maintaining an authoritative primary with failover. Consistency is a property of the data, not of the nodes. Avoid systems which assume node state consensus implies data consistency.

Wall clocks are only useful for ensuring responsiveness in the face of deadlock, and even then they’re not a positive guarantee of correctness. In these tests, all clocks were well-synchronized with NTP, and we still lost data. Even worse things can happen if a clock gets out of sync, or a node pauses for a while. Use logical clocks on your data. Distrust systems which rely on the system time, unless you’re running GPS or atomic clocks on your nodes. Measure your clock skew anyway.

Where correctness matters, rely on techniques with a formal proof and review in the literature. There’s a huge gulf between theoretically correct algorithm and live software–especially with respect to latency–but a buggy implementation of a correct algorithm is typically better than a correct implementation of a terrible algorithm. Bugs you can fix. Designs are much harder to re-evaluate.

Choose the right design for your problem space. Some parts of your architecture demand strong consistency. Other parts can sacrifice linearizability while remaining correct, as with CRDTs. Sometimes you can afford to lose data entirely. There is often a tradeoff between performance and correctness: think, experiment, and find out.

Restricting your system with particular rules can make it easier to attain safety. Immutability is an incredibly useful property, and can be combined with a mutable CP data store for powerful hybrid systems. Use idempotent operations as much as possible: it enables all sorts of queuing and retry semantics. Go one step further, if practical, and use full CRDTs.

Preventing write loss in databases like MongoDB requires a significant latency tradeoff. It might be faster to just use Postgres. Sometimes buying more reliable network and power infrastructure is cheaper than scaling out. Sometimes not.

Distributed state is a difficult problem, but with a little work we can make our systems significantly more reliable. For more on the consequences of network partitions, including examples of production failures, see this.

(Author: Kyle Kingsbury | source:

Virident vCache vs. FlashCache

Ease of basic installation. The setup process was simply a matter of installing two RPMs and running a couple of commands to enable vCache on the PCIe flash card (a Virident FlashMAX II) and set up the cache device with the command-line utilities supplied with one of the RPMs. Moreover, the vCache software is built in to the Virident driver, so there is no additional module to install. FlashCache, on the other hand, requires building a separate kernel module in addition to whatever flash memory driver you’ve already had to install, and then further configuration requires modification to assorted sysctls. I would also argue that the vCache documentation is superior. Winner: vCache.

Ease of post-setup modification / advanced installation. Many of the FlashCache device parameters can be easily modified by echoing the desired value to the appropriate sysctl setting; with vCache, there is a command-line binary which can modify many of the same parameters, but doing so requires a cache flush, detach, and reattach. Winner: FlashCache.

Operational Flexibility: Both solutions share many features here; both of them allow whitelisting and blacklisting of PIDs or simply running in a “cache everything” mode. Both of them have support for not caching sequential IO, adjusting the dirty page threshold, flushing the cache on demand, or having a time-based cache flushing mechanism, but some of these features operate differently with vCache than with FlashCache. For example, when doing a manual cache flush with vCache, this is a blocking operation. With FlashCache, echoing “1″ to the do_sync sysctl of the cache device triggers a cache flush, but it happens in the background, and while countdown messages are written to syslog as the operation proceeds, the device never reports that it’s actually finished. I think both kinds of flushing are useful in different situations, and I’d like to see a non-blocking background flush in vCache, but if I had to choose one or the other, I’ll take blocking and modal over fire-and-forget any day. FlashCache does have the nice ability to switch between FIFO and LRU for its flushing algorithm; vCache does not. This is something that could prove useful in certain situations. Winner: FlashCache.

Operational Monitoring: Both solutions offer plenty of statistics; the main difference is that FlashCache stats can be pulled from /proc but vCache stats have to be retrieved by running the vgc-vcache-monitor command. Personally, I prefer “cat /proc/something” but I’m not sure that’s sufficient to award this category to FlashCache. Winner: None.

Time-based Flushing: This wouldn’t seem like it should be a separate category, but because the behavior seems to be so different between the two cache solutions, I’m listing it here. The vCache manual indicates that “flush period” specifies the time after which dirty blocks will be written to the backing store, whereas FlashCache has a setting called “fallow_delay”, defined in the documentation as the time period before “idle” dirty blocks are cleaned from the cache device. It is not entirely clear whether or not these mechanisms operate in the same fashion, but based on the documentation, it appears that they do not. I find the vCache implementation more useful than the one present in FlashCache. Winner: vCache.

Although nobody likes a tie, if you add up the scores, usability is a 2-2-1 draw between vCache and FlashCache. There are things that I really liked better with FlashCache, and there are other things that I thought vCache did a much better job with. If I absolutely must pick a winner in terms of usability, then I’d give a slight edge to FlashCache due to configuration flexibility, but if the GA release of vCache added some of FlashCache’s additional configuration options and exposed statistics via /proc, I’d vote in the other direction.

Disclosure: The research and testing conducted for this post were sponsored by Virident.

First, some background information. All tests were conducted on Percona’s Cisco UCS C250test machine, and both the vCache and FlashCache tests used the same 2.2TB Virident FlashMAX II as the cache storage device. EXT4 is the filesystem, and CentOS 6.4 the operating system, although the pre-release modules I received from Virident required the use of the CentOS 6.2 kernel, 2.6.32-220, so that was the kernel in use for all of the benchmarks on both systems. The benchmark tool used was sysbench 0.5 and the version of MySQL used was Percona Server 5.5.30-rel30.1-465. Each test was allowed to run for 7200 seconds, and the first 3600 seconds were discarded as warmup time; the remaining 3600 seconds were averaged into 10-second intervals. All tests were conducted with approximately 78GiB of data (32 tables, 10M rows each) and a 4GiB buffer pool. The cache devices were flushed to disk immediately prior to and immediately following each test run.

With that out of the way, let’s look at some numbers.

vCache vs. vCache – MySQL parameter testing

The first test was designed to look solely at vCache performance under some different sets of MySQL configuration parameters. For example, given that the front-end device is a very fast PCIe SSD, would it make more sense to configure MySQL as if it were using SSD storage or to just use an optimized HDD storage configuration? After creating a vCache device with the default configuration, I started with a baseline HDD configuration for MySQL (configuration A, listed at the bottom of this post) and then tried three additional sets of experiments. First, the baseline configuration plus:

innodb_read_io_threads = 16
innodb_write_io_threads = 16

We call this configuration B. The next one contained four SSD-specific optimizations based partially on some earlier work that I’d done with this Virident card (configuration C):

innodb_io_capacity = 30000
innodb_adaptive_flushing_method = keep_average
innodb_max_dirty_pages_pct = 60

And then finally, a fourth test (configuration D) which combined the parameter changes from tests B and C. The graph below shows the sysbench throughput (tps) for these four configurations:
As we can see, all of the configuration options produce numbers that, in the absence of outliers, are roughly identical, but it’s configuration C (shown in the graph as the blue line – SSD config) which shows the most consistent performance. The others all have assorted performance drops scattered throughout the graph. We see the exact same pattern when looking at transaction latency; the baseline numbers are roughly identical for all four configurations, but configuration C avoids the spikes and produces a very constant and predictable result.

vCache vs. FlashCache – the basics

Once I’d determined that configuration C appeared to produce the most optimal results, I moved on to reviewing FlashCache performance versus that of vCache, and I also included a “no cache” test run as well using the base HDD MySQL configuration for purposes of comparison. Given the apparent differences in time-based flushing in vCache and FlashCache, both cache devices were set up so that time-based flushing was disabled. Also, both devices were set up such that all IO would be cached (i.e., no special treatment of sequential writes) and with a 50% dirty page threshold. Again, for comparison purposes, I also include the numbers from the vCache test where the time-based flushing is enabled.
As we’d expect, the HDD-only solution barely registered on the graph. With a buffer pool that’s much smaller than the working set, the no-cache approach is fairly crippled and ineffectual. FlashCache does substantially better, coming in at an average of around 600 tps, but vCache is about 3x better. The interesting item here is that vCache with time-based flushing enabled actually produces better and more consistent performance than vCache without time-based flushing, but even at its worst, the vCache test without time-based flushing still outperforms FlashCache by over 2x, on average.

Looking just at sysbench reads, vCache with time-based flushing consistently hit about 27000 per second, whereas without time-based flushing it averaged about 12500. FlashCache came in around 7500 or so. Sysbench writes came in just under 8000 for vCache + time-based flushing, around 6000 for vCache without time-based flushing, and somewhere around 2500 for FlashCache.

We can take a look at some vmstat data to see what’s actually happening on the system during all these various tests. Clockwise from the top left in the next graph, we have “no cache”, “FlashCache”, “vCache with no time-based flushing”, and “vCache with time-based flushing.” As the images demonstrate, the no-cache system is being crushed by IO wait. FlashCache and vCache both show improvements, but it’s not until we get to vCache with the time-based flushing that we see some nice, predictable, constant performance.

So why is it the case that vCache with time-based flushing appears to outperform all the rest? My hypothesis here is that time-based flushing allows the backing store to be written to at a more constant and, potentially, submaximal, rate compared to dirty-page-threshold flushing, which kicks in at a given level and then attempts to flush as quickly as possible to bring the dirty pages back within acceptable bounds. This is, however, only a hypothesis.

vCache vs. FlashCache – dirty page threshold

Finally, we examine the impact of a couple of different dirty-page ratios on device performance, since this is the only parameter which can be reliably varied between the two in the same way. The following graph shows sysbench OLTP performance for FlashCache vs. vCache with a 10% dirty threshold versus the same metrics at a 50% dirty threshold. Time-based flushing has been disabled. In this case, both systems produce better performance when the dirty-page threshold is set to 50%, but once again, vCache at 10% outperforms FlashCache at 10%.


The one interesting item here is that vCache actually appears to get *better* over time; I’m not entirely sure why that’s the case or at what point the performance is going to level off since these tests were all run for 2 hours anyway, but I think the overall results still speak for themselves, and even with a vCache volume where the dirty ratio is only 10%, such as might be the case where a deployment has a massive data set size in relation to both the working set and the cache device size, the numbers are encouraging.


Overall, the I think the graphs speak for themselves. When the working set outstrips the available buffer pool memory but still fits into the cache device, vCache shines. Compared to a deployment with no SSD cache whatsoever, FlashCache still does quite well, massively outperforming the HDD-only setup, but it doesn’t even really come close to the numbers obtained with vCache. There may be ways to adjust the FlashCache configuration to produce better or more consistent results, or results that are more inline with the numbers put up by vCache, but when we consider that overall usability was one of the evaluation points and combine that with the fact that the best vCache performance results were obtained with the default vCache configuration, I think vCache can be declared the clear winner.

Base MySQL & Benchmark Configuration

All benchmarks were conducted with the following:

sysbench ­­--num­-threads=32 ­­--test=tests/db/oltp.lua ­­--oltp_tables_count=32 \
--oltp­-table­-size=10000000 ­­--rand­-init=on ­­--report­-interval=1 ­­--rand­-type=pareto \
--forced­-shutdown=1 ­­--max­-time=7200 ­­--max­-requests=0 ­­--percentile=95 ­­\
--mysql­-user=root --mysql­-socket=/tmp/mysql.sock ­­--mysql­-table­-engine=innodb ­­\
--oltp­-read­-only=off run

The base MySQL configuration (configuration A) appears below:

#####fixed innodb options
innodb_file_format = barracuda
innodb_buffer_pool_size = 4G
innodb_file_per_table = true
innodb_data_file_path = ibdata1:100M
innodb_flush_method = O_DIRECT
innodb_log_buffer_size = 128M
innodb_flush_log_at_trx_commit = 1
innodb_log_file_size = 1G
innodb_log_files_in_group = 2
innodb_purge_threads = 1
innodb_fast_shutdown = 1
#not innodb options (fixed)
back_log = 50
wait_timeout = 120
max_connections = 5000
max_connect_errors = 10
table_open_cache = 10240
max_allowed_packet = 16M
binlog_cache_size = 16M
max_heap_table_size = 64M
sort_buffer_size = 4M
join_buffer_size = 4M
thread_cache_size = 1000
query_cache_size = 0
query_cache_type = 0
ft_min_word_len = 4
thread_stack = 192K
tmp_table_size = 64M
server­id = 101
key_buffer_size = 8M
read_buffer_size = 1M
read_rnd_buffer_size = 4M
bulk_insert_buffer_size = 8M
myisam_sort_buffer_size = 8M
myisam_max_sort_file_size = 10G
myisam_repair_threads = 1