RebornDB: The Next Generation Distributed Key-Value Store

There are many key-value stores in the world, and they are widely used in many systems. For example, we can use Memcached to cache a MySQL query result so the same query can be served later, or use MongoDB to store documents for better searching.

Different scenarios call for different key-value stores; there is no silver bullet that fits every solution. But if you just want a simple key-value store that is easy to use, very fast, and supports many powerful data structures, Redis is a good place to start.

Redis is an advanced key-value cache and store released under the BSD license. It is very fast, has many data types (String, Hash, List, Set, Sorted Set, …), uses RDB or AOF persistence plus replication to keep data safe, and has client libraries for many languages.

Above all, the market has chosen Redis. Many companies use it, and it has proved its worth.

Although Redis is great, it still has some disadvantages, and the biggest one is the memory limitation: Redis keeps all data in memory, which caps the size of the whole dataset and makes it impossible to store more data than RAM allows.

The official Redis Cluster addresses this by splitting data across many Redis servers, but it has not yet been proven in many practical environments. It also requires client libraries to support “MOVED” redirection and other special commands, which is unacceptable for services already running in production. So Redis Cluster is not a good solution for now.

QDB

We like Redis and want to go beyond its limitation, so we built a service named QDB. It is compatible with Redis, saves data on disk to exceed the memory limitation, and keeps hot data in memory for performance.

Introduction

QDB is a fast, Redis-like key-value store. It has the following features:

  • Compatible with Redis. If you are familiar with Redis, you can use QDB easily. QDB supports most Redis commands and data structures (String, Hash, List, Set, Sorted Set).

  • Saves data on disk (exceeding the memory limitation) and keeps hot data in memory, thanks to the backend storage.

  • Multiple backend storage engines: you can choose RocksDB, LevelDB, or GoLevelDB (later we will use RocksDB in the examples).

  • Bidirectional synchronization with Redis: QDB can sync data from Redis as a slave and replicate data into Redis as a master.

Backend Storage

QDB uses LevelDB, RocksDB, or GoLevelDB as the backend storage. These engines are all based on the log-structured merge tree (LSM tree), offering very fast writes and good reads, and they all use Bloom filters and an LRU cache to further improve read performance.

LevelDB is the original implementation developed by Google, RocksDB is an optimized fork maintained by Facebook, and GoLevelDB is a pure Go implementation of LevelDB. If you only want a quick trial and don’t want to build and install RocksDB or LevelDB, you can use GoLevelDB directly, but we don’t recommend it in production because of its lower performance.

LevelDB and RocksDB are both fine for production, but we prefer RocksDB for its superior performance. Later we may support only RocksDB and GoLevelDB: one for production, one for trial and testing.

RebornDB

QDB is great: we can store a huge dataset on one machine with high read/write performance. But as the dataset grows, we still hit the point where it no longer fits on a single machine, and that single QDB server becomes both a bottleneck and a single point of failure.

We must now consider a cluster solution.

Introduction

RebornDB is a proxy-based distributed Redis cluster solution. It is similar to twemproxy, which is arguably the earliest and best-known proxy-based Redis cluster solution.

But twemproxy has its own problems. It only supports a static cluster topology, so we cannot add or remove Redis servers for dynamic re-sharding. If we run many twemproxy instances and want to add a backend Redis server, safely updating the configuration on all of them is another problem that adds operational complexity. Meanwhile, Twitter, the company that developed twemproxy, has given up on it and no longer uses it in production.

Unlike twemproxy, RebornDB has a killer feature: it can re-shard the dataset dynamically. This is useful when your dataset grows quickly and you have to add more storage nodes for scalability. Above all, RebornDB re-shards transparently, without affecting the services that are currently running.

Architecture

We can treat RebornDB as a black box and use any existing Redis client to communicate with it as if it were a single Redis server. The following picture shows the RebornDB architecture.

[Figure: RebornDB architecture]

RebornDB has the following components: reborn-proxy, the backend store, the coordinator, reborn-config, and reborn-agent.

Reborn-proxy

Reborn-proxy is the only service exposed to clients. Any Redis client can connect to any reborn-proxy and run commands.

Reborn-proxy parses each command sent from the client using RESP (the Redis serialization protocol), dispatches it to the corresponding backend store, receives the reply, and returns it to the client.
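
As an illustration of the RESP side of that work, here is a minimal Python sketch (not RebornDB's actual code) of reading one client command off a buffered socket; the slot-based dispatch it feeds into is sketched in the next section.

def parse_resp_command(sock_file):
    """Read one RESP request (an array of bulk strings), e.g. SET foo bar."""
    header = sock_file.readline().rstrip(b"\r\n")        # e.g. b"*3"
    assert header.startswith(b"*"), "clients send commands as RESP arrays"
    argc = int(header[1:])
    args = []
    for _ in range(argc):
        length = int(sock_file.readline().rstrip(b"\r\n")[1:])  # b"$3" -> 3
        args.append(sock_file.read(length))               # the argument bytes
        sock_file.read(2)                                 # discard trailing CRLF
    return args                                           # [b"SET", b"foo", b"bar"]

# Usage inside an accept loop (sketch):
#   sock_file = client_socket.makefile("rb")
#   command = parse_resp_command(sock_file)
#   key = command[1]   # route on the key, forward to the backend, relay the reply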

Reborn-proxy is stateless, which means you can easily scale reborn-proxy horizontally to serve more requests.

We may run many reborn-proxy instances; how clients discover them is another topic in distributed system design that we will not dive into here. Practical approaches include DNS, LVS, HAProxy, and so on.

Backend store

The backend store is either reborn-server (a modified Redis) or QDB. We introduce a concept named group to manage one or more backend stores: a group must have one master and zero or more slaves, forming a replication topology.

We split the whole dataset into 1024 slots (using hash(key) % 1024 to determine which slot a key belongs to) and store different slots in different groups. If you want to re-shard, you add a new group and let RebornDB migrate all the data in a slot from another group to it.
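
To make the slot routing concrete, here is a small sketch of the hash(key) % 1024 idea; the crc32 hash and the in-memory routing table are illustrative assumptions (RebornDB keeps the real slot-to-group mapping in the coordinator).

import zlib

NUM_SLOTS = 1024

# Illustrative routing table: which group currently owns each slot.
# In RebornDB the real slot-to-group mapping lives in the coordinator.
slot_to_group = {slot: (1 if slot < 512 else 2) for slot in range(NUM_SLOTS)}

def slot_for_key(key: bytes) -> int:
    """hash(key) % 1024 picks the slot; crc32 stands in for the hash here."""
    return zlib.crc32(key) % NUM_SLOTS

def group_for_key(key: bytes) -> int:
    """The proxy dispatches a command to whichever group owns the key's slot."""
    return slot_to_group[slot_for_key(key)]

print(slot_for_key(b"car:12345"), group_for_key(b"car:12345"))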

We can also use different backend stores in different groups. For example, if we want group1 to hold hot data and group2 to hold large amounts of cold data, we can use reborn-server in group1 and QDB in group2. Reborn-server is much faster than QDB, so we can guarantee read/write performance for hot data.

Coordinator

We use ZooKeeper or etcd as the coordinator, which coordinates all services whenever we need to perform write operations such as resharding and failover.

All RebornDB metadata, such as the key routing rules, is saved in the coordinator; with it, reborn-proxy can dispatch commands to the correct backend store.

Reborn-config

Reborn-config is the management tool. We use it to add or remove groups, add or remove stores within a group, migrate data from one group to another, and so on.

We must use reborn-config whenever we want to change RebornDB cluster information. For example, we cannot run the “SLAVEOF NO ONE” command directly on a backend store to promote a new master; instead we must use “reborn-config server promote groupid server”. Promotion has to change not only the replication topology inside the group but also the information in the coordinator, and only reborn-config can do both.

Reborn-config also provides a web UI so you can manage RebornDB easily, and you can use its HTTP RESTful API for finer-grained control.

Reborn-agent

Reborn-agent is an HA tool. You can use it to start and stop applications (reborn-config, qdb-server, reborn-server, reborn-proxy). We will discuss it in detail in the High Availability section below.

Resharding

RebornDB supports dynamic resharding. How does it work?

As we said above, we split the whole dataset into 1024 slots and let different groups store different slots. When we add a new group, we migrate some slots from the old groups into the new one. We call this process migration, and in RebornDB the slot is the minimum unit of migration.

Let’s start with a simple example below:

 

[Figure: initial slot distribution across group1 and group2]

 

We have two groups: group1 holds slots 1 and 2, and group2 holds slots 3, 4, and 5. Now group2 is under a high workload, so we will add a group3 and migrate slot 5 into it.

We can use the following command to migrate slot 5 from group2 to group3:

reborn-config slot migrate 5 5 3

This command looks simple, but internally we need to do more work to guarantee migration safety. We use a two-phase commit (2PC) to tell every reborn-proxy that slot 5 is about to move from group2 to group3; only after all reborn-proxy instances have confirmed do we begin the migration.

 

[Figure: migration of slot 5 from group2 to group3]

 

The migration flow is simple: pick a key in slot 5, migrate its data from group2 to group3, delete the key in group2, and repeat. In the end, group2 holds no slot 5 data and all of it lives in group3.
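
That per-key loop can be sketched as follows; the helper functions are hypothetical stand-ins for the commands RebornDB actually sends to the backend stores, and in the real system each per-key move is atomic.

def migrate_slot(slot, src_group, dst_group):
    """Move every key in `slot` from src_group to dst_group, one key at a time.

    scan_one_key, dump_key, restore_key and delete_key are hypothetical helpers
    wrapping whatever commands the backend stores expose.
    """
    while True:
        key = scan_one_key(src_group, slot)     # any key still left in the slot
        if key is None:
            return                              # no slot data left in src_group
        payload = dump_key(src_group, key)      # read the key's value/structure
        restore_key(dst_group, key, payload)    # write it into the new group
        delete_key(src_group, key)              # then remove it from the old group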

While slot 5 is being migrated, reborn-proxy may receive a command whose key belongs to slot 5, and at that moment it cannot know whether the key is in group2 or group3. So reborn-proxy first sends a migration command for that key to group2, and then dispatches the original command to group3.

The per-key migration is atomic, so after the migration command we can be sure the key is in group3, regardless of whether it was in group2 or group3 before.

Once no slot 5 data remains in group2, the migration stops, and the topology looks like this:

 

[Figure: slot distribution after migrating slot 5 to group3]

 

High Availability

RebornDB uses reborn-agent to provide its HA solution.

Every second, reborn-agent checks whether the applications it started are still alive. If it finds that an application is down, it restarts it.

Reborn-agent is similar to supervisor, but with more features.

Reborn-agent provides an HTTP RESTful API so we can dynamically add or remove the applications it monitors. For example, we can call the “/api/start_redis” API to start a new reborn-server or the “/api/start_proxy” API to start a new reborn-proxy, and we can call “/api/stop” to stop a running application and remove it from the monitoring list.
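
Driving that API from a script might look like the sketch below; the agent address and the request payloads are assumptions, since they are not shown here, so treat the JSON bodies as placeholders.

import requests

AGENT = "http://127.0.0.1:39000"     # assumed reborn-agent address

# Start a new reborn-server and a new reborn-proxy under the agent's watch.
# The JSON bodies are placeholders; the real API defines the expected fields.
requests.post(AGENT + "/api/start_redis", json={"addr": "127.0.0.1:6380"})
requests.post(AGENT + "/api/start_proxy", json={"addr": "127.0.0.1:19000"})

# Stop a monitored application and remove it from the monitoring list.
requests.post(AGENT + "/api/stop", json={"addr": "127.0.0.1:6380"})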

Reborn-agent is not only for local application monitoring but also for backend store HA. Multiple reborn-agents first elect a leader through the coordinator; the leader then checks every second whether the backend stores are alive. If a dead store is a slave, reborn-agent simply marks it offline in the coordinator; if it is a master, reborn-agent selects a new master from the slaves and performs a failover.
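
The leader's check-and-failover loop can be sketched roughly as follows; every helper function here is a hypothetical stand-in for the coordinator and replication operations just described.

import time

def watch_backend_stores(groups):
    """Run only on the elected leader reborn-agent."""
    while True:
        for group in groups:
            for store in group.stores:
                if is_alive(store):                        # hypothetical health probe, e.g. PING
                    continue
                if store.role == "slave":
                    mark_offline_in_coordinator(store)     # a dead slave is only marked offline
                else:
                    new_master = pick_new_master(group)    # choose among the live slaves
                    promote(new_master)                    # rewire replication inside the group
                    update_coordinator(group, new_master)  # so proxies route writes correctly
        time.sleep(1)                                      # the leader checks every second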

To Do

Although RebornDB already has many great features, more work is needed to improve it. We may do the following things later:

  • More user friendly. Running RebornDB is not easy today: we have to do work such as initializing slots, adding servers to groups, and assigning slots to a group. Lowering this barrier to entry is something we must address in future work.

  • Replication-based migration. We currently migrate a slot key by key, which is slow when the slot holds a lot of data. Replication-based migration may be better: in the example above, group2 would first generate a snapshot from which group3 gets all slot 5 data at that point in time, and group3 would then sync subsequent changes from group2 incrementally. Once group3 has caught up with all of group2’s changes in slot 5, we switch over and delete slot 5 in group2.

  • A prettier dashboard. We want to control and monitor everything through a dashboard in order to provide a better user experience.

  • A P2P-based cluster. RebornDB is currently a proxy-based cluster solution; later we may redesign the whole architecture to use a P2P approach like the official Redis Cluster.

Conclusion

Building a distributed key-value store is not easy. The road ahead is long, and we have only taken a small step so far.

If you want a Redis-like key-value store that can hold more data than memory allows and supports dynamic resharding in a distributed system, RebornDB is a good choice.

You can try it here and any advice and feedback is very welcome. :-)
(via HighScalability.com)

Linux Kernel 4.1 Released

Version 4.1 of the Linux kernel was released this week, and it includes a number of new features in the following areas.

Power Management

Several power-saving features have been integrated into the kernel, and the majority of them target high-end Intel chips. The improvements consist of tweaks to improve performance and reduce power consumption, and early reports claim they can help extend battery life by up to an hour.

EXT4 Encryption

It’s now possible to encrypt EXT4 filesystems, thanks to code contributed by Google. The code was originally developed for Android and has been pushed upstream to the mainline kernel. This will be interesting news to government contractors and others who work in secure environments, where filesystem-level encryption is mandated.

Graphics

Version 4.1 includes improvements that affect several GPUs, including the GeForce GTX series. NVIDIA enjoys a troubled relationship with Linux, to which many users and developers will attest, and the company’s unwillingness to release open-source driver code has created a lot of problems for the kernel team. Linus has been quite vocal on the subject. Here’s a brief summary of his views (contains cursing): https://www.youtube.com/watch?v=iYWzMvlj2RQ.

The new version includes improvements that make it easier to configure Nouveau for the GeForce GTX 750. In the past, the process involved the manual extraction of firmware blobs, which is exactly as painful as it sounds.

Intel GPU support in virtual machines has been a bit troublesome in the past, but Linux 4.1 now supports Intel’s XenGT vGPU for use with the Xen hypervisor. A version for KVM is in the works too.

Version 4.1 also supports DisplayPort MST for Radeon cards.

Better Laptop Support

Updates to the Atmel MXT driver mean that the touchscreen and touchpad of Google’s Pixel 2 Chromebook are now fully supported.

Linux 4.1 also now can control the keyboard backlights on Dell laptops, which is great news for midnight typists!

The updated Toshiba ACPI driver now supports a host of fancy features, including adaptive keyboard buttons, USB sleep and charge, hotkeys, keyboard backlighting and more.

Game Support

Xbox One controllers gain force feedback and rumble support in kernel version 4.1.

These changes are just the highlights; there are tons of other entries in the change logs.

Currently, 4.1 has not yet made it into the repos of the major distributions. That’s understandable, as it’s only just been released, but it’s only a matter of time until it is included. For instance, Ubuntu will start supporting it in October 2015. If you want to start using it before then, you can install it from a custom repository (as provided by the community), or compile it from source.
(via Linuxjournal.com)

POWER8 packs more than twice the big data punch

Last week, in London, the top minds in financial technology came face to face at the STAC Summit to discuss the technology challenges facing the financial industry. The impact big data is having on the financial industry was a hot topic, and many discussions revolved around having the best technology on hand for handling data faster to gain a competitive advantage.

It was in this setting that IBM shared recent benchmark results revealing that an IBM POWER8-based system server can deliver more than twice the performance of the best x86 server when running standard financial industry workloads.

[Figure: IBM POWER8 vs. x86 STAC-A2 benchmark results]

In fact, IBM’s POWER8-based systems set four new performance records for financial workloads. According to the most recent published and certified STAC-A2 benchmarks, IBM POWER8-based systems deliver:

  • 3x performance over the best 2-socket solution using x86 CPUs
  • 1x performance for path scaling over the best 4-socket solution using x86 CPUs (NEW PUBLIC RECORD)
  • 7x performance over the best 2 x86 CPU and one Xeon Phi co-processor
  • 16 percent increase for asset capacity over the best 4-socket solution (NEW PUBLIC RECORD)

Why the financial industry created the STAC benchmark

STAC-A2 is a set of standard benchmarks that help estimate the relative performance of full systems running complete financial applications. This enables clients in the financial industry to evaluate how IBM POWER8-based systems will perform on real applications.

STAC-A2 gives a much more accurate view of expected performance than microbenchmarks or simple code loops. STAC recently performed STAC-A2 Benchmark tests on a stack consisting of the STAC-A2 Pack for Linux on an IBM Power System S824 server using two IBM POWER8 processor cards at 3.52 GHz and 1TB of DRAM, running Red Hat Enterprise Linux version 7.

And, as reported above, according to audited STAC results, the IBM Power System S824 delivered more than twice the performance of the best x86 based server measured. Those are the kind of results that matter—real results for real client challenges.

POWER8 processors are based on high-performance, multi-threaded cores, with each core of the Power System S824 server running up to eight simultaneous threads at 3.5 GHz. The Power System S824 also has a very high-bandwidth memory interface that runs at 192 GB/s per socket, almost three times the speed of a typical x86 processor. These factors, along with a balanced system design including a large 8 MB per-core L3 cache, are the primary reasons why financial computing workloads run significantly faster on POWER8-based systems than on the alternatives.

The STAC-A2 financial industry benchmarks add to the performance data that Cabot Partners published recently. Cabot compared POWER8-based systems with x86-based systems, evaluating functionality, performance, and price/performance across several industries, including life sciences, financial services, oil and gas, and analytics, and referencing standard benchmarks as well as application-oriented benchmark data.

The findings in the STAC-A2 benchmarking report position POWER8 as the ideal platform for the financial industry. This data, combined with the recently published Cabot Partners report, represents overwhelming proof that IBM POWER8-based systems take the performance lead in the financial services space (and beyond)—clearly packing a stronger punch when compared to the competition.

 

(via Smartercomputingblog.com)

Socket Sharding in NGINX Release 1.9.1

NGINX release 1.9.1 introduces a new feature that enables use of the SO_REUSEPORT socket option, which is available in newer versions of many operating systems, including DragonFly BSD and Linux (kernel version 3.9 and later). This option allows multiple sockets to listen on the same IP address and port combination. The kernel then load balances incoming connections across the sockets, effectively sharding the socket.

(For NGINX Plus customers, this feature will be available in Release 7, which is scheduled for later this year.)

The SO_REUSEPORT socket option has many real-world applications, such as the potential for easy rolling upgrades of services. For NGINX, it improves performance by more evenly distributing connections across workers.

As depicted in the figure, when the SO_REUSEPORT option is not enabled, a single listening socket by default notifies workers about incoming connections as the workers become available. If you include the accept_mutex off directive in the events context, the single listener instead notifies all workers about a new connection at the same time, putting them in competition to grab it. This is known as the thundering herd problem.

[Figure: without SO_REUSEPORT, a single listening socket notifies workers of incoming connections]

With the SO_REUSEPORT option enabled, there is a separate listening socket for each worker. The kernel determines which available socket (and by implication, which worker) gets the connection. While this does decrease latency and improve performance as workers accept connections, it can also mean that workers are given new connections before they are ready to handle them.

[Figure: with SO_REUSEPORT, each worker has its own listening socket and the kernel distributes connections]
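
The same kernel behavior is easy to see outside NGINX. Below is a minimal Python sketch (for Linux 3.9+ where socket.SO_REUSEPORT is exposed): several forked workers each bind their own listening socket to the same port and let the kernel spread connections among them.

import os
import socket

PORT = 8080
NUM_WORKERS = 4

def worker():
    # Each worker creates and binds its OWN listening socket on the same port.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(("0.0.0.0", PORT))
    sock.listen(128)
    while True:
        conn, _ = sock.accept()    # the kernel picks which worker's socket gets each connection
        conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        conn.close()

for _ in range(NUM_WORKERS):
    if os.fork() == 0:             # each child becomes one worker
        worker()                   # never returns

for _ in range(NUM_WORKERS):       # parent waits on the workers
    os.wait()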

Configuring Socket Sharding

To enable the SO_REUSEPORT socket option, include the new reuseport parameter in the listen directive, as in this example:


http {
     server {
          listen       80 reuseport;
          server_name  localhost;
          …
     }
}

Including the reuseport parameter disables accept_mutex for the socket, because the lock is redundant with reuseport. It can still be worth setting accept_mutex if there are ports on which you don’t set reuseport.

Benchmarking the Performance Improvement

I ran a wrk benchmark with 4 NGINX workers on a 36-core AWS instance. To eliminate network effects, I ran both client and NGINX on localhost, and also had NGINX return the string OK instead of a file. I compared three NGINX configurations: the default (equivalent to accept_mutex on), with accept_mutex off, and with reuseport. As shown in the figure, reuseport increases requests per second by 2 to 3 times, and reduces both latency and the standard deviation for latency.
[Figure: wrk benchmark results comparing default, accept_mutex off, and reuseport]

I also ran a more real-world benchmark with the client and NGINX on separate hosts and with NGINX returning an HTML file. As shown in the chart, with reuseport the decrease in latency was similar to the previous benchmark, and the standard deviation decreased even more dramatically (almost ten-fold). Other results (not shown in the chart) were also encouraging. With reuseport, the load was spread evenly across the worker processes. In the default condition (equivalent to accept_mutex on), some workers got a higher percentage of the load, and with accept_mutex off all workers experienced high load.

                   Latency (ms)   Latency stdev (ms)   CPU Load
Default            15.65          26.59                0.3
accept_mutex off   15.59          26.48                10
reuseport          12.35          3.15                 0.3

(Via Nginx’s blog)

Web Crawling & Analytics Case Study – Database Vs Self Hosted Message Queuing Vs Cloud Message Queuing


The Business Problem:

To build a repository of used-car prices and identify trends based on data available from used-car dealers. The solution necessarily involved building large-scale crawlers to crawl and parse thousands of used-car dealer websites every day.

1st Solution: Database Driven Solution to Crawling & Parsing

Our initial infrastructure consisted of crawling, parsing, and database-insertion web services, all written in Python. When the crawling service finishes crawling a website, it pushes the output data to the database; the parsing service picks it up from there and, after parsing, pushes the structured data back into the database.

 

[Figure: database-driven crawling and parsing architecture]

Problems with the database-driven approach:

  • Bottlenecks: Writing the data into the database and reading it back proved to be a huge bottleneck. It slowed down the entire process and led to capacity mismatches between the crawling and parsing functions.

  • High processing cost: Due to the slow response time of many websites, the parsing service remained mostly idle, which led to a very high cost for servers and processing.

We tried to speed up the process by posting data directly from the crawling service to the parsing service, but this resulted in data loss whenever the parsing service was busy. Additionally, the approach presented a massive scaling challenge because of the read and write bottlenecks at the database.

2nd Solution: Self Hosted / Custom Deployment Using RabbitMQ

[Figure: RabbitMQ-based crawling and parsing architecture]

To overcome the problems above and gain the ability to scale, we moved to a new architecture built on RabbitMQ. In this architecture the crawlers and parsers ran on Amazon EC2 micro instances, and we used Fabric to push commands to the scripts running on them. A crawling instance would pull a used-car dealer website from the website queue, crawl the relevant pages, and push the output to a crawled-pages queue. A parsing instance would pull data from the crawled-pages queue, parse it, and push the result to a parsed-data queue, from which a database-insertion script transferred the data into Postgres.
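
A trimmed-down sketch of the parsing worker, assuming the pika client, illustrative queue names, and a hypothetical parse_page() function (the crawl worker is symmetric):

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters(host="localhost"))
channel = connection.channel()
channel.queue_declare(queue="crawled_pages", durable=True)
channel.queue_declare(queue="parsed_data", durable=True)

def handle_page(ch, method, properties, body):
    record = parse_page(body)                       # hypothetical parsing function
    ch.basic_publish(exchange="", routing_key="parsed_data", body=record)
    ch.basic_ack(delivery_tag=method.delivery_tag)  # ack only after the result is queued

channel.basic_consume(queue="crawled_pages", on_message_callback=handle_page)
channel.start_consuming()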

This approach sped up the crawling and parsing cycle, and scaling was just a matter of adding more instances created from specialized AMIs.

Problems with RabbitMQ Approach:

  • Setting up, deploying, and maintaining this infrastructure across hundreds of servers was a nightmare for a small team.

  • We suffered data loss whenever there were deployment or maintenance issues. Because of the trade-off we had to make between speed and persistence in RabbitMQ, there was a chance of losing valuable data if the server hosting RabbitMQ crashed.

3rd Solution: Cloud Messaging Deployment Using IronMQ & IronWorker

The concept of multiple queues with multiple crawlers and parsers pushing and pulling data gave us a way to scale the infrastructure massively. We were looking for a solution that kept a similar architecture but overcame the problems above without the headache of deployment and maintenance.

The architecture, business logic, and processing methods with Iron.io and IronWorker were similar to the RabbitMQ setup, but without the deployment and maintenance effort. All our code is written in Python, and since Iron.io supports Python we could set up the crawl and parsing workers and queues within 24 hours with minimal deployment and maintenance work. Reading and writing data in IronMQ is fast, all messages in IronMQ are persistent, and the chance of losing data is very low.


[Figure: IronMQ/IronWorker-based crawling and parsing architecture]

COMPARISON MATRIX BETWEEN DIFFERENT APPROACHES

Key Variables                                       Database Driven Batch Processing   Self Hosted – RabbitMQ   Cloud Based – Iron MQ
Speed of processing a batch                         Slow                               Fast                     Fast
Data Loss from Server Crashes & Production Issues   Low Risk                           Medium Risk              Low Risk
Custom Programming for Queue Management             High Effort                        Low Effort               Low Effort
Set Up for Queue Management                         NA                                 Medium Effort            Low Effort
Deployment & Maintenance of Queues                  NA                                 High Effort              Low Effort

(Via DataScienceCentral.com)

How MySQL Is Able To Scale To 200 Million QPS – MySQL Cluster

This is a guest post by Andrew Morgan, MySQL Principal Product Manager at Oracle.


The purpose of this post is to introduce MySQL Cluster – which is the in-memory, real-time, scalable, highly available version of MySQL. Before addressing the incredible claim in the title of 200 Million Queries Per Second it makes sense to go through an introduction of MySQL Cluster and its architecture in order to understand how it can be achieved.

Introduction To MySQL Cluster

MySQL Cluster is a scalable, real-time in-memory, ACID-compliant transactional database, combining 99.999% availability with the low TCO of open source. Designed around a distributed, multi-master architecture with no single point of failure, MySQL Cluster scales horizontally on commodity hardware with auto-sharding to serve read and write intensive workloads, accessed via SQL and NoSQL interfaces.

Originally designed as an embedded telecoms database for in-network applications demanding carrier-grade availability and real-time performance, MySQL Cluster has been rapidly enhanced with new feature sets that extend use cases into web, mobile and enterprise applications deployed on-premise or in the cloud, including:

  • High volume OLTP
  • Real time analytics
  • E-commerce, inventory management, shopping carts, payment processing, fulfillment tracking, etc.
  • Online Gaming
  • Financial trading with fraud detection
  • Mobile and micro-payments
  • Session management & caching
  • Feed streaming, analysis and recommendations
  • Content management and delivery
  • Communications and presence services
  • Subscriber/user profile management and entitlements

MySQL Cluster Architecture

While transparent to the application, under the covers there are three types of node which collectively provide service to the application. The figure shows a simplified architecture diagram of a MySQL Cluster consisting of twelve Data Nodes split across six node groups.

[Figure: MySQL Cluster architecture]

Data Nodes are the main nodes of a MySQL Cluster. They provide the following functionality:

  • Storage and management of both in-memory and disk-based data
  • Automatic and user defined partitioning (sharding) of tables
  • Synchronous replication of data between data nodes
  • Transactions and data retrieval
  • Automatic fail over
  • Automatic resynchronization after failure for self-healing

Tables are automatically sharded across the data nodes and each data node is a master accepting write operations, making it very simple to scale write-intensive workloads across commodity nodes, with complete application transparency.

By storing and distributing data in a shared-nothing architecture, i.e. without the use of a shared-disk, and synchronously replicating data to at least one replica, if a Data Node happens to fail, there will always be another Data Node storing the same information. This allows for requests and transactions to continue to be satisfied without interruption. Any transactions which are aborted during the short (sub-second) failover window following a Data node failure are rolled back and can be re-run.

It is possible to choose how to store data; either all in memory or with some on disk (non-indexed data only). In-memory storage can be especially useful for data that is frequently changing (the active working set). Data stored in-memory is routinely check pointed to disk locally and coordinated across all Data Nodes so that the MySQL Cluster can be recovered in case of a complete system failure – such as a power outage. Disk-based data can be used to store data with less strict performance requirements, where the data set is larger than the available RAM. As with most other database servers, a page-cache is used to cache frequently used disk-based data in the Data Nodes’ memory in order to increase the performance.

Application Nodes provide connectivity from the application logic to the data nodes. Applications can access the database using SQL through one or many MySQL Servers performing the function of SQL interfaces into the data stored within a MySQL Cluster. When going through a MySQL Server, any of the standard MySQL connectors can be used, offering a wide range of access technologies. Alternatively, a high performance (C++ based) interface called NDB API can be used for extra control, better real-time behavior and greater throughput. The NDB API provides a layer through which additional NoSQL interfaces can directly access the cluster, bypassing the SQL layer, allowing for lower latency and improved developer flexibility. Existing interfaces include Java, JPA, Memcached, JavaScript with Node.js and HTTP/REST (via an Apache Module). All Application Nodes can access data from all Data Nodes and so they can fail without causing a loss of service as applications can simply use the remaining nodes.

Management Nodes are responsible for publishing the cluster configuration to all nodes in the cluster and for node management. The Management Nodes are used at startup, when a node wants to join the cluster, and when there is a system reconfiguration. Management Nodes can be stopped and restarted without affecting the ongoing execution of the Data and Application Nodes. By default, the Management Node also provides arbitration services, in the event there is a network failure which leads to a split-brain or a cluster exhibiting network-partitioning.

Achieving Scalability Through Transparent Sharding

[Figure: MySQL Cluster partitioning across node groups]

The rows from any given table are transparently split into multiple partitions/fragments. For each fragment there will be a single data node that holds all of its data and handles all reads and writes on that data. Each data node also has a buddy, and together they form a node group; the buddy holds a secondary copy of the fragment as well as a primary fragment of its own. A synchronous two-phase commit protocol is used to ensure that when a transaction has been committed, the changes are guaranteed to be stored within both data nodes.

By default, a table’s Primary Key is used as the shard key and MySQL Cluster will perform an MD5 hash on that shard key to select which fragment/partition it should be stored in. If a transaction or query needs to access data from multiple data nodes then one of the data nodes takes on the role of the transaction coordinator and delegates work to the other required data nodes; the results are then combined before they’re presented to the application. Note that it is also possible to have transactions or queries that join data from multiple shards and multiple tables – this is a big advantage over typical NoSQL data stores that implement sharding.
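To illustrate why rows with the same shard key always land together, here is a small Python sketch of MD5-based fragment selection; the fragment count and the byte-slicing of the digest are illustrative assumptions, not NDB's internal mapping.

import hashlib

NUM_FRAGMENTS = 8    # illustrative only; the real fragment count depends on the cluster

def fragment_for(shard_key: bytes) -> int:
    """Hash the shard key to pick a fragment (NDB's real mapping is internal)."""
    digest = hashlib.md5(shard_key).digest()
    return int.from_bytes(digest[:4], "big") % NUM_FRAGMENTS

# Rows sharing a user-id land in the same fragment, whatever the service name is.
print(fragment_for(b"user-42"))   # some fragment f
print(fragment_for(b"user-42"))   # always the same fragment f
print(fragment_for(b"user-43"))   # may be a different fragment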

[Figure: choosing the shard key in MySQL Cluster]

The best (linear) scaling is achieved when frequently run queries/transactions can be satisfied by a single data node (as it reduces the network delays from the inter-data node messaging). To achieve this, the application can be made distribution aware – all this really means is that the person defining the schema can override what column(s) is used for the sharding key. As an example, the figure shows a table with a composite Primary Key made up of a user-id and a service name; by choosing to just use the user-id as the sharding key, all rows for a given user in this table will always be in the same fragment. Even more powerful is the fact that if the same user-id column is used in your other tables and you designate it as the sharding key for those too, then all of the given user’s data from all tables will be in the same fragment and queries/transactions on that user can be handled within a single data node.

Use NoSQL APIs For The Fastest Possible Access To Your Data

MySQL Cluster provides many ways to access the stored data; the most common method is SQL but as can be seen in the figure, there are also many native APIs that allow the application to read and write the data directly from the database without the inefficiency and development complexity of converting to SQL and passing through a MySQL Server. These APIs exist for C++, Java, JPA, JavaScript/Node.js, http and the Memcached protocol.

[Figure: NoSQL and SQL access paths to MySQL Cluster data]

200 Million Queries Per Second Benchmark

There are two kinds of workloads that MySQL Cluster is designed to handle:

  • OLTP (On-Line Transaction Processing): Memory-optimized tables provide sub-millisecond low latency and extreme levels of concurrency for OLTP workloads while still providing durability; they can also be used alongside disk-based tables.

  • Ad-hoc Searches: MySQL Cluster has increased the amount of parallelism that can be used when performing a table scan – providing a significant speed-up when performing searches on un-indexed columns.

Having said that, MySQL Cluster is going to perform at its best with OLTP workloads; in particular when large numbers of queries/transactions are sent in parallel. To this end, the flexAsynch benchmark has been used to measure how NoSQL performance scales as more data nodes are added to the cluster.

[Figure: flexAsynch throughput scaling to 200 million queries per second]

The benchmark was performed with each data node running on a dedicated 56-thread Intel E5-2697 v3 (Haswell) machine. The figure shows how the throughput scaled as the number of data nodes was increased in steps from 2 up to 32 (note that MySQL Cluster currently supports a maximum of 48 data nodes). As you can see, the scaling is virtually linear, and at 32 data nodes the throughput hits 200 Million NoSQL Queries Per Second.

Note that the latest results and a more complete description of the tests can be found at the MySQL Cluster Benchmark page.

This 200 Million QPS benchmark was run as part of MySQL Cluster 7.4 (currently the latest GA version) – you can find out more about what went into that release in this MySQL Cluster 7.4 blog post or this webinar replay.

(Via HighScalability.com)

Designing For Scale – Three Principles And Three Practices From Tapad Engineering

This is a guest post by Toby Matejovsky, Director of Engineering at Tapad (@TapadEng).

Here at Tapad, scaling our technology strategically has been crucial to our immense growth. Over the last four years we’ve scaled our real-time bidding system to handle hundreds of thousands of queries per second. We’ve learned a number of lessons about scalability along that journey.

Here are a few concrete principles and practices we’ve distilled from those experiences:

  • Principle 1: Design for Many
  • Principle 2: Service-Oriented Architecture Beats Monolithic Application
  • Principle 3: Monitor Everything
  • Practice 1: Canary Deployments
  • Practice 2: Distributed Clock
  • Practice 3: Automate To Assist, Not To Control

Principle 1: Design For Many

There are three amounts that matter in software design: none, one, and many. We’ve learned to always design for the “many” case. This makes scaling more of a simple mechanical process, rather than a complicated project requiring re-architecting the entire codebase. The work to get there might not be as easy, but front-loading the effort pays dividends later when the application needs to scale suddenly.

For us, a guiding principle is to always consider the hypothetical ‘10x use case.’ How would our applications respond if they had to suddenly handle 10 times the current traffic? The system is only as good as the sum of its parts. The application layer might scale out easily, but if it fails because of interacting with a single database node then we haven’t achieved true scalability.

Principle 2: Service-Oriented Architecture Beats Monolithic Application

Here at Tapad we leverage a service-based architecture. The main advantages are the ability to allocate resources efficiently, and to make upgrades easier.

Imagine two systems:

  • One requires a lot of compute, not much memory.

  • One requires a lot of memory, not much compute.

If they were combined into a single system, but only the memory-intensive one needed to scale, every additional node would end up overcommitting on compute.

Virtualization solves the problem by making those overcommitted cores available to some other system, but the solution paints a misleading picture. It appears there are N systems available, but it is impossible to run all N at full capacity. If the cluster keeps enough compute available to run all N at full capacity, then money is being wasted – never a good thing.

Principle 3: Monitor Everything

Monitoring is an obvious requirement for any production system. We currently use Zabbix for alerting, and Graphite for tracking metrics over time. A typical Zabbix check looks like:

  • “Is process X running”

  • “Is node N responding to a request within M milliseconds”

  • “Node N’ is using > 80% of its available storage”

We recently switched out our Graphite backend to use Cassandra instead of whisper to better handle the volume of traffic (there are currently about half a million metrics tracked). We aggregate metrics in-memory with a customized version of Twitter’s Ostrich metrics library, and flush them to graphite every 10 seconds.

An example path for a given metric might look like this:

prd.nj1.foo01.pipeline.producer.avro_event_bar_count

We use Grafana for building real-time dashboards to track key metrics, and display those dashboards on big screens inside our office. When we switch our SOA to a more ephemeral container-based approach (e.g. running containers on Mesos with Marathon) we may have to re-think how these metrics are organized, as the instance-specific names like foo01 will end up looking like foo.43ffbdc8-ef60-11e4-97ce-005056a272f9. Graphite supports wildcards in queries, so something like sumSeries(prd.nj1.foo.*.pipeline.producer.avro_event_bar_count) could work.
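
For reference, the aggregate-and-flush pattern described above can be sketched in a few lines of Python. This is not Ostrich itself; it simply batches counters in memory and pushes them to Graphite over the carbon plaintext protocol (metric value timestamp, typically port 2003), with the carbon host name as a placeholder.

import socket
import time
from collections import Counter

CARBON = ("graphite.example.internal", 2003)    # placeholder carbon-cache address
counters = Counter()

def incr(metric, n=1):
    counters[metric] += n                       # aggregate in memory between flushes

def flush():
    """Send the aggregated counters using Graphite's plaintext protocol."""
    now = int(time.time())
    lines = "".join(f"{name} {value} {now}\n" for name, value in counters.items())
    counters.clear()
    with socket.create_connection(CARBON) as sock:
        sock.sendall(lines.encode())

incr("prd.nj1.foo01.pipeline.producer.avro_event_bar_count")
flush()   # in production this would run on a 10-second timer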

Principles lay the groundwork for our decisions, but executing them successfully is equally important. Here are three best practices for working with distributed systems.

Practice 1: Canary Deployments

Some things are very challenging to test rigorously prior to a full-scale production launch. To mitigate risk, we upgrade a single node first and monitor it manually. Assuming it behaves as expected, the rest of the nodes are automatically deployed by Rundeck. Rundeck is a tool that can upgrade systems automatically and in parallel, rolling several instances at a time and moving on to the next set as soon as the upgraded nodes report a healthy status. Monitoring the canary deploy involves more than this single health check, which is why it’s upgraded out-of-band.

Practice 2: Distributed Clock

Because of clock skew and lag, there is no good concept of “now” in a distributed system.

  • Clock skew occurs because clocks are not particularly precise, even with NTP (Network Time Protocol).

  • Lag is a factor when passing messages around. If one server is cut off from the network, buffers messages for a while, then sends them after re-joining, the receiving system will get a batch of messages with relatively old timestamps. A system consuming all messages from these producers cannot be assured it has read 100% of messages up to a given time until it sees that each producer has passed the mark. This assumes that each producer guarantees ordering within its own stream, much like Kafka’s model.

Our solution is to create a sort of distributed clock, where producers record their most recent timestamps as child nodes of a particular Zookeeper “clock” node. The time is resolved by taking the minimum timestamp in that set. We also track lag relative to the resolving node’s system clock.
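
A minimal sketch of the resolving side, using the kazoo ZooKeeper client and assuming each producer writes its latest timestamp as the payload of its own child of the clock znode (the znode path and ensemble address are placeholders):

from kazoo.client import KazooClient

CLOCK_NODE = "/distributed-clock"     # assumed znode layout: one child per producer

def resolve_clock(zk):
    """The resolved 'now' is the minimum of the producers' latest timestamps."""
    timestamps = []
    for producer in zk.get_children(CLOCK_NODE):
        data, _stat = zk.get(f"{CLOCK_NODE}/{producer}")
        timestamps.append(float(data.decode()))   # each producer stores its newest timestamp
    return min(timestamps)

zk = KazooClient(hosts="zk1:2181,zk2:2181")       # placeholder ensemble address
zk.start()
print("all producers have reported up to:", resolve_clock(zk))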

Practice 3: Automate To Assist, Not To Control

Our devops tools are designed to assist humans, rather than to automatically manage things, as human judgement is often required to respond to a system failure. There is risk in allowing a script to automatically failover a database or spin up new nodes. We have a pager duty rotation with primary, secondary, and tertiary engineers. The engineer can initiate the failover or spin up new nodes based on an alert. This means they are fully aware of the context.

The more well-understood a task is, the more it can be automated. One basic level of automation is using Kickstart/PXE boot. When a new VM starts up, it does a PXE boot and registers itself with Spacewalk, and Puppet handles installation of all required packages. Nothing custom is ever required, which enables us to easily build/rebuild sets of systems just by starting new VMs with particular names.

As we gain better understanding of a given system’s performance we can automate more parts. For example, scaling a shared-nothing system up and down depending on some reasonable change in traffic. For exceptional circumstances, we want a human to make the decision, assisted by all the information at their disposal.

(via HighScalability.com)