RebornDB: The Next Generation Distributed Key-Value Store

There are many key-value stores in the world, and they are widely used in many systems. For example, we can use Memcached to cache a MySQL query result for later identical queries, or use MongoDB to store documents for better searching.

Different scenarios call for different key-value stores; there is no silver bullet that fits every problem. But if you just want a simple key-value store that is easy to use, very fast, and supports many powerful data structures, Redis is a good place to start.

Redis is an advanced key-value cache and store released under the BSD license. It is very fast, offers many data types (String, Hash, List, Set, Sorted Set, ...), uses RDB or AOF persistence plus replication to keep data safe, and has client libraries for many languages.

Above all, the market has chosen Redis: many companies use it and it has proved its worth.

Although Redis is great, it still has some drawbacks, the biggest of which is the memory limitation. Redis keeps all data in memory, which caps the size of the whole dataset and makes it impossible to store more data than the machine's RAM allows.

The official Redis Cluster addresses this by splitting data across many Redis servers, but it has not yet been proven in many production environments. It also requires us to change our client libraries to support "MOVED" redirection and other special commands, which is unacceptable for a running production system. So Redis Cluster is not a good solution for us right now.


We like Redis and want to go beyond its limitations, so we built a service named QDB. It is compatible with Redis, saves data on disk to get past the memory limitation, and keeps hot data in memory for performance.


QDB is a fast, Redis-like key-value store with the following features:

  • Compatible with Redis. If you are familiar with Redis, you can use QDB easily. QDB supports most Redis commands and data structures (String, Hash, List, Set, Sorted Set).

  • Saves data on disk (so the dataset can exceed memory) and keeps hot data in memory, thanks to the backend storage.

  • Multiple backend storage engines: you can choose RocksDB, LevelDB, or GoLevelDB (later we will use RocksDB as the example).

  • Bidirectional synchronization with Redis: QDB can sync data from Redis as a slave and replicate data into Redis as a master.

Backend Storage

QDB uses LevelDB, RocksDB, or GoLevelDB as its backend storage. All three are based on the log-structured merge tree (LSM tree), giving very fast writes and good reads, and all of them use Bloom filters and an LRU cache to further improve read performance.

LevelDB is the original implementation developed by Google, RocksDB is an optimized version maintained by Facebook, and GoLevelDB is a pure Go implementation of LevelDB. If you just want a quick trial and don't want to build and install RocksDB or LevelDB, you can use GoLevelDB directly, but we don't recommend it in production because of its lower performance.

LevelDB and RocksDB are both fine for production, but we prefer RocksDB for its superior performance. Later we may support only RocksDB and GoLevelDB: one for production, one for trial and testing.


QDB lets us store a huge amount of data on one machine with high read/write performance. But as the dataset grows, we eventually cannot keep all the data on a single machine, and that QDB server becomes both a bottleneck and a single point of failure.

At that point we must consider a cluster solution.


RebornDB is a proxy-based distributed Redis cluster solution. It is similar to twemproxy, one of the earliest and best-known proxy-based Redis cluster solutions.

But twemproxy has its own problems. It only supports a static cluster topology, so we cannot add or remove Redis servers to re-shard data dynamically. If we run many twemproxy instances and want to add a backend Redis server, safely updating the configuration on every proxy is another problem that adds operational complexity. Meanwhile Twitter, the company behind twemproxy, has given up on it and no longer uses it in production.

Unlike twemproxy, RebornDB has a killer feature: dynamic re-sharding. This is useful when your dataset grows quickly and you have to add more storage nodes for scalability. Best of all, RebornDB re-shards transparently, without disturbing running services.


We can treat RebornDB as a black box and use any existing Redis client to talk to it as if it were a single Redis server. The following picture shows the RebornDB architecture.

[Figure: RebornDB architecture]

RebornDB has the following components: reborn-proxy, backend store, coordinator, reborn-config, and reborn-agent.


Reborn-proxy is the only service exposed to clients. Any Redis client can connect to any reborn-proxy and run commands.

Reborn-proxy parses the command sent by the client using RESP, dispatches it to the corresponding backend store, receives the reply, and returns it to the client.

Reborn-proxy is stateless, so you can easily scale it horizontally to serve more requests.

We may run many reborn-proxy instances; how clients discover them is another topic in distributed system design that we will not dive into here. Practical approaches include DNS, LVS, HAProxy, and so on.

Backend store

A backend store is either reborn-server (a modified Redis) or QDB. We introduce the concept of a group to manage one or more backend stores: a group has one master and zero or more slaves, forming a replication topology.

We split the whole dataset into 1024 slots (using hash(key) % 1024 to determine which slot a key belongs to) and store different slots in different groups. To re-shard, you add a new group and let RebornDB migrate the data of one or more slots from other groups into it.
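To make the routing concrete, here is a minimal Go sketch of the slot mapping. It is only an illustration: the hash function (CRC32 here) and the slot-to-group table are assumptions, not RebornDB's exact implementation.

package main

import (
    "fmt"
    "hash/crc32"
)

const slotCount = 1024

// slotToGroup is a hypothetical routing table kept in the coordinator:
// it maps each of the 1024 slots to the group that currently owns it.
var slotToGroup = map[int]int{}

// slotFor computes the slot a key belongs to. RebornDB's real hash
// function may differ; CRC32 is used here only for illustration.
func slotFor(key string) int {
    return int(crc32.ChecksumIEEE([]byte(key))) % slotCount
}

func main() {
    slotToGroup[slotFor("user:1000")] = 1 // pretend the coordinator assigned this slot to group1
    slot := slotFor("user:1000")
    fmt.Printf("key %q -> slot %d -> group %d\n", "user:1000", slot, slotToGroup[slot])
}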

We can also use different backend stores in different groups. For example, if group1 holds hot data and group2 holds large cold data, we can use reborn-server in group1 and QDB in group2. Reborn-server is much faster than QDB, so read/write performance for hot data is preserved.


We use ZooKeeper or etcd as the coordinator, which coordinates all services when we need to perform write operations such as re-sharding and failover.

All RebornDB metadata, such as key routing rules, is saved in the coordinator; with it, reborn-proxy can dispatch commands to the correct backend store.


Reborn-config is the management tool. We use it to add or remove groups, add or remove stores within a group, migrate data from one group to another, and so on.

We must use reborn-config whenever we want to change RebornDB cluster information. For example, we cannot run the "SLAVEOF NO ONE" command directly on a backend store to promote a new master; we must use "reborn-config server promote groupid server" instead, because we have to change not only the replication topology within the group but also the information in the coordinator, and only reborn-config can do both.

Reborn-config also provides a web UI so you can manage RebornDB easily, as well as an HTTP RESTful API for finer control.


Reborn-agent is an HA tool. You can use it to start and stop applications (reborn-config, qdb-server, reborn-server, reborn-proxy). We will discuss it in detail in the High Availability section below.


RebornDB supports dynamic re-sharding. How does it work?

As noted above, the whole dataset is split into 1024 slots, and different groups store different slots. When we add a new group, we migrate some slots from the old groups into the new one. This migration is what re-sharding means in RebornDB, and the slot is its minimum unit of migration.

Let’s start with a simple example below:


[Figure: initial topology — group1 holds slots 1-2, group2 holds slots 3-5]


We have two groups: group1 holds slots 1 and 2, and group2 holds slots 3, 4, and 5. Now group2 is under heavy load, so we add a group3 and migrate slot 5 into it.

We can use the following command to migrate slot 5 from group2 to group3:

reborn-config slot migrate 5 5 3

This command looks simple, but internally more work is needed to guarantee migration safety. We use a two-phase commit (2PC) to tell every reborn-proxy that slot 5 is about to move from group2 to group3; only after all proxies confirm and respond does the migration begin.


[Figure: migrating slot 5 from group2 to group3]


The migration flow is simple: take a key in slot 5, copy its data from group2 to group3, delete the key in group2, and repeat. Eventually group2 holds no slot 5 data and all of it lives in group3.

While slot 5 is migrating, reborn-proxy may receive a command whose key belongs to slot 5, but it cannot know whether that key currently resides in group2 or group3. So reborn-proxy first sends a key-migration command for that key to group2, then dispatches the original command to group3.

Key migration is atomic, so after the migration command runs we can be sure the key is in group3, regardless of whether it was in group2 or group3 before.
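The dispatch logic during migration can be sketched roughly like this in Go. The Store interface, MigrateKey method, and slot table are hypothetical names used for illustration, not RebornDB's real API.

// A sketch (not RebornDB's actual API) of how a proxy can dispatch a
// command while the key's slot is being migrated: force the key to the
// target group first, then run the command there.
package proxy

// Store is a hypothetical handle to one backend group.
type Store interface {
    // MigrateKey atomically moves key to the target group if it exists in this group.
    MigrateKey(key string, target Store) error
    // Do runs an ordinary Redis command for key against this group.
    Do(cmd, key string) (string, error)
}

type slotState struct {
    owner     Store // group that currently owns the slot
    migrating bool  // true while the slot is moving
    target    Store // destination group during migration
}

// dispatch routes one command, honouring an in-flight slot migration.
func dispatch(slots []*slotState, slot int, cmd, key string) (string, error) {
    st := slots[slot]
    if st.migrating {
        // 1) Ask the source group to hand the key over. After this returns,
        //    the key is guaranteed to live in the target group (or not exist at all).
        if err := st.owner.MigrateKey(key, st.target); err != nil {
            return "", err
        }
        // 2) Now the command can safely run on the target group.
        return st.target.Do(cmd, key)
    }
    return st.owner.Do(cmd, key)
}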

When no slot 5 data remains in group2, the migration stops, and the topology looks like this:


[Figure: final topology — slot 5 now lives in group3]


High Availability

RebornDB uses reborn-agent to provide its HA solution.

Every second, reborn-agent checks whether the applications it started are still alive. If it finds that an application is down, it restarts it.

Reborn-agent is similar to supervisor, but with more features.

Reborn-agent exposes an HTTP RESTful API so we can dynamically add or remove the applications it monitors. For example, we can call the "/api/start_redis" API to start a new reborn-server, or "/api/start_proxy" to start a new reborn-proxy; we can also call "/api/stop" to stop a running application and remove it from the monitoring list.
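Any HTTP client can drive these endpoints. The Go sketch below shows the idea; only the endpoint paths come from the description above, while the agent address and the "id" form field are assumptions that will differ in a real deployment.

// A hedged sketch of driving reborn-agent's HTTP API from Go. The endpoint
// paths come from the text above; the agent address and any required
// request parameters are assumptions, not reborn-agent's documented API.
package main

import (
    "fmt"
    "net/http"
    "strings"
)

func main() {
    agent := "http://127.0.0.1:39000" // hypothetical reborn-agent address

    // Ask the agent to start a new reborn-proxy (exact form fields are assumed).
    resp, err := http.Post(agent+"/api/start_proxy", "application/x-www-form-urlencoded",
        strings.NewReader("id=proxy_1"))
    if err != nil {
        fmt.Println("request failed:", err)
        return
    }
    defer resp.Body.Close()
    fmt.Println("start_proxy status:", resp.Status)
}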

Reborn-agent does more than monitor local applications; it also provides HA for the backend stores. Multiple reborn-agents first elect a leader through the coordinator. The leader checks the health of the backend stores every second, and if it finds one down, it handles the failure: if the dead store is a slave, it simply marks it offline in the coordinator, but if it is a master, it selects a new master from the slaves and performs a failover.


Although RebornDB already has many great features, there is still plenty of work to do. We may tackle the following later:

  • More user friendly. Running RebornDB today is not easy: we must initialize slots, add servers to groups, assign slots to a group, and so on. Lowering this barrier to entry is a priority for future work.

  • Replication migration. Today we migrate a slot key by key, which is slow when a slot holds a lot of data. Replication-based migration may be better: in the example above, group2 would first generate a snapshot from which group3 gets all slot 5 data at that point in time, and group3 would then sync subsequent changes from group2 incrementally. Once group3 has caught up with all slot 5 changes in group2, we switch over and delete slot 5 from group2.

  • Pretty dashboard. We want to control and monitor everything through a dashboard, for a better user experience.

  • P2P-based cluster. RebornDB is currently a proxy-based cluster solution; later we may redesign the whole architecture to use a P2P model like the official Redis Cluster.


Building a distributed key-value store is not easy. The road ahead is long, and we have taken only a small step so far.

If you want a Redis-like key-value store that can hold more data than memory allows and re-shard dynamically across a distributed system, RebornDB is a good choice.

You can try it here and any advice and feedback is very welcome. :-)

How MySQL Is Able To Scale To 200 Million QPS – MySQL Cluster

This is a guest post by Andrew Morgan, MySQL Principal Product Manager at Oracle.


The purpose of this post is to introduce MySQL Cluster – which is the in-memory, real-time, scalable, highly available version of MySQL. Before addressing the incredible claim in the title of 200 Million Queries Per Second it makes sense to go through an introduction of MySQL Cluster and its architecture in order to understand how it can be achieved.

Introduction To MySQL Cluster

MySQL Cluster is a scalable, real-time in-memory, ACID-compliant transactional database, combining 99.999% availability with the low TCO of open source. Designed around a distributed, multi-master architecture with no single point of failure, MySQL Cluster scales horizontally on commodity hardware with auto-sharding to serve read and write intensive workloads, accessed via SQL and NoSQL interfaces.

Originally designed as an embedded telecoms database for in-network applications demanding carrier-grade availability and real-time performance, MySQL Cluster has been rapidly enhanced with new feature sets that extend use cases into web, mobile and enterprise applications deployed on-premise or in the cloud, including:

  • High volume OLTP
  • Real time analytics
  • E-commerce, inventory management, shopping carts, payment processing, fulfillment tracking, etc.
  • Online Gaming
  • Financial trading with fraud detection
  • Mobile and micro-payments
  • Session management & caching
  • Feed streaming, analysis and recommendations
  • Content management and delivery
  • Communications and presence services
  • Subscriber/user profile management and entitlements

MySQL Cluster Architecture

While transparent to the application, under the covers there are three types of node which collectively provide service to the application. The figure shows a simplified architecture diagram of a MySQL Cluster consisting of twelve Data Nodes split across six node groups.

Data Nodes are the main nodes of a MySQL Cluster. They provide the following functionality:

  • Storage and management of both in-memory and disk-based data
  • Automatic and user defined partitioning (sharding) of tables
  • Synchronous replication of data between data nodes
  • Transactions and data retrieval
  • Automatic fail over
  • Automatic resynchronization after failure for self-healing

Tables are automatically sharded across the data nodes and each data node is a master accepting write operations, making it very simple to scale write-intensive workloads across commodity nodes, with complete application transparency.

By storing and distributing data in a shared-nothing architecture, i.e. without the use of a shared-disk, and synchronously replicating data to at least one replica, if a Data Node happens to fail, there will always be another Data Node storing the same information. This allows for requests and transactions to continue to be satisfied without interruption. Any transactions which are aborted during the short (sub-second) failover window following a Data node failure are rolled back and can be re-run.

It is possible to choose how to store data; either all in memory or with some on disk (non-indexed data only). In-memory storage can be especially useful for data that is frequently changing (the active working set). Data stored in-memory is routinely check pointed to disk locally and coordinated across all Data Nodes so that the MySQL Cluster can be recovered in case of a complete system failure – such as a power outage. Disk-based data can be used to store data with less strict performance requirements, where the data set is larger than the available RAM. As with most other database servers, a page-cache is used to cache frequently used disk-based data in the Data Nodes’ memory in order to increase the performance.

Application Nodes provide connectivity from the application logic to the data nodes. Applications can access the database using SQL through one or many MySQL Servers performing the function of SQL interfaces into the data stored within a MySQL Cluster. When going through a MySQL Server, any of the standard MySQL connectors can be used , offering a wide range of access technologies. Alternatively, a high performance (C++ based) interface called NDB API can be used for extra control, better real-time behavior and greater throughput. The NDB API provides a layer through which additional NoSQL interfaces can directly access the cluster, bypassing the SQL layer, allowing for lower latency and improved developer flexibility. Existing interfaces include Java, JPA, Memcached, JavaScript with Node.js and HTTP/REST (via an Apache Module). All Application Nodes can access data from all Data Nodes and so they can fail without causing a loss of service as applications can simply use the remaining nodes.

Management Nodes are responsible for publishing the cluster configuration to all nodes in the cluster and for node management. The Management Nodes are used at startup, when a node wants to join the cluster, and when there is a system reconfiguration. Management Nodes can be stopped and restarted without affecting the ongoing execution of the Data and Application Nodes. By default, the Management Node also provides arbitration services, in the event there is a network failure which leads to a split-brain or a cluster exhibiting network-partitioning.

Achieving Scalability Through Transparent Sharding


The rows from any given table are transparently split into multiple partitions/fragments. For each fragment there will be a single data node that holds all of its data and handles all reads and writes on that data. Each data node also has a buddy and together they form a node group; the buddy holds a secondary copy of the fragment as well as a primary fragment of its own. A synchronous two-phase commit protocol is used to ensure that when a transaction has been committed the changes are guaranteed to be stored within both data nodes.

By default, a table’s Primary Key is used as the shard key and MySQL Cluster will perform an MD5 hash on that shard key to select which fragment/partition it should be stored in. If a transaction or query needs to access data from multiple data nodes then one of the data nodes takes on the role of the transaction coordinator and delegates work to the other required data nodes; the results are then combined before they’re presented to the application. Note that it is also possible to have transactions or queries that join data from multiple shards and multiple tables – this is a big advantage over typical NoSQL data stores that implement sharding.


The best (linear) scaling is achieved when queries/transactions can each be satisfied by a single data node (as this reduces the network delays from inter-data-node messaging). To achieve this, the application can be made distribution aware – all this really means is that the person defining the schema can override which column(s) are used for the sharding key. As an example, the figure shows a table with a composite Primary Key made up of a user-id and a service name; by choosing to use just the user-id as the sharding key, all rows for a given user in this table will always be in the same fragment. Even more powerful is the fact that if the same user-id column is used in your other tables and you designate it as the sharding key for those too, then all of the given user's data from all tables will be in the same fragment and queries/transactions on that user can be handled within a single data node.
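The following Go sketch illustrates the effect of the shard-key choice. MySQL Cluster hashes the shard key with MD5 internally, but the byte-to-fragment mapping shown here is a simplification, not the exact NDB algorithm.

// A simplified sketch of how a shard key maps to a fragment.
package main

import (
    "crypto/md5"
    "encoding/binary"
    "fmt"
)

const fragments = 6 // hypothetical number of fragments in the cluster

func fragmentFor(shardKey string) uint32 {
    sum := md5.Sum([]byte(shardKey))
    return binary.BigEndian.Uint32(sum[:4]) % fragments
}

func main() {
    // Composite PK (user-id, service): sharding on the full PK scatters a
    // user's rows; sharding on user-id alone keeps them in one fragment.
    fmt.Println(fragmentFor("user42|sms"), fragmentFor("user42|voip")) // likely different fragments
    fmt.Println(fragmentFor("user42"), fragmentFor("user42"))          // always the same fragment
}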

Use NoSQL APIs For The Fastest Possible Access To Your Data

MySQL Cluster provides many ways to access the stored data; the most common method is SQL but as can be seen in the figure, there are also many native APIs that allow the application to read and write the data directly from the database without the inefficiency and development complexity of converting to SQL and passing through a MySQL Server. These APIs exist for C++, Java, JPA, JavaScript/Node.js, http and the Memcached protocol.


200 Million Queries Per Second Benchmark

There are two kinds of workloads that MySQL Cluster is designed to handle:

  • OLTP (On-Line Transaction Processing): Memory-optimized tables provide sub-millisecond low latency and extreme levels of concurrency for OLTP workloads while still providing durability; they can also be used alongside disk-based tables.
  • Ad-hoc Searches: MySQL Cluster has increased the amount of parallelism that can be used when performing a table scan – providing a significant speed-up when performing searches on un-indexed columns.

Having said that, MySQL Cluster is going to perform at its best with OLTP workloads; in particular when large numbers of queries/transactions are sent in parallel. To this end, the flexAsynch benchmark has been used to measure how NoSQL performance scales as more data nodes are added to the cluster.


The benchmark was performed with each data node running on a dedicated 56 thread Intel E5-2697 v3 (Haswell) machine. The figure shows how the throughput scaled as the number of data nodes was increased in steps from 2 up to 32 (note that MySQL Cluster currently supports a maximum of 48 data nodes). As you can see, the scaling is virtually linear and at 32 data nodes, the throughput hits 200 Million NoSQL Queries Per Second.

Note that the latest results and a more complete description of the tests can be found at the MySQL Cluster Benchmark page.

This 200 Million QPS benchmark was run on MySQL Cluster 7.4 (currently the latest GA version) – you can find out more about what went into that release in this MySQL Cluster 7.4 blog post or this webinar replay.


Elements Of Scale: Composing And Scaling Data Platforms


This is a guest repost of Ben Stopford‘s epic post on Elements of Scale: Composing and Scaling Data Platforms. A masterful tour through the evolutionary forces that shape how systems adapt to key challenges.

As software engineers we are inevitably affected by the tools we surround ourselves with. Languages, frameworks, even processes all act to shape the software we build.

Likewise databases, which have trodden a very specific path, inevitably affect the way we treat mutability and share state in our applications.

Over the last decade we’ve explored what the world might look like had we taken a different path. Small open source projects try out different ideas. These grow. They are composed with others. The platforms that result utilise suites of tools, with each component often leveraging some fundamental hardware or systemic efficiency. The result, platforms that solve problems too unwieldy or too specific to work within any single tool.

So today’s data platforms range greatly in complexity. From simple caching layers or polyglotic persistence right through to wholly integrated data pipelines. There are many paths. They go to many different places. In some of these places at least, nice things are found.

So the aim for this talk is to explain how and why some of these popular approaches work. We’ll do this by first considering the building blocks from which they are composed. These are the intuitions we’ll need to pull together the bigger stuff later on.


 In a somewhat abstract sense, when we’re dealing with data, we’re really just arranging locality. Locality to the CPU. Locality to the other data we need. Accessing data sequentially is an important component of this. Computers are just good at sequential operations. Sequential operations can be predicted.

If you’re taking data from disk sequentially it’ll be pre-fetched into the disk buffer, the page cache and the different levels of CPU caching. This has a significant effect on performance. But it does little to help the addressing of data at random, be it in main memory, on disk or over the network. In fact pre-fetching actually hinders random workloads as the various caches and frontside bus fill with data which is unlikely to be used.


Whilst disk is somewhat renowned for its slow performance, main memory is often assumed to simply be fast. This is not as ubiquitously true as people often think. There are one to two orders of magnitude between random and sequential main memory workloads. Use a language that manages memory for you and things generally get a whole lot worse.

Streaming data sequentially from disk can actually outperform randomly addressed main memory. So disk may not always be quite the tortoise we think it is, at least not if we can arrange sequential access. SSD’s, particularly those that utilise PCIe, further complicate the picture as they demonstrate different tradeoffs, but the caching benefits of the two access patterns remain, regardless.


So lets imagine, as a simple thought experiment, that we want to create a very simple database. We’ll start with the basics: a file.

We want to keep writes and reads sequential, as it works well with the hardware. We can append writes to the end of the file efficiently. We can read by scanning the file in its entirety. Any processing we wish to do can happen as the data streams through the CPU. We might filter, aggregate or even do something more complex. The world is our oyster!


So what about data that changes, updates etc?

We have two options. We could update the value in place. We’d need to use fixed width fields for this, but that’s ok for our little thought experiment. But update in place would mean random IO. We know that’s not good for performance.

Alternatively we could just append updates to the end of the file and deal with the superseded values when we read it back.

So we have our first tradeoff. Append to a ‘journal’ or ‘log’, and reap the benefits of sequential access. Alternatively if we use update in place we’ll be back to 300 or so writes per second, assuming we actually flush through to the underlying media.
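A tiny Go sketch of this journal idea, purely illustrative: updates are just more appends, and a full scan resolves which value is current.

// A toy append-only "journal": every write, including an update, is a new
// record at the end of the file, so the disk only ever sees sequential I/O.
// Reads scan the whole file and let later records supersede earlier ones.
package main

import (
    "bufio"
    "fmt"
    "os"
    "strings"
)

func appendRecord(f *os.File, key, value string) error {
    _, err := fmt.Fprintf(f, "%s=%s\n", key, value) // sequential append
    return err
}

func scan(path string) (map[string]string, error) {
    f, err := os.Open(path)
    if err != nil {
        return nil, err
    }
    defer f.Close()
    latest := map[string]string{}
    s := bufio.NewScanner(f)
    for s.Scan() {
        if kv := strings.SplitN(s.Text(), "=", 2); len(kv) == 2 {
            latest[kv[0]] = kv[1] // later records win
        }
    }
    return latest, s.Err()
}

func main() {
    f, _ := os.OpenFile("journal.log", os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
    appendRecord(f, "bob", "london")
    appendRecord(f, "bob", "paris") // an "update" is just another append
    f.Close()
    data, _ := scan("journal.log")
    fmt.Println(data["bob"]) // paris
}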


Now in practice of course reading the file, in its entirety, can be pretty slow. We’ll only need to get into GB’s of data and the fastest disks will take seconds. This is what a database does when it ends up table scanning.

Also we often want something more specific, say customers named “bob”, so scanning the whole file would be overkill. We need an index.


Now there are lots of different types of indexes we could use. The simplest would be an ordered array of fixed-width values, in this case customer names, held with the corresponding offsets in the heap file. The ordered array could be searched with binary search. We could also of course use some form of tree, bitmap index, hash index, term index etc. Here we’re picturing a tree.
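A minimal Go sketch of the ordered-array variant, assuming a sorted in-memory array of (key, offset) pairs over a heap file:

package main

import (
    "fmt"
    "sort"
)

type indexEntry struct {
    key    string
    offset int64 // byte offset of the record in the heap file
}

// lookup binary-searches the sorted index and returns the heap-file offset.
func lookup(idx []indexEntry, key string) (int64, bool) {
    i := sort.Search(len(idx), func(i int) bool { return idx[i].key >= key })
    if i < len(idx) && idx[i].key == key {
        return idx[i].offset, true
    }
    return 0, false
}

func main() {
    idx := []indexEntry{{"alice", 0}, {"bob", 128}, {"carol", 512}} // kept sorted by key
    if off, ok := lookup(idx, "bob"); ok {
        fmt.Println("read record for bob at offset", off)
    }
}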

The thing with indexes like this is that they impose an overarching structure. The values are deliberately ordered so we can access them quickly when we want to do a read. The problem with the overarching structure is that it necessitates random writes as data flows in. So our wonderful, write optimised, append only file must be augmented by writes that scatter-gun the filesystem. This is going to slow us down.


Anyone who has put lots of indexes on a database table will be familiar with this problem. If we are using a regular rotating hard drive, we might run 1,000s of times slower if we maintain disk integrity of an index in this way.

Luckily there are a few ways around this problem. Here we are going to discuss three. These represent three extremes, and they are in truth simplifications of the real world, but the concepts are useful when we consider larger compositions.


Our first option is simply to place the index in main memory. This will compartmentalise the problem of random writes to RAM. The heap file stays on disk.

This is a simple and effective solution to our random writes problem. It is also one used by many real databases. MongoDB, Cassandra, Riak and many others use this type of optimisation. Often memory mapped files are used.

However, this strategy breaks down if we have far more data than we have main memory. This is particularly noticeable where there are lots of small objects. Our index would get very large. Thus our storage becomes bounded by the amount of main memory we have available. For many tasks this is fine, but if we have very large quantities of data this can be a burden.

A popular solution is to move away from having a single ‘overarching’ index. Instead we use a collection of smaller ones.


This is a simple idea. We batch up writes in main memory, as they come in. Once we have sufficient – say a few MB’s – we sort them and write them to disk as an individual mini-index. What we end up with is a chronology of small, immutable index files.

So what was the point of doing that? Our set of immutable files can be streamed sequentially. This brings us back to a world of fast writes, without us needing to keep the whole index in memory. Nice!

Of course there is a downside to this approach too. When we read, we have to consult the many small indexes individually. So all we have really done is shift the problem of RandomIO from writes onto reads. However this turns out to be a pretty good tradeoff in many cases. It’s easier to optimise random reads than it is to optimise random writes.

Keeping a small meta-index in memory or using a Bloom Filter provides a low-memory way of evaluating whether individual index files need to be consulted during a read operation. This gives us almost the same read performance as we’d get with a single overarching index whilst retaining fast, sequential writes.

In reality we will need to purge orphaned updates occasionally too, but that can be done with nice sequential reads and writes.
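The following Go sketch captures the shape of this approach, with a map standing in for the memtable and for each immutable segment; a real engine keeps segments sorted on disk and consults a Bloom filter before touching each one.

package main

import "fmt"

type segment map[string]string // an immutable mini-index (sorted on disk in a real engine)

type lsm struct {
    memtable map[string]string
    segments []segment // oldest first
    limit    int
}

func (db *lsm) put(k, v string) {
    db.memtable[k] = v
    if len(db.memtable) >= db.limit { // flush: one sequential write of a new segment
        db.segments = append(db.segments, segment(db.memtable))
        db.memtable = map[string]string{}
    }
}

func (db *lsm) get(k string) (string, bool) {
    if v, ok := db.memtable[k]; ok {
        return v, true
    }
    for i := len(db.segments) - 1; i >= 0; i-- { // newest segment wins
        if v, ok := db.segments[i][k]; ok { // a real engine asks a bloom filter first
            return v, true
        }
    }
    return "", false
}

func main() {
    db := &lsm{memtable: map[string]string{}, limit: 2}
    db.put("bob", "london")
    db.put("alice", "oslo") // triggers a flush
    db.put("bob", "paris")  // newer value lives in the memtable
    v, _ := db.get("bob")
    fmt.Println(v) // paris
}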


What we have created is termed a Log Structured Merge Tree. A storage approach used in a lot of big data tools such as HBase, Cassandra, Google’s BigTable and many others. It balances write and read performance with comparatively small memory overhead.


So we can get around the ‘random-write penalty’ by storing our indexes in memory or, alternatively, using a write-optimised index structure like LSM. There is a third approach though. Pure brute force.

Think back to our original example of the file. We could read it in its entirety. This gave us many options in terms of how we go about processing the data within it. The brute force approach is simply to hold data by column rather than by row and stream only the columns required for a query, in their entirety, through the CPU. This approach is termed Columnar or Column Oriented.

(It should be noted that there is an unfortunate nomenclature clash between true column stores and those that follow the Big Table pattern. Whilst they share some similarities, in practice they are quite different. It is wise to consider them as different things.)


Column Orientation is another simple idea. Instead of storing data as a set of rows, appended to a single file, we split each row by column. We then store each column in a separate file.

We keep the order of the files the same, so row N has the same position (offset) in each column file. This is important because we will need to read multiple columns to service a single query, all at the same time. This means ‘joining’ columns on the fly. If the columns are in the same order we can do this in a tight loop which is very cache- and cpu-efficient. Many implementations make heavy use of vectorisation to further optimise throughput for simple join and filter operations.
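A toy Go sketch of the column-per-file idea, with slices standing in for column files; row N sits at index N in every column, so a two-column query is one tight loop.

package main

import "fmt"

var (
    colName   = []string{"bob", "alice", "bob", "carol"} // column file: name
    colAmount = []int{10, 25, 7, 3}                      // column file: amount
)

// total scans only the two columns it needs, never the full rows.
func total(name string) int {
    sum := 0
    for i := range colName { // same offset in every column file
        if colName[i] == name {
            sum += colAmount[i]
        }
    }
    return sum
}

func main() {
    fmt.Println(total("bob")) // 17
}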

Writes can leverage the benefit of being append-only. The downside is that we now have many files to update, one for every column in every individual write to the database. The most common solution to this is to batch writes in a similar way to the one used in the LSM approach above. Many columnar databases also impose an overall order to the table as a whole to increase their read performance for one chosen key.


By splitting data by column we significantly reduce the amount of data that needs to be brought from disk, so long as our query operates on a subset of all columns.

In addition to this, data in a single column generally compresses well. We can take advantage of the data type of the column to do this, if we have knowledge of it. This means we can often use efficient, low cost encodings such as run-length, delta, bit-packed etc. For some encodings predicates can be used directly on the uncompressed stream too.
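As a small illustration, here is run-length encoding in Go applied to a low-cardinality column, with a predicate evaluated directly on the runs:

package main

import "fmt"

type run struct {
    value string
    count int
}

// rle collapses consecutive equal values into (value, count) pairs.
func rle(col []string) []run {
    var runs []run
    for _, v := range col {
        if n := len(runs); n > 0 && runs[n-1].value == v {
            runs[n-1].count++
            continue
        }
        runs = append(runs, run{v, 1})
    }
    return runs
}

func main() {
    col := []string{"uk", "uk", "uk", "us", "us", "de"}
    runs := rle(col)
    fmt.Println(runs) // [{uk 3} {us 2} {de 1}]

    // Count rows matching a predicate without expanding the runs.
    matches := 0
    for _, r := range runs {
        if r.value == "uk" {
            matches += r.count
        }
    }
    fmt.Println(matches) // 3
}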

The result is a brute force approach that will work particularly well for operations that require large scans. Aggregate functions like average, max, min, group by etc are typical of this.

This is very different to using the ‘heap file & index’ approach we covered earlier. A good way to understand this is to ask yourself: what is the difference between a columnar approach like this vs a ‘heap & index’ where indexes are added to every field?


The answer to this lies in the ordering of the index files. BTrees etc will be ordered by the fields they index. Joining the data in two indexes involves a streaming operation on one side, but on the other side the index lookups have to read random positions in the second index. This is generally less efficient than joining two indexes (columns) that retain the same ordering. Again we’re leveraging sequential access.


So many of the best technologies which we may want to use as components in a data platform will leverage one of these core efficiencies to excel for a certain set of workloads.

Storing indexes in memory, over a heap file, is favoured by many NoSQL stores such as Riak, Couchbase or MongoDB as well as some relational databases. It’s a simple model that works well.

Tools designed to work with larger data sets tend to take the LSM approach. This gives them fast ingestion as well as good read performance using disk based structures. HBase, Cassandra, RocksDB, LevelDB and even Mongo now support this approach.

Column-per-file engines are used heavily in MPP databases like Redshift or Vertica as well as in the Hadoop stack using Parquet. These are engines for data crunching problems that require large traversals. Aggregation is the home ground for these tools.

Other products like Kafka apply the use of a simple, hardware efficient contract to messaging. Messaging, at its simplest, is just appending to a file, or reading from a predefined offset. You read messages from an offset. You go away. You come back. You read from the offset you previously finished at. All nice sequential IO.
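A toy Go sketch of that contract: append on one side, remember an offset on the other.

package main

import "fmt"

type msgLog struct{ messages []string }

func (l *msgLog) append(msg string) { l.messages = append(l.messages, msg) }

// readFrom returns everything after offset plus the new offset, which the
// consumer stores and brings back on its next visit.
func (l *msgLog) readFrom(offset int) ([]string, int) {
    return l.messages[offset:], len(l.messages)
}

func main() {
    l := &msgLog{}
    l.append("order:1")
    l.append("order:2")

    batch, offset := l.readFrom(0) // first visit: start from the beginning
    fmt.Println(batch, offset)     // [order:1 order:2] 2

    l.append("order:3")
    batch, offset = l.readFrom(offset) // come back later, resume where we left off
    fmt.Println(batch, offset)         // [order:3] 3
}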

This is different to most message oriented middleware. Specifications like JMS and AMQP require the addition of indexes like the ones discussed above, to manage selectors and session information. This means they often end up performing more like a database than a file. Jim Gray made this point famously back in his 1995 publication Queue’s are Databases.

So all these approaches favour one tradeoff or other, often keeping things simple, and hardware sympathetic, as a means of scaling.


So we’ve covered some of the core approaches to storage engines. In truth we made some simplifications. The real world is a little more complex. But the concepts are useful nonetheless.

Scaling a data platform is more than just storage engines though. We need to consider parallelism.


When distributing data over many machines we have two core primitives to play with: partitioning and replication. Partitioning, sometimes called sharding, works well both for random access and brute force workloads.

If a hash-based partitioning model is used the data will be spread across a number of machines using a well-known hash function. This is similar to the way a hash table works, with each bucket being held on a different machine.

The result is that any value can be read by going directly to the machine that contains the data, via the hash function. This pattern is wonderfully scalable and is the only pattern that shows linear scalability as the number of client requests increases. Requests are isolated to a single machine. Each one will be served by just a single machine in the cluster.
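A minimal Go sketch of hash-based routing (a real system would use consistent hashing or a slot table so that adding a node does not remap every key):

package main

import (
    "fmt"
    "hash/fnv"
)

var nodes = []string{"node-a", "node-b", "node-c"} // hypothetical cluster members

// nodeFor maps a key to the single machine that owns it.
func nodeFor(key string) string {
    h := fnv.New32a()
    h.Write([]byte(key))
    return nodes[h.Sum32()%uint32(len(nodes))]
}

func main() {
    for _, k := range []string{"bob", "alice", "carol"} {
        fmt.Printf("%s -> %s\n", k, nodeFor(k)) // each request touches exactly one node
    }
}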


We can also use partitioning to provide parallelism over batch computations, for example aggregate functions or more complex algorithms such as those we might use for clustering or machine learning. The key difference is that we exercise all machines at the same time, in a broadcast manner. This allows us to solve a large computational problem in a much shorter time, using a divide and conquer approach.

Batch systems work well for large problems, but provide little concurrency as they tend to exhaust the resources on the cluster when they execute.


So the two extremes are pretty simple: directed access at one end, broadcast, divide and conquer at the other. Where we need to be careful is in the middle ground that lies between the two. A good example of this is the use of secondary indexes in NoSQL stores that span many machines.

A secondary index is an index that isn’t on the primary key. This means the data will not be partitioned by the values in the index. Directed routing via a hash function is no longer an option. We have to broadcast requests to all machines. This limits concurrency. Every node must be involved in every query.
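The Go sketch below shows why: a query on a non-partition-key field has to fan out to every partition, whereas a primary-key lookup would touch exactly one.

package main

import "fmt"

type row struct{ id, city string }

var partitions = [][]row{ // partitioned by hash(id), not by city
    {{"1", "london"}, {"4", "oslo"}},
    {{"2", "london"}, {"5", "paris"}},
    {{"3", "oslo"}},
}

// byCity must visit every partition; a primary-key lookup would not.
func byCity(city string) []row {
    var out []row
    for _, p := range partitions { // broadcast: all nodes do work for one query
        for _, r := range p {
            if r.city == city {
                out = append(out, r)
            }
        }
    }
    return out
}

func main() {
    fmt.Println(byCity("london")) // [{1 london} {2 london}]
}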

For this reason many key value stores have resisted the temptation to add secondary indexes, despite their obvious use. HBase and Voldemort are examples of this. But many others do expose them, MongoDB, Cassandra, Riak etc. This is good as secondary indexes are useful. But it’s important to understand the effect they will have on the overall concurrency of the system.


The route out of this concurrency bottleneck is replication. You’ll probably be familiar with replication either from using async slave databases or from replicated NoSQL stores like Mongo or Cassandra.

In practice replicas can be invisible (used only for recovery), read only (adding read concurrency) or read write (adding partition tolerance). Which of these you choose will trade off against the consistency of the system. This is simply the application of CAP theorem (although cap theorem also may not be as simple as you think).


This tradeoff with consistency* brings us to an important question. When does consistency matter?

Consistency is expensive. In the database world ACID is guaranteed by serialisability. This is essentially ensuring that all operations appear to occur in sequential order. It turns out to be a pretty expensive thing. In fact it’s prohibitive enough that many databases don’t offer it as an isolation level at all. Those that do never set it as the default.

Suffice to say that if you apply strong consistency to a system that does distributed writes you’ll likely end up in tortoise territory.

(* note the term consistency has two common usages. The C in ACID and the C in CAP. They are unfortunately not the same. I’m using the CAP definition: all nodes see the same data at the same time)


The solution to this consistency problem is simple. Avoid it. If you can’t avoid it isolate it to as few writers and as few machines as possible.

Avoiding consistency issues is often quite easy, particularly if your data is an immutable stream of facts. A set of web logs is a good example. They have no consistency concerns as they are just facts that never change.

There are other use cases which do necessitate consistency though. Transferring money between accounts is an oft used example. Non-commutative actions such as applying discount codes is another.

But often things that appear to need consistency, in a traditional sense, may not. For example if an action can be changed from a mutation to a new set of associated facts we can avoid mutable state. Consider marking a transaction as being potentially fraudulent. We could update it directly with the new field. Alternatively we could simply use a separate stream of facts that links back to the original transaction.


So in a data platform it’s useful to either remove the consistency requirement altogether, or at least isolate it. One way to isolate it is to use the single writer principle; this gets you some of the way. Datomic is a good example of this. Another is to physically isolate the consistency requirement by splitting mutable and immutable worlds.

Approaches like Bloom/CALM extend this idea further by embracing the concept of disorder by default, imposing order only when necessary.

So those were some of the fundamental tradeoffs we need to consider. Now how do we pull these things together to build a data platform?


A typical application architecture might look something like the below. We have a set of processes which write data to a database and read it back again. This is fine for many simple workloads. Many successful applications have been built with this pattern. But we know it works less well as throughput grows. In the application space this is a problem we might tackle with message-passing, actors, load balancing etc.

The other problem is this approach treats the database as a black box. Databases are clever software. They provide a huge wealth of features. But they provide little mechanism for scaling out of an ACID world. This is a good thing in many ways. We default to safety. But it can become an annoyance when scaling is inhibited by general guarantees which may be overkill for the requirements we have.


The simplest route out of this is CQRS (Command Query Responsibility Segregation).

Another very simple idea. We separate read and write workloads. Writes go into something write-optimised. Something closer to a simple journal file. Reads come from something read-optimised. There are many ways to do this, be it tools like Goldengate for relational technologies or products that integrate replication internally such as Replica Sets in MongoDB.
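A minimal Go sketch of the idea, with a channel standing in for whatever replication or messaging layer joins the write-optimised journal to the read-optimised view:

package main

import (
    "fmt"
    "sync"
)

type event struct{ key, value string }

func main() {
    journal := make(chan event, 100) // write side: append-only
    readView := map[string]string{}  // read side: query-optimised copy
    var mu sync.Mutex

    var wg sync.WaitGroup
    wg.Add(1)
    go func() { // the "bridge" that ports writes into the read model
        defer wg.Done()
        for e := range journal {
            mu.Lock()
            readView[e.key] = e.value
            mu.Unlock()
        }
    }()

    journal <- event{"bob", "london"} // command side only ever appends
    journal <- event{"bob", "paris"}
    close(journal)
    wg.Wait()

    mu.Lock()
    fmt.Println(readView["bob"]) // paris, once the view has caught up
    mu.Unlock()
}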


Many databases do something like this under the hood. Druid is a nice example. Druid is an open source, distributed, time-series, columnar analytics engine. Columnar storage works best if we input data in large blocks, as the data must be spread across many files. To get good write performance Druid stores recent data in a write optimised store. This is gradually ported over to the read optimised store over time.

When Druid is queried the query routes to both the write optimised and read optimised components. The results are combined (‘reduced’) and returned to the user. Druid uses time, marked on each record, to determine ordering.

Composite approaches like this provide the benefits of CQRS behind a single abstraction.


Another similar approach is to use an Operational/Analytic Bridge. Read- and write-optimised views are separated using an event stream. The stream of state is retained indefinitely, so that the async views can be recomposed and augmented at a later date by replaying.

So the front section provides for synchronous reads and writes. This can be as simple as immediately reading data that was written or as complex as supporting ACID transactions.

The back end leverages asynchronicity, and the advantages of immutable state, to scale offline processing through replication, denormalisation or even completely different storage engines. The messaging-bridge, along with joining the two, allows applications to listen to the data flowing through the platform.

As a pattern this is well suited to mid-sized deployments where there is at least a partial, unavoidable requirement for a mutable view.


If we are designing for an immutable world, it’s easier to embrace larger data sets and more complex analytics. The batch pipeline, one almost ubiquitously implemented with the Hadoop stack, is typical of this.

The beauty of the Hadoop stack comes from its plethora of tools. Whether you want fast read-write access, cheap storage, batch processing, high throughput messaging or tools for extracting, processing and analysing data, the Hadoop ecosystem has it all.

The batch pipeline architecture pulls data from pretty much any source, push or pull. Ingests it into HDFS then processes it to provide increasingly optimised versions of the original data. Data might be enriched, cleansed, denormalised, aggregated, moved to a read optimised format such as Parquet or loaded into a serving layer or data mart. Data can be queried and processed throughout this process.

This architecture works well for immutable data, ingested and processed in large volume. Think 100’s of TBs plus. The evolution of this architecture will be slow though. Straight-through timings are often measured in hours.


The problem with the Batch Pipeline is that we often don’t want to wait hours to get a result. A common solution is to add a streaming layer alongside it. This is sometimes referred to as the Lambda Architecture.

The Lambda Architecture retains a batch pipeline, like the one above, but it circumvents it with a fast streaming layer. It’s a bit like building a bypass around a busy town. The streaming layer typically uses a streaming processing tool such as Storm or Samza.

The key insight of the Lambda Architecture is that we’re often happy to have an approximate answer quickly, but we would like an accurate answer in the end.

So the streaming layer bypasses the batch layer, providing the best answers it can within a streaming window. These are written to a serving layer. Later the batch pipeline computes an accurate answer and overwrites the approximation.

This is a clever way to balance accuracy with responsiveness. Some implementations of this pattern suffer if the two branches end up being dual coded in stream and batch layers. But it is often possible to simply abstract this logic into common libraries that can be reused, particularly as much of this processing is often written in external libraries such as Python or R anyway. Alternatively systems like Spark provide both stream and batch functionality in one system (although the streams in Spark are really micro-batches).

So this pattern again suits high volume data platforms, say in the 100TB+ range, that want to combine streams with existing, rich, batch based analytic function.


There is another approach to this problem of slow data pipelines. It’s sometimes termed the Kappa architecture. I actually thought this name was ‘tongue in cheek’ but I’m now not so sure. Whichever it is, I’m going to use the term Stream Data Platform, which is a term in use also.

Stream Data Platforms flip the batch pattern on its head. Rather than storing data in HDFS, and refining it with incremental batch jobs, the data is stored in a scale out messaging system, or log, such as Kafka. This becomes the system of record and the stream of data is processed in real time to create a set of tertiary views, indexes, serving layers or data marts.

This is broadly similar to the streaming layer of the Lambda architecture but with the batch layer removed. Obviously the requirement for this is that the messaging layer can store and vend very large volumes of data and there is a sufficiently powerful stream processor to handle the processing.

There is no free lunch so, for hard problems, Stream Data Platforms will likely run no faster than an equivalent batch system, but switching the default approach from ‘store and process’ to ‘stream and process’ can provide greater opportunity for faster results.


Finally, the Stream Data Platform approach can be applied to the problem of ‘application integration’. This is a thorny and difficult problem that has seen focus from big vendors such as Informatica, Tibco and Oracle for many years. For the most part results have been beneficial, but not transformative. Application integration remains a topic looking for a real workable solution.

Stream Data Platforms provide an interesting potential solution to this problem. They take many of the benefits of an O/A bridge – the variety of asynchronous storage formats and ability to recreate views – but leave the consistency requirement isolated in the (often pre-existing) sources:


With the system of record being a log it’s easy to enforce immutability. Products like Kafka can retain enough volume internally to be used as a historic record. This means recovery can be a process of replaying and regenerating state, rather than constantly checkpointing.

Similarly styled approaches have been taken before in a number of large institutions with tools such as Goldengate, porting data to enterprise data warehouses or more recently data lakes. They were often thwarted by a lack of throughput in the replication layer and the complexity of managing changing schemas. It seems unlikely the first problem will continue. As for the latter problem though, the jury is still out.


So we started with locality. With sequential addressing for both reads and writes. This dominates the tradeoffs inside the components we use. We looked at scaling these components out, leveraging primitives for both sharding and replication. Finally we rebranded consistency as a problem we should isolate in the platforms we build.

But data platforms themselves are really about balancing the sweet-spots of these individual components within a single, holistic form. Incrementally restructuring. Migrating the write-optimised to the read-optimised. Moving from the constraints of consistency to the open plains of streamed, asynchronous, immutable state.

This must be done with a few things in mind. Schemas are one. Time, the peril of the distributed, asynchronous world, is another. But these problems are manageable if carefully addressed. Certainly the future is likely to include more of these things, particularly as tooling, innovated in the big data space, percolates into platforms that address broader problems, both old and new.


MongoDB 3.0 with a new storage engine

A lot has happened in MongoDB technology over the past year. For starters:

  • The big news in MongoDB 3.0* is the WiredTiger storage engine. The top-level claims for that are that one should “typically” expect (individual cases can of course vary greatly):
    • 7-10X improvement in write performance.
    • No change in read performance (which however was boosted in MongoDB 2.6).
    • ~70% reduction in data size due to compression (disk only).
    • ~50% reduction in index size due to compression (disk and memory both).
  • MongoDB has been adding administration modules.
    • A remote/cloud version came out with, if I understand correctly, MongoDB 2.6.
    • An on-premise version came out with 3.0.
    • They have similar features, but are expected to grow apart from each other over time. They have different names.

*Newly-released MongoDB 3.0 is what was previously going to be MongoDB 2.8. My clients at MongoDB finally decided to give a “bigger” release a new first-digit version number.

To forestall confusion, let me quickly add:

  • MongoDB acquired the WiredTiger product and company, and continues to sell the product on a standalone basis, as well as bundling a version into MongoDB. This could cause confusion because …
  • … the standalone version of WiredTiger has numerous capabilities that are not in the bundled MongoDB storage engine.
  • There’s some ambiguity as to when MongoDB first “ships” a feature, in that …
  • … code goes to open source with an earlier version number than it goes into the packaged product.

I should also clarify that the addition of WiredTiger is really two different events:

  • MongoDB added the ability to have multiple plug-compatible storage engines. Depending on how one counts, MongoDB now ships two or three engines:
    • Its legacy engine, now called MMAP v1 (for “Memory Map”). MMAP continues to be enhanced.
    • The WiredTiger engine.
    • A “please don’t put this immature thing into production yet” memory-only engine.
  • WiredTiger is now the particular storage engine MongoDB recommends for most use cases.

I’m not aware of any other storage engines using this architecture at this time. In particular, last I heard TokuMX was not an example. (Edit: Actually, see Tim Callaghan’s comment below.)

Most of the issues in MongoDB write performance have revolved around locking, the story on which is approximately:

  • Until MongoDB 2.2, locks were held at the process level. (One MongoDB process can control multiple databases.)
  • As of MongoDB 2.2, locks were held at the database level, and some sanity was added as to how long they would last.
  • As of MongoDB 3.0, MMAP locks are held at the collection level.
  • WiredTiger locks are held at the document level. Thus MongoDB 3.0 with WiredTiger breaks what was previously a huge write performance bottleneck.

In understanding that, I found it helpful to do a partial review of what “documents” and so on in MongoDB really are.

  • A MongoDB document is somewhat like a record, except that it can be more like what in a relational database would be all the records that define a business object, across dozens or hundreds of tables.*
  • A MongoDB collection is somewhat like a table, although the documents that comprise it do not need to each have the same structure.
  • MongoDB documents want to be capped at 16 MB in size. If you need one bigger, there’s a special capability called GridFS to break it into lots of little pieces (default = 1KB) while treating it as a single document logically.

*One consequence — MongoDB’s single-document ACID guarantees aren’t quite as lame as single-record ACID guarantees would be in an RDBMS.

By the way:

  • Row-level locking was a hugely important feature in RDBMS about 20 years ago. Sybase’s lack of it is a big part of what doomed them to second-tier status.
  • Going forward, MongoDB has made the unsurprising marketing decision to talk about “locks” as little as possible, relying instead on alternate terms such as “concurrency control”.

Since its replication mechanism is transparent to the storage engine, MongoDB allows one to use different storage engines for different replicas of data. Reasons one might want to do this include:

  • Fastest persistent writes (WiredTiger engine).
  • Fastest reads (wholly in-memory engine).
  • Migration from one engine to another.
  • Integration with some other data store. (Imagine, for example, a future storage engine that works over HDFS. It probably wouldn’t have top performance, but it might make Hadoop integration easier.)

In theory one can even do a bit of information lifecycle management (ILM), by using different storage engines for different subsets of database, by:

  • Pinning specific shards of data to specific servers.
  • Using different storage engines on those different servers.

That said, similar stories have long been told about MySQL, and I’m not aware of many users who run multiple storage engines side by side.

The MongoDB WiredTiger option is shipping with a couple of options for block-level compression (plus prefix compression that is being used for indexes only). The full WiredTiger product also has some forms of columnar compression for data.

One other feature in MongoDB 3.0 is the ability to have 50 replicas of data (the previous figure was 12). MongoDB can’t think of a great reason to have more than 3 replicas per data center or more than 2 replicas per metropolitan area, but some customers want to replicate data to numerous locations around the world.

ScaleDB: A NewSQL Alternative to Amazon RDS and Oracle RAC with 10x Performance Gain

In the world of relational databases, MySQL and Oracle RAC (Real Application Cluster) own a significant segment of the market. Oracle RAC owns the majority of the enterprise market, while MySQL gained significant popularity amongst many of the Web 2.0 sites.

Today, both of those databases are owned by Oracle (MySQL was acquired by Sun which was later acquired by Oracle).

The following diagrams show the enterprise database market share covered by Gartner and the cloud database market share covered by Jelastic – a Java PaaS provider.

The acquisition of MySQL by Oracle raised concern over the future of the project due to the inherent conflict of interest between its two database products. Oracle RAC is the company’s main “cash cow” product, while MySQL competes for the same audience.

Shortly after Oracle’s acquisition of MySQL, the open source database was forked by one of its original founders into a new project named MariaDB. MariaDB was established to provide an alternative development and support option to MySQL and is now becoming the default database of choice of RedHat.

MySQL vs Oracle RAC Clustering Models

The two databases take a fairly different approach to scalability.

Oracle RAC is based on a shared storage model. With Oracle, the data is broken into strips that are spread across multiple devices, and multiple database servers operate concurrently and in sync over the (shared) data.

MySQL, on the other hand, does not use a shared data model. With MySQL, a given data set is managed by a single server (a model called shared nothing), and scaling and High Availability (HA) are achieved by managing copies of the data. As only a single machine can update the data, this model can only scale up by adding more capacity to the machine that owns the database. Since machines have capacity limits yet must keep up with large amounts of data or many users, the database eventually needs to be broken into several independent databases (a process called sharding). However, sharding is complex, is not dynamic, and requires assumptions about data distribution, data growth and the queries. For these reasons, sharding is not possible for many applications. In particular, a cloud deployment is expected to be elastic, dynamically supporting changing needs and user behaviors.

For high availability, the MySQL shared nothing approach uses a primary/backup model with a single master and a backup node (called a slave) that manages a copy of the data. Each update to the master must also be executed on the slave. The master and slave nodes use a handshake protocol to determine which node is the master and to synchronize data changes. The master node performs updates/writes to the persistent file system, and the level of high availability is set by the DBA, who decides whether the slave can lag behind or must confirm the duplicate updates within each transaction.

For scaling, this model can use the slave as a read replica (or make additional copies of the data), a method called horizontal scaling in which read requests are spread across the different copies of the data. (However, all writes still need to go to the master and then be reflected on the replicas/slaves.)
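
As a rough illustration of that read-scaling pattern, here is a minimal Python sketch that routes writes to the single master and spreads reads across the replicas. It assumes the mysql-connector-python driver; hostnames and credentials are hypothetical:

```python
# Minimal read/write splitting sketch: writes go to the master, reads are
# spread across replicas. Hosts and credentials are hypothetical.
import random
import mysql.connector

MASTER = {"host": "db-master", "user": "app", "password": "secret", "database": "shop"}
REPLICAS = [
    {"host": "db-replica-1", "user": "app", "password": "secret", "database": "shop"},
    {"host": "db-replica-2", "user": "app", "password": "secret", "database": "shop"},
]

def write(sql, params=()):
    # All writes must be executed on the master.
    conn = mysql.connector.connect(**MASTER)
    try:
        cur = conn.cursor()
        cur.execute(sql, params)
        conn.commit()
    finally:
        conn.close()

def read(sql, params=()):
    # Reads can be served by any copy (subject to replication lag).
    conn = mysql.connector.connect(**random.choice(REPLICAS))
    try:
        cur = conn.cursor()
        cur.execute(sql, params)
        return cur.fetchall()
    finally:
        conn.close()
```

Note that reads from a replica may lag slightly behind the master, which is exactly the trade-off the DBA configures in the replication setup described above.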

Relational Databases on the Cloud

The high-end position of Oracle RAC and the low-cost, open source nature of MySQL, along with the adoption of the cloud as an infrastructure platform, led to vastly different methods of deploying these databases in the cloud.

Oracle RAC took an approach similar to the old mainframe: producing a pre-engineered appliance (called Exadata) that comes with the software and hardware integrated. That approach was aimed specifically at existing Oracle RAC customers who needed a quick resolution to their scalability needs without redesigning their applications. Plugging in a more optimized stack helped push the scalability bar higher without changing the existing applications that rely on the database.

Amazon launched RDS, which is basically an on-demand version of the MySQL database. This approach fits nicely with the existing users of MySQL who are looking for a more affordable way to run their database in the cloud.

The main limitation of the Amazon RDS approach is that it inherits the limitations of MySQL: it provides limited and relatively complex read scalability, and no good solution for write scalability other than scaling up.

ScaleDB – a NewSQL Alternative to Amazon RDS and Oracle RAC

ScaleDB is a NewSQL database that takes an approach similar to the read/write scalability model of Oracle RAC and implements it as a platform that transparently supports the MySQL (or MariaDB) database. As a result, existing MySQL/MariaDB applications can leverage the platform without any changes: they use MySQL or MariaDB as the front-end engine, which connects to the ScaleDB platform providing new and more concurrent database and storage services. This approach makes it possible to run multiple MySQL or MariaDB server instances against distributed storage in a shared data model (similar to Oracle RAC).

The diagram below shows at a high level how ScaleDB works.

Each database node runs a standard MariaDB database instance with ScaleDB plugged in as the database engine and as an alternative storage and index tier.

Scaling of the data and the index is done by processing requests concurrently from multiple database instances and leveraging the ScaleDB distributed storage tier (at the storage/file-system level) where data and index elements are spread evenly across multiple storage nodes of the cluster. Read more on how ScaleDB works.

ScaleDB vs Other NewSQL Databases

Most NewSQL databases are based on MySQL as the engine behind their implementation. The main difference between many NewSQL databases and ScaleDB is that they bring the NoSQL approach into the SQL world, whereas ScaleDB takes the Oracle RAC shared storage approach to scale the database.

ScaleDB can deliver write/read scaling while offering close to 100% compatibility, whereas in many of the alternative NewSQL approaches, scaling often involves significant changes to the data model and query semantics.

Benchmark – ScaleDB vs Amazon RDS

To demonstrate the difference between a standard cloud deployment of MySQL and a ScaleDB deployment, and to find out whether the ScaleDB approach can live up to its promise, we conducted a benchmark comparing Amazon RDS scalability for write/read workloads with that of ScaleDB. We tested a dataset that does not fit into the memory (RAM) of a single machine and used the largest machines offered by Amazon. We required that scaling be dynamic and that all types of queries be supported. These requirements made sharding a no-go option.

The benchmark uses the popular YCSB (Yahoo! Cloud Serving Benchmark) as the benchmarking framework.

The results of that benchmark are shown in the graphs below.

Both illustrate relatively flat scalability with Amazon RDS and close to linear scalability on the ScaleDB front.

Benchmark environment:

  • Benchmark framework: YCSB (Yahoo! Cloud Serving Benchmark)
  • Cloud environment: Amazon
  • Machine type: Extra Large
  • Data element size (per row): 1 KB
  • Data capacity: 50 GB
  • Zones: single zone
  • RDS was set with 1,000 provisioned IOPS
  • ScaleDB cluster setup: 2 database nodes, 2 data volumes (4 machines; data is striped over 2 machines and each machine had a mirror)
  • ScaleDB MySQL engine: MariaDB

Running ScaleDB on OpenStack and Other Clouds with Cloudify

What got me excited about the project is that it is a perfect fit for many of our enterprise customers who use Cloudify to move their existing applications to their private cloud without code changes. Those customers are looking for a simple way to scale their applications, and many of them run on Oracle RAC today for that purpose.

The move of enterprises to a cloud-based environment also drives the demand for a more affordable approach to handle their database scaling, which is provided by ScaleDB.

On the other hand, setting up a database cluster can be a fairly complex task.

By creating a Cloudify recipe for ScaleDB, we could remove a large part of that complexity and set up an entire ScaleDB database and storage environment through a single click.

In this way we can run ScaleDB on demand as we would with RDS and scale on both read and write as with Oracle Exadata, only in a more affordable fashion.


1. Cloudify Recipe for running ScaleDB

2. Yahoo Benchmark

3. ScaleDB Architecture white paper

4. ScaleDB user manual

(Via Nati Shalom’s Blog)


Under the hood: MySQL Pool Scanner (MPS)

Facebook has one of the largest MySQL database clusters in the world. This cluster comprises many thousands of servers across multiple data centers on two continents.

Operating a cluster of this size with a small team is achieved by automating nearly everything a conventional MySQL Database Administrator (DBA) might do so that the cluster can almost run itself. One of the core components of this automation is a system we call MPS, short for “MySQL Pool Scanner.”

MPS is a sophisticated state machine written mostly in Python. It replaces a DBA for many routine tasks and enables us to perform maintenance operations in bulk with little or no human intervention.

A closer look at a single database node
Every one of the thousands of database servers we have can house a certain number of MySQL instances. An instance is a separate MySQL process, listening on a separate port with its own data set. For simplicity, we’ll assume exactly two instances per server in our diagrams and examples.

The entire data set is split into thousands of shards, and every instance holds a group of such shards, each in its own database schema. A Facebook user’s profile is assigned to a shard when the profile is created and every shard holds data relating to thousands of users.
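
The post does not describe the actual assignment scheme, but the idea can be sketched in a few lines of Python; the shard count, hostnames, and ports below are hypothetical:

```python
# Purely illustrative sketch of mapping a user to a shard, and a shard to the
# MySQL instance (host, port) that holds it. Values are hypothetical; in
# reality there are thousands of shards and servers.
NUM_SHARDS = 8

# Catalog mapping each shard to its instance; note two instances per server,
# as in the example above.
SHARD_CATALOG = {
    0: ("db1001", 3306), 1: ("db1001", 3307),
    2: ("db1002", 3306), 3: ("db1002", 3307),
    4: ("db1003", 3306), 5: ("db1003", 3307),
    6: ("db1004", 3306), 7: ("db1004", 3307),
}

def shard_for_user(user_id: int) -> int:
    # The profile is assigned to a shard once, when it is created.
    return user_id % NUM_SHARDS

def instance_for_user(user_id: int) -> tuple:
    # Queries for this user's data are routed to the instance holding the shard.
    return SHARD_CATALOG[shard_for_user(user_id)]

print(instance_for_user(123456))   # ('db1001', 3306)
```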

It’s more easily explained by a diagram of a single database server:


Every instance has a few copies of its data on other instances that are hosted on different servers, and usually in a different data center. This is done to achieve two main goals:

1.    High Availability – If a server goes down, we have the data available elsewhere, ready to be served.
2.    Performance – Different geographical regions have their own replicas so that reads are served locally.
The way we achieve this is through simple MySQL master/slave replication. Every instance is part of a replica set. A replica set has a master and multiple slaves. All writes to a replica set must occur on the master. The slaves subscribe to a stream of these write events, and the events are replayed on them as soon as they arrive. Since the master and the slaves have nearly identical data, a read can occur on any one of the instances in the replica set.

Here is a diagram of a simple replica set, where each server hosts only one instance, and the other instance is empty (we call these spares):


A server is essentially a container for instances, so in reality things can get much more complicated.
For example, a single server hosting a master instance may also be hosting a slave instance for a different master, like so:


There are two important “building block” operations MPS relies on:

1.    Creating a copy / replacing a server

The first building block is an operation that creates a copy of an instance on a different host. We use a modified version of Xtrabackup to perform most copy operations. A replacement is the same operation, except that we remove the source instance after the copy successfully completes.

First, the system allocates a new spare instance for the operation. We choose one of the slaves or the master and copy its data to the freshly allocated spare instance. This diagram shows a replacement operation, where an instance is removed when the copy is complete:


2.    Promoting a master instance

The second building block is the action of promoting a different instance to be the master in a replica set.
During a promotion, we select a target for the promotion, stop writes to the replica set, change the slaves to replicate from the new master, and resume writes. In the diagram, we show a deletion operation in which the old master is discarded after the promotion is completed successfully. For simplicity, the replica set below consists of only three instances:


These two operations (which are usually complex procedures for most companies running MySQL) have been completely automated to a point where MPS can run them hundreds or thousands of times a day in a fast and safe manner, without any human intervention.
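
To make the promotion building block concrete, here is a rough Python sketch of the steps as described above (select a target, stop writes, repoint the slaves, resume writes). The helper functions are hypothetical placeholders, not Facebook's actual tooling:

```python
# Rough sketch of the master-promotion building block. Every helper is a
# placeholder (a print), and error handling is omitted.
from dataclasses import dataclass, field
from typing import List

@dataclass
class Instance:
    host: str
    port: int

@dataclass
class ReplicaSet:
    master: Instance
    slaves: List[Instance] = field(default_factory=list)

def stop_writes(i: Instance):   print(f"stop writes on {i.host}:{i.port}")
def resume_writes(i: Instance): print(f"resume writes on {i.host}:{i.port}")
def repoint(slave: Instance, new_master: Instance):
    print(f"point {slave.host}:{slave.port} at {new_master.host}:{new_master.port}")

def promote(rs: ReplicaSet, target: Instance) -> Instance:
    """Promote `target` to be the master of `rs`; return the old master."""
    old_master = rs.master
    stop_writes(old_master)                       # 1. stop writes to the replica set
    for slave in rs.slaves:                       # 2. repoint the remaining slaves
        if slave is not target:
            repoint(slave, target)
    rs.master = target                            # 3. record the new master
    rs.slaves = [s for s in rs.slaves if s is not target]
    resume_writes(target)                         # 4. resume writes
    return old_master                             # may be discarded (deletion) or recycled

rs = ReplicaSet(master=Instance("db1", 3306),
                slaves=[Instance("db2", 3306), Instance("db3", 3306)])
promote(rs, rs.slaves[0])
```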

Host management and states
Now that we’ve got the basics out of the way, we can dive into more abstract concepts that utilize these building blocks.

MPS works with a repository that holds the current state and metadata for all our database hosts, and current and past MPS copy operations. This registry is managed by the database servers themselves so that it scales with the database cluster and MPS doesn’t need a complex application server installation. MPS itself is in fact stateless, running on its own pool of hosts and relying on the repository for state management. States are processed separately and in parallel.

When a server “wakes up” in a datacenter (for example, a fresh rack has been connected and provisioned), it will start running a local agent every few minutes. This agent performs the following steps:

  1. Collect data about itself. (Where am I? What hardware do I have? What software versions am I running?)
  2. Triage the host for problems. (Did I wake up in an active cluster? Are my disks OK? Are my flash cards healthy?)
  3. Make sure the server is registered and contains up-to-date metadata in the central repository.
  4. On the first run, place instances on the server in an initial “reimage” state if there is no current record of this server. This is where new servers start their lives in MPS.

In this way, every healthy server “checks in” to the central repository every few minutes and reports how it is doing, keeping information such as data use and system health in sync.
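
A rough sketch of what such a periodic agent run might look like; all of the functions below are hypothetical placeholders for the steps listed above:

```python
# Illustrative sketch of the periodic local agent run; every function is a
# placeholder, not the actual Facebook agent.
import socket
import time

def collect_facts():
    # Where am I? What hardware and software versions am I running?
    return {"hostname": socket.gethostname(), "mysql": "5.6", "disks_ok": True}

def triage(facts):
    # Did I wake up in an active cluster? Are disks and flash cards healthy?
    return [] if facts["disks_ok"] else ["bad-disk"]

def register_or_update(repo, facts, problems):
    host = facts["hostname"]
    if host not in repo:
        # First run: new servers start their lives in the "reimage" state.
        repo[host] = {"state": "reimage"}
    repo[host].update({"facts": facts, "problems": problems})

def agent_loop(repo, interval=300):
    while True:                                  # the periodic "check in"
        facts = collect_facts()
        register_or_update(repo, facts, triage(facts))
        time.sleep(interval)
```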

The smallest unit MPS manages at the moment is an instance. Each instance can be in various states. The important states are as follows:

  • Production: Instance is serving production traffic.
  • Spare: Instance is ready to be copied to or allocated to some other task.
  • Spare allocated: Instance has been chosen as the target for a copy, and a copy is in progress.
  • Spare deallocated: Temporary triaging state. Instance has been removed from production and is pending triaging and cleanup. No instances stay here for more than a few minutes.
  • Drained: The instance is not being used, and is being reserved for testing, data center maintenance, etc. An operator intervention is required to take a host out of this state.
  • Reimage: Servers with all instances in this state are being reimaged or are in the process of being repaired. Servers in this state are handed off and managed by a co-system called Windex, which was discussed in a previous post.

An instance may move between states due to MPS executing an action or an operator intervention. This state diagram shows the main states and the actions that may cause an instance to move between those states.


The diagram above describes only a small subset of possible paths an instance may take in MPS. The state changes described here are the ones that result from simple copy and maintenance operations. There are many other reasons for instances to change states, and hardcoding all the options and checks would create software that is difficult and complex to maintain. Meet “problems,” another fundamental concept in MPS.

A “problem” is a property that is attached to an instance. If all instances on a host have the same problem, we consider it to be attached to the server itself. Another way to think of problems is as tags. MPS consults a decision matrix that helps it make decisions about instances with a specific problem. It is basically a map from (state, problem) tuples to (action, state) tuples.

It is easier to understand with some examples:

  • (production, low-space) – (replace, spare deallocated): Replace an instance in production with limited space, moving it to a different server.
  • (spare deallocated, old-kernel) – (move, reimage): If an instance happens to move through this state, it has no production data on it, so why not reimage it?
  • (production, master-in-fallback-location) – (promote, production): We should promote this master instance to the correct location, and leave the instance in the production state.
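
Using just the three examples above, a minimal sketch of such a decision matrix might look like this (the real MPS logic is of course far richer than a dictionary):

```python
# Minimal sketch of the (state, problem) -> (action, next state) mapping,
# built only from the examples in the post; purely illustrative.
from enum import Enum

class State(Enum):
    PRODUCTION = "production"
    SPARE = "spare"
    SPARE_ALLOCATED = "spare allocated"
    SPARE_DEALLOCATED = "spare deallocated"
    DRAINED = "drained"
    REIMAGE = "reimage"

# (current state, problem) -> (action, next state)
DECISION_MATRIX = {
    (State.PRODUCTION, "low-space"):                   ("replace", State.SPARE_DEALLOCATED),
    (State.SPARE_DEALLOCATED, "old-kernel"):           ("move",    State.REIMAGE),
    (State.PRODUCTION, "master-in-fallback-location"): ("promote", State.PRODUCTION),
}

def next_step(state, problem):
    # Returns the action to take and the state the instance moves to,
    # or None if no rule matches (left for an operator or another rule).
    return DECISION_MATRIX.get((state, problem))

print(next_step(State.PRODUCTION, "low-space"))   # -> ('replace', State.SPARE_DEALLOCATED)
```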

The various states and “problems” in MPS allow us to create a flexible and maintainable infrastructure to manage a server’s life cycle.

Examples of common failure resolution and maintenance operations
In a large data center, there are tens or hundreds of server failures a day. Here are a few examples of common day-to-day failures that MPS takes care of without human intervention:

  • Broken slave instances are detected and disabled until they are replaced in the background.
  • Broken master instances are demoted so that healthy replicas take the place of their fallen brethren and get replaced in the background.
  • Instances on servers that might run out of space due to growth are moved to underutilized servers.

With thousands of servers, site-wide maintenance tasks like upgrading to a new kernel, changing partition sizes, or upgrading firmware on controllers become very complex. The same goes for localized operations such as moving some racks or allocating test servers for our engineering teams. Here are some common maintenance operations an operator can ask MPS to perform with a single command:

  • Drain any number of database racks for maintenance and take them out of production. Most such operations complete in less than 24 hours.
  • Re-image thousands of machines (to perform kernel upgrades, for example) at a specified concurrency. MPS will replace each machine and then send it to Windex.
  • Allocate any number of spares to be used for a new project or testing. Want 200 servers to run tests? No problem.
  • Create a copy of the entire Facebook data set at a new data center at a specified concurrency, for example when building out our new Lulea data center!

Automating away the mundane tasks with MPS allows us to better scale the number of servers we manage and frees up the MySQL Operations team to work on more exciting challenges.

(source: Facebook Engineering)

Need Help With Database Scalability? Understand I/O

This is a guest post by Zardosht Kasheff, Software Developer at Tokutek, a storage engine company that delivers 21st-Century capabilities to the leading open source data management platforms.

As software developers, we value abstraction. The simpler the API, the more attractive it becomes. Arguably, MongoDB’s greatest strengths are its elegant API and its agility, which let developers simply code.

But when MongoDB runs into scalability problems on big data, developers need to peek underneath the covers to understand the underlying issues and how to fix them. Without understanding, one may end up with an inefficient solution that costs time and money. For example, one may shard prematurely, increasing hardware and management costs, when a simpler replication setup would do. Or, one may increase the size of a replica set when upgrading to SSDs would suffice.

This article shows how to reason about some big data scalability problems in an effort to find efficient solutions.

Defining the Issues

First, let’s define the application. We are discussing MongoDB applications. That means we are addressing a document-store database that supports secondary indexes and sharded clusters. In the context of other NoSQL technologies, such as Riak or Cassandra, we may discuss these I/O bottlenecks differently, but this article focuses only on the properties of MongoDB.

Second, what does the application do? Are we processing transactions on-line (OLTP) or are we doing analytical processing (OLAP)? For this article, we are discussing OLTP applications. OLAP applications have big data challenges that MongoDB may or may not be able to address, but this article focuses on OLTP applications.

Third, what’s big data? By big data, we mean that we are accessing and using more data than can fit in RAM on a single machine. As a result, if the data resides on one server, most of it must live on disk and require I/O to access. Note that we are not discussing scenarios where the database is large but the data accessed or used (sometimes called the “working set”) is small. An example would be storing years of data while the application only frequently accesses the last day’s worth.

Fourth, what are the limiting factors in OLTP applications with big data? In short: I/O. Hard disk drives perform at most hundreds of I/Os per second, whereas RAM can be accessed millions of times per second. This disparity makes I/O the bottleneck for big data applications.

Lastly, how do we solve our I/O bottlenecks? With analytical thinking. Formulas and direct instructions can get us most of the way there, but a lasting solution requires understanding. Users must look at the I/O characteristics of their application and make design decisions to best fit those characteristics.

Cost Model

To solve I/O bottlenecks, we first need to understand what database operations induce I/O.

One can argue that MongoDB, and many other databases, underneath all of the bells and whistles, perform three basic operations:

● Point Query: Find a single document. Given the location of a document somewhere either on disk or in memory, retrieve the document. On big data, where the document is likely not in memory, this operation probably causes an I/O.

● Range Query: Find some number of documents in succession in an index. This is generally a much more efficient operation than a point query, because the data we are looking for is packed together on disk and brought into memory with very few I/Os. A range query that retrieves 100 documents may induce 1 I/O, whereas 100 point queries to retrieve 100 documents may induce 100 I/Os.

● Write: Write a document to the database. For traditional databases such as MongoDB, this may cause an I/O. For databases with write-optimized data structures, such as TokuMX, this induces very little I/O. Unlike traditional databases, write-optimized data structures are able to amortize the I/O performed across many inserts.
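
To make the difference concrete, here is a rough pymongo sketch of the two read access patterns; the collection and field names are hypothetical:

```python
# Illustrative sketch of point-query vs. range-query access patterns.
# Collection and field names are hypothetical; only the access pattern matters.
from datetime import datetime
from pymongo import MongoClient, ASCENDING

coll = MongoClient("mongodb://localhost:27017").blog.posts
coll.create_index([("author", ASCENDING), ("posted_at", ASCENDING)])

some_ids = list(range(100))                       # hypothetical document ids
start, end = datetime(2013, 1, 1), datetime(2013, 2, 1)

# 100 point queries: each lookup may land on a different disk page, so on a
# big-data working set this can cost on the order of 100 I/Os.
by_id = [coll.find_one({"_id": i}) for i in some_ids]

# One range query: the matching index entries (and their documents, if the
# index is clustering/covering) sit next to each other, so the same 100
# documents may come back with only a handful of I/Os.
by_range = list(coll.find({"author": "alice",
                           "posted_at": {"$gte": start, "$lt": end}}).limit(100))
```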

Understanding the I/O implications of these three basic operations leads to understanding the I/O used by MongoDB statements made against a database. MongoDB takes these three basic operations and builds four basic user-level operations:

● Inserts. This writes a new document to the database.

● Queries. Using an index on a collection, this does a combination of range queries and point queries. If the index is a covering index or a clustering index (conceptually the same as TokuDB for MySQL’s clustering index), then the query is likely doing just range queries. If not, then a combination of range queries and point queries are used. I explain these concepts in an indexing talk. The talk uses MySQL, but all of the concepts apply to MongoDB and TokuMX as well.

● Updates and Deletes. These are a combination of queries and writes. A query is used to find the documents to be updated or deleted, and then writes are used to update or remove the found documents.

Now that we understand the cost model, to resolve I/O bottlenecks the user needs to understand where the application induces I/O. This is where we need to break the abstraction and peek at how the database behaves. Does the I/O come from queries? If so, what is it about the queries’ behavior that causes the I/O? Does it come from updates? If it comes from updates, is it coming from the query used in the update or from the write performed by the update? Once the user understands what is inducing the I/O, steps can be taken to resolve the bottleneck.

Assuming we understand the I/O characteristics of the application, we can talk about several approaches to addressing them. The approach I like to take is this: first attack the problem with software, and when that is not enough, then attack the problem with hardware. Software is cheaper and easier to maintain.

Possible Software Solutions

A possible software solution is one where we change the software or the application to reduce I/O usage. Here are possible solutions for different bottlenecks.

Problem: Inserts Causing Too Much I/O.

Possible Solution: Use a write-optimized database, such as TokuMX. One of TokuMX’s benefits is drastically reducing the I/O requirements of writes by using Fractal Tree indexes.

Problem: Queries Causing Too Much I/O.

Possible Solutions: Use better indexing. Reduce point queries by using range queries instead.

In my talk, “Understanding Indexing”, I explain how to reason about indexes to reduce I/O for queries. It’s difficult to summarize the talk in one paragraph, but the gist is as follows. One can reduce the I/O of the application by avoiding doing individual point queries to retrieve each document. To do this, we use covering or clustering indexes that smartly filter the documents analyzed by the query, and can report results using range queries.
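
As a rough illustration of that idea in MongoDB terms, the sketch below creates an index that covers a query, so results can be reported from index range reads alone; the collection and field names are hypothetical:

```python
# Illustrative example of trading per-document point queries for index-only
# range reads via a covering index. Collection and field names are hypothetical.
from pymongo import MongoClient, ASCENDING, DESCENDING

orders = MongoClient("mongodb://localhost:27017").shop.orders

# The filter field and both projected fields live in the index, so MongoDB
# can satisfy the query without fetching the documents themselves.
orders.create_index([("customer_id", ASCENDING),
                     ("placed_at", DESCENDING),
                     ("total", ASCENDING)])

recent_totals = list(
    orders.find({"customer_id": 12345},
                {"_id": 0, "placed_at": 1, "total": 1})   # excluding _id keeps it covered
          .sort("placed_at", DESCENDING)
          .limit(20)
)
```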

Better indexing may not be sufficient. If you have an OLTP application and your queries are essentially point queries (because they retrieve very few documents), then even with proper indexes, you may still have an I/O bottleneck. In such cases, a hardware solution is probably necessary.

Also, additional indexes increase the cost of insertions, as each insertion must keep the indexes up to date as well, but write-optimized databases mitigate that cost. This is where we need to approach the problem with an understanding of our application. For some applications, this is practical, and for others it is not.

Problem: Updates/Deletes Cause Too Much I/O

Solution: Combine the solutions above.

Updates and deletes are tricky in that they are a combination of queries and inserts. Improving their performance requires a good understanding of the cost of the operation. Which part of the operation induces I/O? Is it the query? If so, one can try to improve the indexing. Is it the write? Is it both? Based on what part is inducing the I/O, we apply the solutions above.

One mistake many people make is taking a write-optimized database such as TokuMX and expecting it to eliminate the I/O bottlenecks of updates and deletes without changing any of the indexing. A write-optimized database alone is not enough; the implicit query within an update/delete must be addressed as well.

Possible Hardware Solutions

As mentioned above, when software solutions are not enough we look to hardware. Let’s look at these possibilities, and analyze their benefits and drawbacks:

● Buy more memory to hopefully get more, if not all, of your working set into memory.

● Increase your IOPS by moving to an SSD.

● Buy more machines and move to a scaled out solution. This can be:

○ Read scaling via replication

○ Sharding

Buying more memory

RAM is expensive, and there is only so much RAM one can put on a single machine. Simply put, if data is large enough, keeping it in RAM is not a feasible option. This solution may work for a wide range of applications, but our focus here is tackling applications that cannot do this.

Moving to SSDs

Making the storage device an SSD is a practical solution for improving throughput. If I/O is the limiting factor of your application, an increase in IOPS (I/Os per second) naturally leads to an increase in throughput. This may be simple and practical.

The cost model of SSDs is different from that of hard disks. SSDs dramatically improve your I/O throughput, but they are not cheap: they are smaller, cost more, and wear out faster. As a result, the cost per GB of SSDs is considerably higher than the cost per GB of hard disks. To keep costs down, data compression becomes key.

So, the cost of the hardware increases, but the cost of managing the application does not.

Read scaling via replication

Read scaling with replication is effective for applications where queries are the bottleneck. The idea is as follows:

● Use replication to store multiple copies of your data on separate machines

● Distribute reads across the machines, thereby improving read throughput

By taking the reads that used to bottleneck on one machine and spreading them out, more resources are available for the application to either handle more concurrent queries with the same write workload or to increase the write workload on the application.
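
In MongoDB, this read distribution can be expressed with read preferences. A rough pymongo sketch, with hypothetical hostnames and replica set name:

```python
# Illustrative sketch: writes still go to the primary, but reads are allowed
# to be served by secondaries. Hostnames and the replica set name are
# hypothetical, and secondary reads may return slightly stale data.
from pymongo import MongoClient
from pymongo.read_preferences import ReadPreference

client = MongoClient("mongodb://db1,db2,db3/?replicaSet=rs0")

events = client.app.events                       # default: reads and writes hit the primary
reporting = events.with_options(read_preference=ReadPreference.SECONDARY_PREFERRED)

events.insert_one({"user_id": 42, "type": "login"})          # write: primary only
recent = list(reporting.find({"user_id": 42}).limit(10))     # read: may hit a secondary
```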

If inserts, updates, or deletes are your bottleneck, then replication may not be very effective, because the write work is duplicated on all servers that are added to the replica set. The machine that takes the data input (called the master in MySQL or primary in MongoDB) will still have the same bottleneck.


Sharding

Sharding partitions your data across different replica sets based on a shard key. Different replica sets in the cluster are responsible for ranges of values in the shard key space. So, an application’s write throughput is increased by spreading the write workload across separate replica sets in the cluster. For high write workloads, sharding can be very effective.

By partitioning the data by ranges in the shard key space, queries that use the shard key can effectively do range queries on a few shards, making such queries very efficient. If one makes the shard key a hash, then all range queries must run on all shards in the cluster, but point queries on the shard key run on single shards.
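
A rough sketch of that trade-off, using the standard sharding admin commands through pymongo; the database, collection, and key names are hypothetical:

```python
# Illustrative sketch of the ranged vs. hashed shard key trade-off, issued
# through the standard sharding admin commands against a mongos router.
# Database, collection, and field names are hypothetical.
from pymongo import MongoClient

admin = MongoClient("mongodb://mongos-host:27017").admin

admin.command("enableSharding", "app")

# Ranged shard key: range queries on user_id can be routed to a few shards,
# but a monotonically increasing key would funnel all inserts to one shard.
admin.command("shardCollection", "app.profiles", key={"user_id": 1})

# Hashed shard key: inserts spread evenly and point lookups still hit a single
# shard, but range queries on user_id must be scattered to every shard.
admin.command("shardCollection", "app.events", key={"user_id": "hashed"})
```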

Because MongoDB is schema-less and does not support joins, sharding is elegant and relatively easy to use. If the solutions above are not enough to handle your application and more resources are required, then one can argue that sharding is inevitable.

Nevertheless, sharding is arguably the most heavyweight solution and has a high cost. For starters, your entire hardware budget is multiplied: you do not just add machines to a sharded setup, you add entire replica sets. You also need to add and manage config servers. Because of these costs, one should really consider whether sharding is truly necessary. Usually, any one of the solutions presented above is cheaper.

Another big challenge is selecting a shard key. A good shard key has the following properties:

● Many (if not all) of your queries use the shard key. Any query that does not use it must run on all shards. This can be a pain point.

● The shard key should do a good job of distributing writes to different replica sets in the cluster. If all writes are directed to the same replica set in the cluster, then that replica set becomes a bottleneck for writes, just as it was in a non-sharded setup. This makes something like a timestamp a very bad shard key.

These requirements are not always easy to fill. Sometimes, a good shard key does not exist, making sharding ineffective.

In conclusion, many solutions may work, but none is always guaranteed, not even sharding. This is why understanding the characteristics of the application is crucial. These solutions are tools; how best to use the tools is up to the user.