How MySQL Is Able To Scale To 200 Million QPS – MySQL Cluster

This is a guest post by Andrew Morgan, MySQL Principal Product Manager at Oracle.


The purpose of this post is to introduce MySQL Cluster – which is the in-memory, real-time, scalable, highly available version of MySQL. Before addressing the incredible claim in the title of 200 Million Queries Per Second, it makes sense to go through an introduction of MySQL Cluster and its architecture in order to understand how it can be achieved.

Introduction To MySQL Cluster

MySQL Cluster is a scalable, real-time in-memory, ACID-compliant transactional database, combining 99.999% availability with the low TCO of open source. Designed around a distributed, multi-master architecture with no single point of failure, MySQL Cluster scales horizontally on commodity hardware with auto-sharding to serve read and write intensive workloads, accessed via SQL and NoSQL interfaces.

Originally designed as an embedded telecoms database for in-network applications demanding carrier-grade availability and real-time performance, MySQL Cluster has been rapidly enhanced with new feature sets that extend use cases into web, mobile and enterprise applications deployed on-premise or in the cloud, including:

  • High volume OLTP
  • Real time analytics
  • E-commerce, inventory management, shopping carts, payment processing, fulfillment tracking, etc.
  • Online Gaming
  • Financial trading with fraud detection
  • Mobile and micro-payments
  • Session management & caching
  • Feed streaming, analysis and recommendations
  • Content management and delivery
  • Communications and presence services
  • Subscriber/user profile management and entitlements

MySQL Cluster Architecture

While transparent to the application, under the covers there are three types of node which collectively provide service to it. The figure below shows a simplified architecture diagram of a MySQL Cluster consisting of twelve Data Nodes split across six node groups.

[Figure: MySQL Cluster architecture]

Data Nodes are the main nodes of a MySQL Cluster. They provide the following functionality:

  • Storage and management of both in-memory and disk-based data
  • Automatic and user defined partitioning (sharding) of tables
  • Synchronous replication of data between data nodes
  • Transactions and data retrieval
  • Automatic failover
  • Automatic resynchronization after failure for self-healing

Tables are automatically sharded across the data nodes and each data node is a master accepting write operations, making it very simple to scale write-intensive workloads across commodity nodes, with complete application transparency.

Because data is stored and distributed in a shared-nothing architecture – i.e. without the use of a shared disk – and synchronously replicated to at least one replica, if a Data Node happens to fail there will always be another Data Node storing the same information. This allows requests and transactions to continue to be satisfied without interruption. Any transactions which are aborted during the short (sub-second) failover window following a Data Node failure are rolled back and can be re-run.

It is possible to choose how to store data: either all in memory or with some on disk (non-indexed data only). In-memory storage can be especially useful for data that is frequently changing (the active working set). Data stored in memory is routinely checkpointed to disk locally and coordinated across all Data Nodes so that the MySQL Cluster can be recovered in case of a complete system failure – such as a power outage. Disk-based data can be used to store data with less strict performance requirements, where the data set is larger than the available RAM. As with most other database servers, a page cache is used to cache frequently used disk-based data in the Data Nodes’ memory in order to increase performance.

Application Nodes provide connectivity from the application logic to the data nodes. Applications can access the database using SQL through one or more MySQL Servers acting as SQL interfaces into the data stored within a MySQL Cluster. When going through a MySQL Server, any of the standard MySQL connectors can be used, offering a wide range of access technologies. Alternatively, a high-performance (C++ based) interface called the NDB API can be used for extra control, better real-time behavior and greater throughput. The NDB API provides a layer through which additional NoSQL interfaces can directly access the cluster, bypassing the SQL layer, allowing for lower latency and improved developer flexibility. Existing interfaces include Java, JPA, Memcached, JavaScript with Node.js and HTTP/REST (via an Apache module). All Application Nodes can access data from all Data Nodes, so they can fail without causing a loss of service, as applications can simply use the remaining nodes.

Management Nodes are responsible for publishing the cluster configuration to all nodes in the cluster and for node management. The Management Nodes are used at startup, when a node wants to join the cluster, and when there is a system reconfiguration. Management Nodes can be stopped and restarted without affecting the ongoing execution of the Data and Application Nodes. By default, the Management Node also provides arbitration services in the event of a network failure that leads to a split-brain or a network-partitioned cluster.

Achieving Scalability Through Transparent Sharding

[Figure: MySQL Cluster partitioning]

The rows from any given table are transparently split into multiple partitions/fragments. For each fragment there is a single data node that holds all of its data and handles all reads and writes on that data. Each data node also has a buddy, and together they form a node group; the buddy holds a secondary copy of the fragment as well as a primary fragment of its own. A synchronous two-phase commit protocol is used to ensure that when a transaction has been committed, the changes are guaranteed to be stored within both data nodes.

By default, a table’s Primary Key is used as the shard key and MySQL Cluster will perform an MD5 hash on that shard key to select which fragment/partition it should be stored in. If a transaction or query needs to access data from multiple data nodes then one of the data nodes takes on the role of the transaction coordinator and delegates work to the other required data nodes; the results are then combined before they’re presented to the application. Note that it is also possible to have transactions or queries that join data from multiple shards and multiple tables – this is a big advantage over typical NoSQL data stores that implement sharding.

[Figure: MySQL Cluster shard keys]

The best (linear) scaling is achieved when queries/transactions can be satisfied by a single data node (as this avoids the network delays of inter-data-node messaging). To achieve this, the application can be made distribution aware – all this really means is that the person defining the schema can override which column(s) are used as the sharding key. As an example, the figure shows a table with a composite Primary Key made up of a user-id and a service name; by choosing to use just the user-id as the sharding key, all rows for a given user in this table will always be in the same fragment. Even more powerful is the fact that if the same user-id column is used in your other tables and you designate it as the sharding key for those too, then all of the given user’s data from all tables will be in the same fragment and queries/transactions on that user can be handled within a single data node.
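As a rough illustration of what that looks like in practice (the table and column names here are invented, not taken from the article), MySQL Cluster's standard user-defined partitioning syntax lets the schema designer shard an NDB table on a subset of its primary key:

import mysql.connector  # assumes MySQL Connector/Python is installed

# Hypothetical connection details for one of the cluster's MySQL Server (SQL) nodes.
conn = mysql.connector.connect(host="mysqld01", user="app", password="secret", database="clusterdb")
cur = conn.cursor()

# Composite primary key (user_id, service), but shard on user_id only, so that
# all of a given user's rows land in the same fragment (and node group).
cur.execute("""
    CREATE TABLE user_service (
        user_id BIGINT NOT NULL,
        service VARCHAR(32) NOT NULL,
        data VARCHAR(255),
        PRIMARY KEY (user_id, service)
    ) ENGINE=NDBCLUSTER
    PARTITION BY KEY (user_id)
""")
conn.close()

Queries and transactions that filter on user_id can then be satisfied within a single data node.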

Use NoSQL APIs For The Fastest Possible Access To Your Data

MySQL Cluster provides many ways to access the stored data; the most common method is SQL, but as can be seen in the figure, there are also many native APIs that allow the application to read and write the data directly from the database without the inefficiency and development complexity of converting to SQL and passing through a MySQL Server. These APIs exist for C++, Java, JPA, JavaScript/Node.js, HTTP and the Memcached protocol.
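For example, going through the Memcached API is ordinary memcached usage from the application's point of view. The sketch below uses a generic Python memcached client; the host, port and key naming are assumptions, and how keys map onto cluster tables depends on how the Memcached-to-NDB mapping is configured:

from pymemcache.client.base import Client  # any memcached client will do

# Hypothetical endpoint of the memcached server fronting the cluster.
client = Client(("ndb-memcached-host", 11211))

# Reads and writes are served by the data nodes via the NDB API, bypassing the SQL layer.
client.set("user:1234:town", "Maidenhead")
print(client.get("user:1234:town"))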

[Figure: NoSQL access to MySQL Cluster data]

200 Million Queries Per Second Benchmark

There are two kinds of workloads that MySQL Cluster is designed to handle:

  • OLTP (On-Line Transaction Processing): Memory-optimized tables provide sub-millisecond low latency and extreme levels of concurrency for OLTP workloads while still providing durability; they can also be used alongside disk-based tables.
  • Ad-hoc Searches: MySQL Cluster has increased the amount of parallelism that can be used when performing a table scan – providing a significant speed-up when performing searches on un-indexed columns.

Having said that, MySQL Cluster is going to perform at its best with OLTP workloads, in particular when large numbers of queries/transactions are sent in parallel. To this end, the flexAsynch benchmark has been used to measure how NoSQL performance scales as more data nodes are added to the cluster.

[Figure: 200 million NoSQL queries per second benchmark results]

The benchmark was performed with each data node running on a dedicated 56-thread Intel E5-2697 v3 (Haswell) machine. The figure shows how the throughput scaled as the number of data nodes was increased in steps from 2 up to 32 (note that MySQL Cluster currently supports a maximum of 48 data nodes). As you can see, the scaling is virtually linear and at 32 data nodes the throughput hits 200 Million NoSQL Queries Per Second.

Note that the latest results and a more complete description of the tests can be found at the MySQL Cluster Benchmark page.

This 200 Million QPS benchmark was run using MySQL Cluster 7.4 (currently the latest GA version) – you can find out more about what went into that release in this MySQL Cluster 7.4 blog post or this webinar replay.

(Via HighScalability.com)

Designing For Scale – Three Principles And Three Practices From Tapad Engineering

This is a guest post by Toby Matejovsky, Director of Engineering at Tapad (@TapadEng).

Here at Tapad, scaling our technology strategically has been crucial to our immense growth. Over the last four years we’ve scaled our real-time bidding system to handle hundreds of thousands of queries per second. We’ve learned a number of lessons about scalability along that journey.

Here are a few concrete principles and practices we’ve distilled from those experiences:

  • Principle 1: Design for Many
  • Principle 2: Service-Oriented Architecture Beats Monolithic Application
  • Principle 3: Monitor Everything
  • Practice 1: Canary Deployments
  • Practice 2: Distributed Clock
  • Practice 3: Automate To Assist, Not To Control

Principle 1: Design For Many

There are three amounts that matter in software design: none, one, and many. We’ve learned to always design for the “many” case. This makes scaling a simple, mechanical process rather than a complicated project that requires re-architecting the entire codebase. The up-front work is not always easy, but front-loading the effort pays dividends later when the application needs to scale suddenly.

For us, a guiding principle is to always consider the hypothetical ‘10x use case.’ How would our applications respond if they had to suddenly handle 10 times the current traffic? The system is only as good as the sum of its parts. The application layer might scale out easily, but if it fails because of interacting with a single database node then we haven’t achieved true scalability.

Principle 2: Service-Oriented Architecture Beats Monolithic Application

Here at Tapad we leverage a service-based architecture. The main advantages are the ability to allocate resources efficiently, and to make upgrades easier.

Imagine two systems:

  • One requires a lot of compute, not much memory.

  • One requires a lot of memory, not much compute.

If they were combined into a single system, but only the memory-intensive one needed to scale, every additional node would end up overcommitting on compute.

Virtualization solves the problem by making those overcommitted cores available to some other system, but the solution paints a misleading picture. It appears there are N systems available, but it is impossible to run all N at full capacity. If the cluster keeps enough compute available to run all N at full capacity, then money is being wasted – never a good thing.

Principle 3: Monitor Everything

Monitoring is an obvious requirement for any production system. We currently use Zabbix for alerting, and Graphite for tracking metrics over time. A typical Zabbix check looks like:

  • “Is process X running”

  • “Is node N responding to a request within M milliseconds”

  • “Node N’ is using > 80% of its available storage”

We recently switched out our Graphite backend to use Cassandra instead of Whisper to better handle the volume of traffic (there are currently about half a million metrics tracked). We aggregate metrics in-memory with a customized version of Twitter’s Ostrich metrics library, and flush them to Graphite every 10 seconds.

An example path for a given metric might look like this:

prd.nj1.foo01.pipeline.producer.avro_event_bar_count
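To make the aggregate-and-flush idea concrete, here is a minimal sketch (not Tapad's Ostrich-based implementation; the Carbon host and metric values are assumptions) that accumulates counters in memory and writes them to Graphite's plaintext protocol on each flush:

import socket
import time
from collections import defaultdict

CARBON_HOST, CARBON_PORT = "graphite.example.com", 2003  # hypothetical Carbon endpoint
counters = defaultdict(int)

def incr(metric, value=1):
    # Aggregation happens purely in memory; nothing touches the network here.
    counters[metric] += value

def flush():
    # Graphite's plaintext protocol: one "path value timestamp" line per metric.
    now = int(time.time())
    lines = [f"{metric} {value} {now}" for metric, value in counters.items()]
    counters.clear()
    with socket.create_connection((CARBON_HOST, CARBON_PORT)) as sock:
        sock.sendall(("\n".join(lines) + "\n").encode())

incr("prd.nj1.foo01.pipeline.producer.avro_event_bar_count")
incr("prd.nj1.foo01.pipeline.producer.avro_event_bar_count", 5)
flush()  # in production this would run on a 10-second timer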

We use Grafana for building real-time dashboards to track key metrics, and display those dashboards on big screens inside our office. When we switch our SOA to a more ephemeral container-based approach (e.g. running containers on Mesos with Marathon) we may have to re-think how these metrics are organized, as the instance-specific names like foo01 will end up looking like foo.43ffbdc8-ef60-11e4-97ce-005056a272f9. Graphite supports wildcards in queries, so something like sumSeries(prd.nj1.foo.*.pipeline.producer.avro_event_bar_count) could work.

Principles lay the groundwork for our decisions, but executing them successfully is equally important. Here are three best practices for working with distributed systems.

Practice 1: Canary Deployments

Some things are very challenging to test rigorously prior to a full-scale production launch. To mitigate risk, we upgrade a single node first and monitor it manually. Assuming it behaves as expected, the rest of the nodes are automatically deployed by Rundeck. Rundeck is a tool that can upgrade systems automatically and in parallel, rolling several instances at a time and moving on to the next set as soon as the upgraded nodes report a healthy status. Monitoring the canary deploy involves more than this single health check, which is why it’s upgraded out-of-band.

Practice 2: Distributed Clock

Because of clock skew and lag, there is no good concept of “now” in a distributed system.

  • Clock skew occurs because clocks are not particularly precise, even with NTP (Network Time Protocol).

  • Lag is a factor when passing messages around. If one server is cut off from the network, buffers messages for a while, then sends them after re-joining, the receiving system will get a batch of messages with relatively old timestamps. A system consuming all messages from these producers cannot be assured it has read 100% of messages up to a given time until it sees that each producer has passed the mark. This assumes that each producer guarantees ordering within its own stream, much like Kafka’s model.

Our solution is to create a sort of distributed clock, where producers record their most recent timestamps as child nodes of a particular Zookeeper “clock” node. The time is resolved by taking the minimum timestamp in that set. We also track lag relative to the resolving node’s system clock.
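A minimal sketch of this idea using the kazoo Zookeeper client (the znode path and resolution logic are assumptions, not Tapad's implementation): each producer records its latest timestamp under a shared "clock" node, and readers resolve the distributed "now" as the minimum of those values:

import time
from kazoo.client import KazooClient

CLOCK_PATH = "/clock"  # hypothetical parent znode, one child per producer

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")  # assumed ensemble
zk.start()
zk.ensure_path(CLOCK_PATH)

def publish(producer_id):
    # Producer side: record the most recent timestamp this producer has emitted.
    path = f"{CLOCK_PATH}/{producer_id}"
    payload = str(int(time.time() * 1000)).encode()
    if zk.exists(path):
        zk.set(path, payload)
    else:
        zk.create(path, payload)

def resolve():
    # Reader side: the "distributed clock" is the slowest producer's timestamp.
    stamps = []
    for child in zk.get_children(CLOCK_PATH):
        data, _stat = zk.get(f"{CLOCK_PATH}/{child}")
        stamps.append(int(data))
    return min(stamps)

publish("producer-01")
print("safe-to-read watermark:", resolve())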

Practice 3: Automate To Assist, Not To Control

Our devops tools are designed to assist humans, rather than to automatically manage things, as human judgement is often required to respond to a system failure. There is risk in allowing a script to automatically failover a database or spin up new nodes. We have a pager duty rotation with primary, secondary, and tertiary engineers. The engineer can initiate the failover or spin up new nodes based on an alert. This means they are fully aware of the context.

The more well-understood a task is, the more it can be automated. One basic level of automation is using Kickstart/PXE boot. When a new VM starts up, it does a PXE boot and registers itself with Spacewalk, and Puppet handles installation of all required packages. Nothing custom is ever required, which enables us to easily build/rebuild sets of systems just by starting new VMs with particular names.

As we gain better understanding of a given system’s performance we can automate more parts. For example, scaling a shared-nothing system up and down depending on some reasonable change in traffic. For exceptional circumstances, we want a human to make the decision, assisted by all the information at their disposal.

(via HighScalability.com)

Elements Of Scale: Composing And Scaling Data Platforms


This is a guest repost of Ben Stopford‘s epic post on Elements of Scale: Composing and Scaling Data Platforms. A masterful tour through the evolutionary forces that shape how systems adapt to key challenges.

As software engineers we are inevitably affected by the tools we surround ourselves with. Languages, frameworks, even processes all act to shape the software we build.

Likewise databases, which have trodden a very specific path, inevitably affect the way we treat mutability and share state in our applications.

Over the last decade we’ve explored what the world might look like had we taken a different path. Small open source projects try out different ideas. These grow. They are composed with others. The platforms that result utilise suites of tools, with each component often leveraging some fundamental hardware or systemic efficiency. The result, platforms that solve problems too unwieldy or too specific to work within any single tool.

So today’s data platforms range greatly in complexity. From simple caching layers or polyglotic persistence right through to wholly integrated data pipelines. There are many paths. They go to many different places. In some of these places at least, nice things are found.

So the aim for this talk is to explain how and why some of these popular approaches work. We’ll do this by first considering the building blocks from which they are composed. These are the intuitions we’ll need to pull together the bigger stuff later on.


 In a somewhat abstract sense, when we’re dealing with data, we’re really just arranging locality. Locality to the CPU. Locality to the other data we need. Accessing data sequentially is an important component of this. Computers are just good at sequential operations. Sequential operations can be predicted.

If you’re taking data from disk sequentially it’ll be pre-fetched into the disk buffer, the page cache and the different levels of CPU caching. This has a significant effect on performance. But it does little to help the addressing of data at random, be it in main memory, on disk or over the network. In fact pre-fetching actually hinders random workloads as the various caches and frontside bus fill with data which is unlikely to be used.


Whilst disk is somewhat renowned for its slow performance, main memory is often assumed to simply be fast. This is not as ubiquitously true as people often think. There are one to two orders of magnitude between random and sequential main memory workloads. Use a language that manages memory for you and things generally get a whole lot worse.

Streaming data sequentially from disk can actually outperform randomly addressed main memory. So disk may not always be quite the tortoise we think it is, at least not if we can arrange sequential access. SSDs, particularly those that utilise PCIe, further complicate the picture as they demonstrate different tradeoffs, but the caching benefits of the two access patterns remain, regardless.


So let’s imagine, as a simple thought experiment, that we want to create a very simple database. We’ll start with the basics: a file.

We want to keep writes and reads sequential, as it works well with the hardware. We can append writes to the end of the file efficiently. We can read by scanning the file in its entirety. Any processing we wish to do can happen as the data streams through the CPU. We might filter, aggregate or even do something more complex. The world is our oyster!


So what about data that changes, updates etc?

We have two options. We could update the value in place. We’d need to use fixed width fields for this, but that’s ok for our little thought experiment. But update in place would mean random IO. We know that’s not good for performance.

Alternatively we could just append updates to the end of the file and deal with the superseded values when we read it back.

So we have our first tradeoff. Append to a ‘journal’ or ‘log’, and reap the benefits of sequential access. Alternatively if we use update in place we’ll be back to 300 or so writes per second, assuming we actually flush through to the underlying media.
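As a toy illustration of the append-only option (file format and keys invented for the example), every write – including an update – is a sequential append, and superseded values are resolved only when the file is read back:

import json

LOG_PATH = "journal.log"  # hypothetical append-only data file

def write(key, value):
    # Every write, including an update, is a sequential append.
    with open(LOG_PATH, "a") as f:
        f.write(json.dumps({"key": key, "value": value}) + "\n")

def read_all():
    # Scan the whole file; later entries supersede earlier ones.
    state = {}
    with open(LOG_PATH) as f:
        for line in f:
            record = json.loads(line)
            state[record["key"]] = record["value"]
    return state

write("bob", {"town": "London"})
write("bob", {"town": "Paris"})  # supersedes the earlier record
print(read_all()["bob"])         # {'town': 'Paris'}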


Now in practice, of course, reading the file in its entirety can be pretty slow. We only need to get into the GBs of data before even the fastest disks will take seconds. This is what a database does when it ends up table scanning.

Also we often want something more specific, say customers named “bob”, so scanning the whole file would be overkill. We need an index.


Now there are lots of different types of indexes we could use. The simplest would be an ordered array of fixed-width values, in this case customer names, held with the corresponding offsets in the heap file. The ordered array could be searched with binary search. We could also of course use some form of tree, bitmap index, hash index, term index etc. Here we’re picturing a tree.

The thing with indexes like this is that they impose an overarching structure. The values are deliberately ordered so we can access them quickly when we want to do a read. The problem with the overarching structure is that it necessitates random writes as data flows in. So our wonderful, write optimised, append only file must be augmented by writes that scatter-gun the filesystem. This is going to slow us down.


Anyone who has put lots of indexes on a database table will be familiar with this problem. If we are using a regular rotating hard drive, we might run 1,000s of times slower if we maintain disk integrity of an index in this way.

Luckily there are a few ways around this problem. Here we are going to discuss three. These represent three extremes, and they are in truth simplifications of the real world, but the concepts are useful when we consider larger compositions.


Our first option is simply to place the index in main memory. This will compartmentalise the problem of random writes to RAM. The heap file stays on disk.

This is a simple and effective solution to our random writes problem. It is also one used by many real databases. MongoDB, Cassandra, Riak and many others use this type of optimisation. Often memory mapped files are used.
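A bare-bones sketch of the pattern (not any particular database's implementation): the heap file stays append-only on disk, while an in-memory hash index maps each key to a byte offset so that reads need a single seek rather than a full scan:

import json
import os

HEAP_PATH = "heap.db"  # hypothetical on-disk heap file
index = {}             # in-memory index: key -> byte offset in the heap file

def put(key, value):
    # The record is appended sequentially; only the index takes a random write, in RAM.
    record = (json.dumps({"key": key, "value": value}) + "\n").encode()
    offset = os.path.getsize(HEAP_PATH) if os.path.exists(HEAP_PATH) else 0
    with open(HEAP_PATH, "ab") as f:
        f.write(record)
    index[key] = offset

def get(key):
    # One seek into the heap file, guided by the in-memory index.
    offset = index.get(key)
    if offset is None:
        return None
    with open(HEAP_PATH, "rb") as f:
        f.seek(offset)
        return json.loads(f.readline())["value"]

put("bob", {"town": "London"})
print(get("bob"))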

However, this strategy breaks down if we have far more data than we have main memory. This is particularly noticeable where there are lots of small objects. Our index would get very large. Thus our storage becomes bounded by the amount of main memory we have available. For many tasks this is fine, but if we have very large quantities of data this can be a burden.

A popular solution is to move away from having a single ‘overarching’ index. Instead we use a collection of smaller ones.


This is a simple idea. We batch up writes in main memory as they come in. Once we have sufficient – say a few MBs – we sort them and write them to disk as an individual mini-index. What we end up with is a chronology of small, immutable index files.

So what was the point of doing that? Our set of immutable files can be streamed sequentially. This brings us back to a world of fast writes, without us needing to keep the whole index in memory. Nice!

Of course there is a downside to this approach too. When we read, we have to consult the many small indexes individually. So all we have really done is shift the problem of random IO from writes onto reads. However this turns out to be a pretty good tradeoff in many cases. It’s easier to optimise random reads than it is to optimise random writes.

Keeping a small meta-index in memory or using a Bloom Filter provides a low-memory way of evaluating whether individual index files need to be consulted during a read operation. This gives us almost the same read performance as we’d get with a single overarching index whilst retaining fast, sequential writes.

In reality we will need to purge orphaned updates occasionally too, but that can be done with nice sequential reads and writes.
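Here is a deliberately tiny sketch of that write and read path (the threshold, file naming and linear segment search are invented; real implementations add Bloom filters, sparse in-memory indexes and background compaction):

import json

MEMTABLE_LIMIT = 4  # flush after this many entries (a few MBs in a real system)
memtable = {}       # mutable, in-memory buffer of recent writes
segments = []       # paths of immutable, sorted mini-index files, oldest first

def put(key, value):
    memtable[key] = value
    if len(memtable) >= MEMTABLE_LIMIT:
        flush()

def flush():
    # Sort the buffered writes and stream them to disk as one immutable segment.
    path = f"segment-{len(segments):05d}.json"
    with open(path, "w") as f:
        json.dump(dict(sorted(memtable.items())), f)
    segments.append(path)
    memtable.clear()

def get(key):
    # Check the memtable first, then each segment from newest to oldest.
    if key in memtable:
        return memtable[key]
    for path in reversed(segments):
        with open(path) as f:
            segment = json.load(f)
        if key in segment:
            return segment[key]
    return None

for i in range(10):
    put(f"user{i}", {"visits": i})
print(get("user3"))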


What we have created is termed a Log Structured Merge Tree. A storage approach used in a lot of big data tools such as HBase, Cassandra, Google’s BigTable and many others. It balances write and read performance with comparatively small memory overhead.


So we can get around the ‘random-write penalty’ by storing our indexes in memory or, alternatively, using a write-optimised index structure like LSM. There is a third approach though. Pure brute force.

Think back to our original example of the file. We could read it in its entirety. This gave us many options in terms of how we go about processing the data within it. The brute force approach is simply to hold data by column rather than by row and stream only the columns required for a query, in their entirety, through the CPU. This approach is termed Columnar or Column Oriented.

(It should be noted that there is an unfortunate nomenclature clash between true column stores and those that follow the Big Table pattern. Whilst they share some similarities, in practice they are quite different. It is wise to consider them as different things.)


Column Orientation is another simple idea. Instead of storing data as a set of rows, appended to a single file, we split each row by column. We then store each column in a separate file.

We keep the order of the files the same, so row N has the same position (offset) in each column file. This is important because we will need to read multiple columns to service a single query, all at the same time. This means ‘joining’ columns on the fly. If the columns are in the same order we can do this in a tight loop which is very cache- and cpu-efficient. Many implementations make heavy use of vectorisation to further optimise throughput for simple join and filter operations.

Writes can leverage the benefit of being append-only. The downside is that we now have many files to update, one for every column in every individual write to the database. The most common solution to this is to batch writes in a similar way to the one used in the LSM approach above. Many columnar databases also impose an overall order to the table as a whole to increase their read performance for one chosen key.
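A toy version of the column-per-file layout (the schema and file naming are invented): each column is appended to its own file in the same row order, and a query streams only the columns it touches, joining them back together by position:

import json

COLUMNS = ["name", "town", "spend"]  # hypothetical schema

def append_row(row):
    # Row N occupies line N in every column file, so position is the join key.
    for column in COLUMNS:
        with open(f"col_{column}.txt", "a") as f:
            f.write(json.dumps(row[column]) + "\n")

def scan(columns):
    # Stream only the requested column files, zipping them into tuples on the fly.
    files = [open(f"col_{c}.txt") for c in columns]
    try:
        for values in zip(*files):
            yield tuple(json.loads(v) for v in values)
    finally:
        for f in files:
            f.close()

append_row({"name": "bob", "town": "London", "spend": 12.0})
append_row({"name": "alice", "town": "Paris", "spend": 7.5})

# An aggregate over 'spend' only ever reads the 'spend' file, not the whole table.
print(sum(spend for (spend,) in scan(["spend"])))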


By splitting data by column we significantly reduce the amount of data that needs to be brought from disk, so long as our query operates on a subset of all columns.

In addition to this, data in a single column generally compresses well. We can take advantage of the data type of the column to do this, if we have knowledge of it. This means we can often use efficient, low cost encodings such as run-length, delta, bit-packed etc. For some encodings predicates can be used directly on the uncompressed stream too.

The result is a brute force approach that will work particularly well for operations that require large scans. Aggregate functions like average, max, min, group by etc are typical of this.

This is very different to using the ‘heap file & index’ approach we covered earlier. A good way to understand this is to ask yourself: what is the difference between a columnar approach like this vs a ‘heap & index’ where indexes are added to every field?


The answer to this lies in the ordering of the index files. BTrees etc will be ordered by the fields they index. Joining the data in two indexes involves a streaming operation on one side, but on the other side the index lookups have to read random positions in the second index. This is generally less efficient than joining two indexes (columns) that retain the same ordering. Again we’re leveraging sequential access.


So many of the best technologies which we may want to use as components in a data platform will leverage one of these core efficiencies to excel for a certain set of workloads.

Storing indexes in memory, over a heap file, is favoured by many NoSQL stores such as Riak, Couchbase or MongoDB as well as some relational databases. It’s a simple model that works well.

Tools designed to work with larger data sets tend to take the LSM approach. This gives them fast ingestion as well as good read performance using disk based structures. HBase, Cassandra, RocksDB, LevelDB and even Mongo now support this approach.

Column-per-file engines are used heavily in MPP databases like Redshift or Vertica as well as in the Hadoop stack using Parquet. These are engines for data crunching problems that require large traversals. Aggregation is the home ground for these tools.

Other products like Kafka apply the use of a simple, hardware-efficient contract to messaging. Messaging, at its simplest, is just appending to a file, or reading from a predefined offset. You read messages from an offset. You go away. You come back. You read from the offset you previously finished at. All nice sequential IO.
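A minimal file-backed sketch of that contract (not Kafka itself; a single "partition" file stands in for the log): producers append, and consumers remember the byte offset they finished at and resume sequentially from there:

LOG = "topic.log"  # hypothetical single-partition log file

def produce(message):
    # Producing is just appending to the end of the file.
    with open(LOG, "a") as f:
        f.write(message + "\n")

def consume(offset):
    # Consuming is a sequential read from a remembered offset.
    with open(LOG, "rb") as f:
        f.seek(offset)
        data = f.read()
    return data.decode().splitlines(), offset + len(data)

produce("event-1")
produce("event-2")
messages, offset = consume(0)            # read everything, remember where we stopped
produce("event-3")
new_messages, offset = consume(offset)   # come back later, pick up where we left off
print(messages, new_messages)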

This is different to most message-oriented middleware. Specifications like JMS and AMQP require the addition of indexes like the ones discussed above, to manage selectors and session information. This means they often end up performing more like a database than a file. Jim Gray made this point famously back in his 1995 publication Queues Are Databases.

So all these approaches favour one tradeoff or other, often keeping things simple, and hardware sympathetic, as a means of scaling.


So we’ve covered some of the core approaches to storage engines. In truth we made some simplifications. The real world is a little more complex. But the concepts are useful nonetheless.

Scaling a data platform is more than just storage engines though. We need to consider parallelism.


When distributing data over many machines we have two core primitives to play with: partitioning and replication. Partitioning, sometimes called sharding, works well both for random access and brute force workloads.

If a hash-based partitioning model is used the data will be spread across a number of machines using a well-known hash function. This is similar to the way a hash table works, with each bucket being held on a different machine.

The result is that any value can be read by going directly to the machine that contains the data, via the hash function. This pattern is wonderfully scalable and is the only pattern that shows linear scalability as the number of client requests increases. Requests are isolated to a single machine. Each one will be served by just a single machine in the cluster.
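A bare-bones sketch of that routing (the node list and the plain modulo are illustrative only; real stores tend to use consistent hashing or an explicit partition map):

import hashlib

NODES = ["node-a", "node-b", "node-c", "node-d"]  # hypothetical cluster members

def node_for(key):
    # Any client can compute the owner directly, so each request touches one machine.
    digest = hashlib.md5(key.encode()).hexdigest()
    return NODES[int(digest, 16) % len(NODES)]

print(node_for("customer:42"))  # always resolves to the same node for the same key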


We can also use partitioning to provide parallelism over batch computations, for example aggregate functions or more complex algorithms such as those we might use for clustering or machine learning. The key difference is that we exercise all machines at the same time, in a broadcast manner. This allows us to solve a large computational problem in a much shorter time, using a divide and conquer approach.

Batch systems work well for large problems, but provide little concurrency as they tend to exhaust the resources on the cluster when they execute.


So the two extremes are pretty simple: directed access at one end, broadcast, divide and conquer at the other. Where we need to be careful is in the middle ground that lies between the two. A good example of this is the use of secondary indexes in NoSQL stores that span many machines.

A secondary index is an index that isn’t on the primary key. This means the data will not be partitioned by the values in the index. Directed routing via a hash function is no longer an option. We have to broadcast requests to all machines. This limits concurrency. Every node must be involved in every query.

For this reason many key value stores have resisted the temptation to add secondary indexes, despite their obvious use. HBase and Voldemort are examples of this. But many others do expose them, MongoDB, Cassandra, Riak etc. This is good as secondary indexes are useful. But it’s important to understand the effect they will have on the overall concurrency of the system.


The route out of this concurrency bottleneck is replication. You’ll probably be familiar with replication either from using async slave databases or from replicated NoSQL stores like Mongo or Cassandra.

In practice replicas can be invisible (used only for recovery), read only (adding read concurrency) or read write (adding partition tolerance). Which of these you choose will trade off against the consistency of the system. This is simply the application of the CAP theorem (although the CAP theorem may not be as simple as you think).


This tradeoff with consistency* brings us to an important question. When does consistency matter?

Consistency is expensive. In the database world ACID is guaranteed by serialisability. This is essentially ensuring that all operations appear to occur in sequential order. It turns out to be a pretty expensive thing. In fact it’s prohibitive enough that many databases don’t offer it as an isolation level at all. Those that do never set it as the default.

Suffice to say that if you apply strong consistency to a system that does distributed writes you’ll likely end up in tortoise territory.

(* note the term consistency has two common usages. The C in ACID and the C in CAP. They are unfortunately not the same. I’m using the CAP definition: all nodes see the same data at the same time)


The solution to this consistency problem is simple. Avoid it. If you can’t avoid it isolate it to as few writers and as few machines as possible.

Avoiding consistency issues is often quite easy, particularly if your data is an immutable stream of facts. A set of web logs is a good example. They have no consistency concerns as they are just facts that never change.

There are other use cases which do necessitate consistency though. Transferring money between accounts is an oft-used example. Non-commutative actions, such as applying discount codes, are another.

But often things that appear to need consistency, in a traditional sense, may not. For example if an action can be changed from a mutation to a new set of associated facts we can avoid mutable state. Consider marking a transaction as being potentially fraudulent. We could update it directly with the new field. Alternatively we could simply use a separate stream of facts that links back to the original transaction.
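A small sketch of the difference (record shapes invented for the example): rather than mutating the transaction row, we append an immutable fact that references it and derive the current view by folding the facts back in:

transactions = [
    {"txn_id": 1, "amount": 120.0},
    {"txn_id": 2, "amount": 45.0},
]

fraud_facts = []  # append-only stream of facts; the transactions above are never mutated

def flag_as_fraudulent(txn_id, reason):
    fraud_facts.append({"txn_id": txn_id, "reason": reason})

def current_view():
    # Derive the "mutable-looking" view by joining the facts back onto the originals.
    flagged = {fact["txn_id"] for fact in fraud_facts}
    return [dict(txn, fraudulent=txn["txn_id"] in flagged) for txn in transactions]

flag_as_fraudulent(2, "velocity check failed")
print(current_view())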


So in a data platform it’s useful to either remove the consistency requirement altogether, or at least isolate it. One way to isolate it is to use the single-writer principle; this gets you some of the way. Datomic is a good example of this. Another is to physically isolate the consistency requirement by splitting mutable and immutable worlds.

Approaches like Bloom/CALM extend this idea further by embracing the concept of disorder by default, imposing order only when necessary.

So those were some of the fundamental tradeoffs we need to consider. Now how do we pull these things together to build a data platform?


A typical application architecture might look something like the below. We have a set of processes which write data to a database and read it back again. This is fine for many simple workloads. Many successful applications have been built with this pattern. But we know it works less well as throughput grows. In the application space this is a problem we might tackle with message-passing, actors, load balancing etc.

The other problem is this approach treats the database as a black box. Databases are clever software. They provide a huge wealth of features. But they provide little mechanism for scaling out of an ACID world. This is a good thing in many ways. We default to safety. But it can become an annoyance when scaling is inhibited by general guarantees which may be overkill for the requirements we have.


The simplest route out of this is CQRS (Command Query Responsibility Segregation).

Another very simple idea. We separate read and write workloads. Writes go into something write-optimised. Something closer to a simple journal file. Reads come from something read-optimised. There are many ways to do this, be it tools like Goldengate for relational technologies or products that integrate replication internally such as Replica Sets in MongoDB.


Many databases do something like this under the hood. Druid is a nice example. Druid is an open source, distributed, time-series, columnar analytics engine. Columnar storage works best if we input data in large blocks, as the data must be spread across many files. To get good write performance Druid stores recent data in a write optimised store. This is gradually ported over to the read optimised store over time.

When Druid is queried the query routes to both the write optimised and read optimised components. The results are combined (‘reduced’) and returned to the user. Druid uses time, marked on each record, to determine ordering.

Composite approaches like this provide the benefits of CQRS behind a single abstraction.


Another similar approach is to use an Operational/Analytic Bridge. Read- and write-optimised views are separated using an event stream. The stream of state is retained indefinitely, so that the async views can be recomposed and augmented at a later date by replaying.

So the front section provides for synchronous reads and writes. This can be as simple as immediately reading data that was written or as complex as supporting ACID transactions.

The back end leverages asynchronicity, and the advantages of immutable state, to scale offline processing through replication, denormalisation or even completely different storage engines. The messaging-bridge, along with joining the two, allows applications to listen to the data flowing through the platform.

As a pattern this is well suited to mid-sized deployments where there is at least a partial, unavoidable requirement for a mutable view.


If we are designing for an immutable world, it’s easier to embrace larger data sets and more complex analytics. The batch pipeline, one almost ubiquitously implemented with the Hadoop stack, is typical of this.

The beauty of the Hadoop stack comes from its plethora of tools. Whether you want fast read-write access, cheap storage, batch processing, high throughput messaging or tools for extracting, processing and analysing data, the Hadoop ecosystem has it all.

The batch pipeline architecture pulls data from pretty much any source, push or pull. It ingests the data into HDFS, then processes it to provide increasingly optimised versions of the original data. Data might be enriched, cleansed, denormalised, aggregated, moved to a read-optimised format such as Parquet or loaded into a serving layer or data mart. Data can be queried and processed throughout this process.

This architecture works well for immutable data, ingested and processed in large volume. Think 100s of TBs plus. The evolution of this architecture will be slow though. Straight-through timings are often measured in hours.


The problem with the Batch Pipeline is that we often don’t want to wait hours to get a result. A common solution is to add a streaming layer alongside it. This is sometimes referred to as the Lambda Architecture.

The Lambda Architecture retains a batch pipeline, like the one above, but it circumvents it with a fast streaming layer. It’s a bit like building a bypass around a busy town. The streaming layer typically uses a streaming processing tool such as Storm or Samza.

The key insight of the Lambda Architecture is that we’re often happy to have an approximate answer quickly, but we would like an accurate answer in the end.

So the streaming layer bypasses the batch layer, providing the best answers it can within a streaming window. These are written to a serving layer. Later, the batch pipeline computes an accurate answer and overwrites the approximation.

This is a clever way to balance accuracy with responsiveness. Some implementations of this pattern suffer if the two branches end up being dual coded in stream and batch layers. But it is often possible to simply abstract this logic into common libraries that can be reused, particularly as much of this processing is often written in external libraries such as Python or R anyway. Alternatively systems like Spark provide both stream and batch functionality in one system (although the streams in Spark are really micro-batches).

So this pattern again suits high volume data platforms, say in the 100TB+ range, that want to combine streams with existing, rich, batch based analytic function.


There is another approach to this problem of slow data pipelines. It’s sometimes termed the Kappa architecture. I actually thought this name was ‘tongue in cheek’ but I’m now not so sure. Whichever it is, I’m going to use the term Stream Data Platform, which is also a term in use.

Stream Data Platforms flip the batch pattern on its head. Rather than storing data in HDFS, and refining it with incremental batch jobs, the data is stored in a scale-out messaging system, or log, such as Kafka. This becomes the system of record and the stream of data is processed in real time to create a set of tertiary views, indexes, serving layers or data marts.

This is broadly similar to the streaming layer of the Lambda architecture but with the batch layer removed. Obviously the requirement for this is that the messaging layer can store and vend very large volumes of data and there is a sufficiently powerful stream processor to handle the processing.

There is no free lunch so, for hard problems, Stream Data Platforms will likely run no faster than an equivalent batch system, but switching the default approach from ‘store and process’ to ‘stream and process’ can provide greater opportunity for faster results.


Finally, the Stream Data Platform approach can be applied to the problem of ‘application integration’. This is a thorny and difficult problem that has seen focus from big vendors such as Informatica, Tibco and Oracle for many years. For the most part results have been beneficial, but not transformative. Application integration remains a topic looking for a real workable solution.

Stream Data Platforms provide an interesting potential solution to this problem. They take many of the benefits of an O/A bridge – the variety of asynchronous storage formats and the ability to recreate views – but leave the consistency requirement isolated in the, often pre-existing, source systems:

[Diagram: stream data platform]

With the system of record being a log it’s easy to enforce immutability. Products like Kafka can retain enough volume internally to be used as a historic record. This means recovery can be a process of replaying and regenerating state, rather than constantly checkpointing.

Similarly styled approaches have been taken before in a number of large institutions with tools such as Goldengate, porting data to enterprise data warehouses or, more recently, data lakes. They were often thwarted by a lack of throughput in the replication layer and the complexity of managing changing schemas. It seems unlikely the first problem will continue. As for the latter problem though, the jury is still out.

~

So we started with locality. With sequential addressing for both reads and writes. This dominates the tradeoffs inside the components we use. We looked at scaling these components out, leveraging primitives for both sharding and replication. Finally we rebranded consistency as a problem we should isolate in the platforms we build.

But data platforms themselves are really about balancing the sweet-spots of these individual components within a single, holistic form. Incrementally restructuring. Migrating the write-optimised to the read-optimised. Moving from the constraints of consistency to the open plains of streamed, asynchronous, immutable state.

This must be done with a few things in mind. Schemas are one. Time, the peril of the distributed, asynchronous world, is another. But these problems are manageable if carefully addressed. Certainly the future is likely to include more of these things, particularly as tooling, innovated in the big data space, percolates into platforms that address broader problems, both old and new.

(via HighScalability.com)

How And Why Swiftype Moved From EC2 To Real Hardware


This is a guest post by Oleksiy Kovyrin, Head of Technical Operations at Swiftype. Swiftype currently powers search on over 100,000 websites and serves more than 1 billion queries every month.

When Matt and Quin founded Swiftype in 2012, they chose to build the company’s infrastructure using Amazon Web Services. The cloud seemed like the best fit because it was easy to add new servers without managing hardware and there were no upfront costs.

Unfortunately, while some of the services (like Route53 and S3) ended up being really useful and incredibly stable for us, the decision to use EC2 created several major problems that plagued the team during our first year.

Swiftype’s customers demand exceptional performance and always-on availability and our ability to provide that is heavily dependent on how stable and reliable our basic infrastructure is. With Amazon we experienced networking issues, hanging VM instances, unpredictable performance degradation (probably due to noisy neighbors sharing our hardware, but there was no way to know) and numerous other problems. No matter what problems we experienced, Amazon always had the same solution: pay Amazon more money by purchasing redundant or higher-end services.

The more time we spent working around the problems with EC2, the less time we could spend developing new features for our customers. We knew it was possible to make our infrastructure work in the cloud, but the effort, time and resources it would take to do so was much greater than migrating away.

After a year of fighting the cloud, we made a decision to leave EC2 for real hardware. Fortunately, this no longer means buying your own servers and racking them up in a colo. Managed hosting providers facilitate a good balance of physical hardware, virtualized instances, and rapid provisioning. Given our previous experience with hosting providers, we made the decision to choose SoftLayer. Their excellent service and infrastructure quality, provisioning speed, and customer support made them the best choice for us.

After more than a month of hard work preparing the inter-data center migration, we were able to execute the transition with zero downtime and no negative impact on our customers. The migration to real hardware resulted in enormous improvements in service stability from day one, provided a huge (~2x) performance boost to all key infrastructure components, and reduced our monthly hosting bill by ~50%.

This article will explain how we planned for and implemented the migration process, detail the performance improvements we saw after the transition, and offer insight for younger companies about when it might make sense to do the same.

Preparing For The Switch

Before the migration, we had around 40 instances on Amazon EC2. We would experience a serious production issue (instance outage, networking issue, etc.) at least 2-3 times a week, sometimes daily. Once we decided to move to real hardware, we knew we had our work cut out for us because we needed to switch data centers without bringing down the service. The preparation process involved two major steps, each of which is explained in its own section below:

  1. Connecting EC2 and SoftLayer. First, we built a skeleton of our new infrastructure (the smallest subset of servers to be able to run all key production services with development-level load) in SoftLayer’s data center. Once the new data center was set up, we built a system of VPN tunnels between our old and our new data centers to ensure transparent network connectivity between components in both data centers.

  2. Architectural changes to our applications. Next, we needed to make changes to our applications to make them work both in the cloud and on our new infrastructure. Once the application could live in both data centers simultaneously, we built a data-replication pipeline to make sure both the cloud infrastructure and the SoftLayer deployment (databases, search indexes, etc) were always in-sync.

Step 1: Connecting EC2 And Softlayer

One of the first things we had to do to prepare for our migration was figure out how to connect our EC2 and our SoftLayer networks together. Unfortunately the “proper” way of connecting a set of EC2 servers to another private network – using the Virtual Private Cloud (VPC) feature of EC2 – was not an option for us since we could not convert our existing set of instances into a VPC without downtime. After some consideration and careful planning, we realized that the only servers that really needed to be able to connect to each other across the data center boundary were our MongoDB nodes. Everything else we could make data center-local (Redis clusters, search servers, application clusters, etc).


Since the number of instances we needed to interconnect was relatively small, we implemented a very simple solution that proved to be stable and effective for our needs:

  • Each data center had a dedicated OpenVPN server deployed in it that NAT’ed all client traffic to its private network address.

  • Each node that needed to be able to connect to another data center would set up a VPN channel there and set up local routing to properly forward all connections directed at the other DC into that tunnel.

Here are some features that made this configuration very convenient for us:

  • Since we did not control network infrastructure on either side, we could not really force all servers on either end to funnel their traffic through a central router connected to the other DC. In our solution, each VPN server decided (with the help of some automation) which traffic to route through the tunnel to ensure complete inter-DC connectivity for all of its clients.

  • Even if a VPN tunnel collapsed (surprisingly, this only happened a few times during the weeks of the project), it would only mean one server lost its outgoing connectivity to the other DC (one node dropped out of MongoDB cluster, some worker server would lose connectivity to the central Resque box, etc). None of those one-off connectivity losses would affect our infrastructure since all important infrastructure components had redundant servers on both sides.

Step 2: Architectural Changes To Our Applications

There were many small changes we had to make in our infrastructure in the weeks of preparation for the migration, but having a deep understanding of each and every component helped us make appropriate decisions, reducing the chance of a disaster during the transitional period. I would argue that an infrastructure of almost any complexity can be migrated with enough time and engineering resources to carefully consider each and every network connection established between applications and backend services.


Here are the main steps we had to take to ensure smooth and transparent migration:

  • All stateless services (caches, application clusters, web layer) were independently deployed on each side.

  • For each stateful backend service (database, search cluster, async queues, etc) we had to consider if we wanted (or could afford to) replicate the data to the other side or if we had to incur inter-data center latency for all connections. Relying on the VPN was always considered the last resort option and eventually we were able to reduce the amount of traffic between data centers to a few small streams of replication (mostly MongoDB) and connections to primary/main copies of services that could not be replicated.

  • If a service could be replicated, we would do that and then make application servers always use or prefer the local copy of the service instead of going to the other side.

  • For services that we could not replicate with their internal replication capabilities (like our search backends), we made changes in our application to implement replication between data centers: asynchronous workers on each side would pull the data from their respective queues, and we would always write all asynchronous jobs into the queues for both data centers (a rough sketch of this dual-write approach is shown below).
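Here is a sketch of that dual-write idea (queue name, hosts and the Resque-style payload are assumptions, not Swiftype's actual code): every asynchronous job is pushed onto the equivalent queue in both data centers, and the workers in each data center only ever pull from their local queue:

import json
import redis  # assumes the redis-py client; Resque queues are plain Redis lists

# Hypothetical Redis endpoints, one per data center.
QUEUES = [
    redis.Redis(host="redis.ec2.internal", port=6379),
    redis.Redis(host="redis.softlayer.internal", port=6379),
]

def enqueue(job_class, *args):
    # Write the job into both data centers so each side can apply it to its local copy.
    payload = json.dumps({"class": job_class, "args": list(args)})
    for conn in QUEUES:
        conn.rpush("resque:queue:index_updates", payload)

enqueue("IndexDocument", {"document_id": 42})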

Step 3: Flipping The Switch

When both sides were ready to serve 100% of our traffic, we prepared for the final switchover by reducing our DNS TTL down to a few seconds to ensure fast traffic change.

Finally, we switched traffic to the new data center. Requests switched to the new infrastructure with zero impact on our customers. Once traffic to EC2 had drained, we disabled the old data center and forwarded all remaining connections from the old infrastructure to the new one. DNS updates take time, so some residual traffic was visible on our old servers for at least a week after the cut-off time.

A Clear Improvement: Results After Moving From EC2 To Real Hardware

Stability improved. We went from 2-3 serious outages a week (most of these were not customer-visible, since we did our best to make the system resilient to failures, but many outages would wake someone up or force someone to abandon family time) down to 1-2 outages a month, which we were able to handle more thoroughly by spending engineering resources on increasing system resilience to failures and reducing a chance of them making any impact on our customer-visible availability.

Performance improved. Thanks to the modern hardware available from SoftLayer we have seen a consistent performance increase for all of our backend services (especially IO-bound ones like databases and search clusters, but for CPU-bound app servers as well) and, what is more important, the performance was much more predictable: no sudden dips or spikes unrelated to our own software’s activity. This allowed us to start working on real capacity planning instead of throwing more slow instances at all performance problems.

Costs decreased. Last, but certainly not least for a young startup, the monthly cost of our infrastructure dropped by at least 50%, which allowed us to over-provision some of the services to improve performance and stability even further, greatly benefiting our customers.

Provisioning flexibility improved, but provisioning time increased. We are now able to specify servers to exactly meet their workload (lots of disk doesn’t mean we need a powerful CPU). However, we can no longer start new servers in minutes with an API call. SoftLayer can generally add a new server to our fleet within 1-2 hours. This is a big trade-off for some companies, but it is one that works well for Swiftype.

Conclusion

Since switching to real hardware, we’ve grown considerably – our data and query volume is up 20x – but our API performance is better than ever. Knowing exactly how our servers will perform lets us plan for growth in a way we couldn’t before.

In our experience, the cloud may be a good idea when you need to rapidly spin up new hardware, but it only works well when you’re making a huge (Netflix-level) effort to survive in it. If your goal is to build a business from day one and you do not have spare engineering resources to spend on paying the “cloud tax”, using real hardware may be a much better idea.
(via HighScalability.com)

Web Server Load-Balancing with HAProxy on Ubuntu 14.04

What is HAProxy?

HAProxy (High Availability Proxy) is an open-source load balancer which can load balance any TCP service. It is a free, very fast and reliable solution that offers load balancing, high availability, and proxying for TCP and HTTP-based applications. It is particularly well suited for very high traffic web sites and powers many of the world’s most visited ones.

Since its inception, it has become the de facto standard open-source load balancer. Although it does not advertise itself much, it is widely used. Below is a basic diagram of what the setup looks like:

[Diagram: basic HAProxy load-balancing setup]

Installing HAProxy

I am using Ubuntu 14.04 and will install HAProxy with:

apt-get install haproxy

You can check the version by:

haproxy -v

We need to enable HAProxy to be started by its init script. Open /etc/default/haproxy and set the ENABLED option to 1:

ENABLED=1
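If you prefer to make this change from the command line, a sed one-liner such as the following should work (assuming the file still contains the packaged default of ENABLED=0):

sudo sed -i 's/^ENABLED=0/ENABLED=1/' /etc/default/haproxy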

To verify that this change took effect, type the service command for HAProxy and press the Tab key; you should see the available actions:

$ service haproxy <press_tab_key>
reload   restart  start    status   stop

HAProxy is now installed. Let us now create a setup with two Apache web server instances and one HAProxy instance. Below is the setup information:

We will be using three systems, spawned virtually through VirtualBox:

Instance 1 – Load Balancer

Hostname: haproxy
OS: Ubuntu
Private IP: 192.168.205.15

Instance 2 – Web Server 1

Hostname: webserver01
OS: Ubuntu with LAMP
Private IP: 192.168.205.16

Instance 3 – Web Server 2

Hostname: webserver02
OS: Ubuntu with LAMP
Private IP: 192.168.205.17

Here is a diagram of how the setup looks:
haproxy_example_setup
Let us now configure HAProxy.

Configuring HAProxy

Back up the original file by renaming it:

mv /etc/haproxy/haproxy.cfg{,.original}

We’ll create our own haproxy.cfg file. Using your favorite text editor, create the /etc/haproxy/haproxy.cfg file as follows:

global
        log /dev/log   local0
        log 127.0.0.1   local1 notice
        maxconn 4096
        user haproxy
        group haproxy
        daemon

defaults
        log     global
        mode    http
        option  httplog
        option  dontlognull
        retries 3
        option redispatch
        maxconn 2000
        contimeout     5000
        clitimeout     50000
        srvtimeout     50000

listen webfarm 0.0.0.0:80
    mode http
    stats enable
    stats uri /haproxy?stats
    balance roundrobin
    option httpclose
    option forwardfor
    server webserver01 192.168.205.16:80 check
    server webserver02 192.168.205.17:80 check

Explanation:

global
        log /dev/log   local0
        log 127.0.0.1   local1 notice
        maxconn 4096
        user haproxy
        group haproxy
        daemon

The log directive mentions a syslog server to which log messages will be sent.
The maxconn directive specifies the number of concurrent connections on the front-end. The default value is 2000 and should be tuned according to your system’s configuration.
The user and group directives change the HAProxy process to the specified user/group. These shouldn’t be changed.

defaults
        log     global
        mode    http
        option  httplog
        option  dontlognull
        retries 3
        option redispatch
        maxconn 2000
        contimeout     5000
        clitimeout     50000
        srvtimeout     50000

The above section has the default values. The option redispatch enables session redistribution in case of connection failures, so session stickiness is overridden if a web server instance goes down.
The retries directive sets the number of retries to perform on a web server instance after a connection failure.
The values to be modified are the various timeout directives. The contimeout option specifies the maximum time to wait for a connection attempt to a web server instance to succeed.
The clitimeout and srvtimeout options apply when the client or server is expected to acknowledge or send data. The HAProxy documentation recommends setting the client and server timeouts to the same value.
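Note that contimeout, clitimeout and srvtimeout are the legacy spellings; on HAProxy 1.5 and later the same settings are normally written with the timeout keyword. A minimal equivalent sketch of those lines in the defaults section:

        timeout connect 5000
        timeout client  50000
        timeout server  50000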

listen webfarm 0.0.0.0:80
    mode http
    stats enable
    stats uri /haproxy?stats
    balance roundrobin
    option httpclose
    option forwardfor
    server webserver01 192.168.205.16:80 check
    server webserver02 192.168.205.17:80 check

The above block contains configuration for both the frontend and the backend. We are configuring HAProxy to listen on port 80 for webfarm, which is just a name identifying the application.
The stats directives enable the connection statistics page, which can be viewed at the URL set in stats uri; in this case it is http://192.168.205.15/haproxy?stats.
The balance directive specifies the load balancing algorithm to use. Algorithm options available are:

  • Round Robin (roundrobin),
  • Static Round Robin (static-rr),
  • Least Connections (leastconn),
  • Source (source),
  • URI (uri) and
  • URL parameter (url_param).

Information about each algorithm can be obtained from the official documentation.
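As a quick sketch, switching to another algorithm only requires changing the balance line inside the listen block, for example:

    balance leastconn    # send new connections to the server with the fewest active ones
    # or
    balance source       # hash the client IP so a given client keeps reaching the same server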

The server directive declares a backend server; the syntax is:

server <server_name> <server_address>[:port] [param*]

The name mentioned here will appear in logs and alerts. This directive supports several more parameters; in this article we’ll use the check parameter. The check option enables health checks on the web server instance; otherwise, the instance is always considered available.
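As an illustrative sketch of a more tuned set of server lines (the extra parameters are standard HAProxy options, but the values and the 192.168.205.18 backup host are only examples):

    server webserver01 192.168.205.16:80 check inter 2000 rise 2 fall 3 weight 10
    server webserver02 192.168.205.17:80 check inter 2000 rise 2 fall 3 weight 10
    server backup01    192.168.205.18:80 backup

Here inter sets the interval between health checks in milliseconds, rise and fall set how many consecutive successful or failed checks mark a server up or down, weight controls its share of traffic, and backup marks a server that only receives traffic when all others are down.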

Once you’re done configuring, start the HAProxy service:

sudo service haproxy start
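You can also ask HAProxy to validate the configuration file at any time without touching the running service:

haproxy -c -f /etc/haproxy/haproxy.cfg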

Testing Load-Balancing and Fail-over

We will append the server name to the default index.html file (located at /var/www/index.html) on both web servers.

On Instance 2 – Web Server 1 (webserver01, IP 192.168.205.16), append the following line:

sudo sh -c "echo \<h1\>Hostname: webserver01 \(192.168.205.16\)\<\/h1\> >> /var/www/index.html"

On Instance 3 – Web Server 2 (webserver02, IP 192.168.205.17), append the following line:

sudo sh -c "echo \<h1\>Hostname: webserver02 \(192.168.205.17\)\<\/h1\> >> /var/www/index.html"

Now open a web browser on your local machine and browse to the HAProxy IP, i.e. http://192.168.205.15

Each time you refresh the page, you’ll see the load being distributed between the web servers. Below are screenshots from my browser:

For the first time when I visit http://192.168.205.15 , I get:

webserver01

And for the second time, i.e. when I refresh the page, I get:

webserver02

You can also check the haproxy stats by visiting http://192.168.205.15/haproxy?stats

There’s more you can do with this setup. Some ideas include:

  • take one or both web servers offline to test what happens when you access HAProxy
  • configure HAProxy to serve a custom maintenance page
  • configure the web interface so you can visually monitor HAProxy statistics
  • change the scheduler to something other than round-robin
  • configure prioritization/weights for particular servers

(via Howtoforge.com)

Monkey – Architecture of a Linux based Web Server

Introduction

Monkey is an open source project started in 2001 with the goal of learning C; the long story is here. Over the years, the code has been improved in many ways, from naming conventions to heavy architecture changes, all for the better. Nowadays, thanks to the community of core developers and contributors around the project, Monkey is one of the highest-performing web servers around, and I would claim the best option for Embedded Linux.

Understanding the basics of a human readable protocol: HTTP

The Hypertext Transfer Protocol is basically a language with a simple grammar for communication between two components: an HTTP client and an HTTP server. In the common case, communication starts with the client performing a request to the server, and the server replying with some result for that request. The result is a status response plus some content, or simply an error.

Each HTTP request performed by the client is composed of a request method, a URI, the protocol version, and optionally a set of headers. Given that, we can say that a server must take care of the following:

  • Listen for new connections
  • Accept connections
  • Once the connection is accepted, start reading the HTTP request sent by the client
  • Parse the HTTP request, understand what the client wants
  • Depending on the request type, the server can serve some content, close the connection due to some exception, proxy the request to somebody else, etc.
  • Close the connection or keep it open waiting for more requests, depending on the protocol version and the client’s HTTP headers.

Depending on the server’s target, it can be implemented in many ways with different architectural strategies, so the architecture described in this post only aims to describe what has worked best for us in terms of high performance and low resource usage.

Architecture design facts

  • Monkey is a web server designed with a strong focus on Linux. It does not aim to be portable to other operating systems; focusing on one widely used mainstream operating system allows us to put our energy and effort in one place, and of course to make the most of the Linux kernel to achieve high performance.
  • Event driven: also known as asynchronous, an event-driven web server aims to use non-blocking system calls to do its work, reducing the computing time spent in the user-space context. For example, when sending file content to a client, we do not block the whole process or thread while sending the data; instead we instruct the kernel, through a system call, to send N bytes from the file and notify us when we are able to send more. In the meantime, we process other connections and send other pending data.
  • Embedded friendly: our embedded context is Embedded Linux, where we care a lot about resource consumption; that means not using more than 2.5MB of memory under heavy load. The Monkey binary itself is around 80KB; once loaded in memory it takes about 350KB, and depending on the load, more resources may be needed.
  • Small core, flexible API: it implements a basic core to handle the HTTP protocol and exposes a flexible API through the plugin interface, where it is possible to hook plugins for the transport layer, security, request types and event handlers.

Contexts

In Monkey, we have defined two working contexts: the process context and the thread context. The process context represents the main process waiting for incoming connections and the scheduler balancing new connections across the worker threads. The thread context belongs to each thread handling its active connections:

architecture_general

The number of workers is defined in the configuration; it scales well on both single and multi-core CPUs. There is no need to set thread affinity through CPU masks: the Linux kernel scheduler is smart enough to assign CPU time to each worker, and by default all workers are assigned to all CPUs.

From a system administrator’s point of view, it is possible to assign each worker to a different set of CPUs, but this approach is not recommended unless you are fully aware of what the Linux scheduler does in terms of interrupts, context switches and CPU time for kernel and user-space applications. Do it only if you can do it better than the running scheduler.

Scheduler

Before entering the server loop, the scheduler launches and initializes each worker, setting up the initial data structures and the interfaces for interaction between the components mentioned; this stage involves the creation of an epoll(7) queue per worker. It is worth mentioning that each epoll(7) queue created through epoll_create(2) is managed through its own file descriptor.

Once the workers are up and running, the scheduler’s next job is to manage incoming connections. For each new connection accepted, it determines the least-loaded worker and assigns the connection to it. The chosen worker is the one with the fewest connections in its epoll(7) interface, so the scheduler goes through the worker counters and chooses one. At this point the scheduler has two file descriptors: the connection file descriptor returned by accept(2) and the file descriptor that represents the epoll(7) queue of the chosen worker. It then registers the new file descriptor in the proper epoll(7) queue.

Workers

Each worker, or thread, runs an infinite loop around the epoll(7) interface, which is basically a Linux-specific polling mechanism to register, enqueue and notify about events on the file descriptors registered by the scheduler (sockets in this case).

The worker stays in a loop waiting for events via the epoll_wait(2) system call. Every time the scheduler registers a new file descriptor, an event is reported on the worker’s epoll(7) interface, and the same happens for subsequent events such as “there is data available to read” (EPOLLIN), “now you can write to the socket” (EPOLLOUT), “connection closed” (EPOLLHUP), etc.

For each event triggered, the worker keeps a status for the connection to determine whether it is a new connection, is receiving the HTTP request, has completed the request, is parsing the request, or is sending out a response. Besides events, at a fixed interval set in the configuration, it checks for connections that have timed out due to an incomplete request or some other anomaly.
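The following is a minimal sketch (not Monkey’s actual code) of the scheduler/worker pattern just described: the scheduler registers an accepted socket in a worker’s epoll(7) queue, and the worker loops on epoll_wait(2) reacting to the reported events:

/* Minimal sketch of the scheduler/worker epoll(7) pattern; not Monkey's code. */
#include <stdio.h>
#include <unistd.h>
#include <sys/epoll.h>

#define MAX_EVENTS 64

/* Scheduler side: register an accepted connection in a worker's epoll queue. */
int register_connection(int epoll_fd, int conn_fd)
{
    struct epoll_event ev;
    ev.events = EPOLLIN;          /* wake the worker when request data arrives */
    ev.data.fd = conn_fd;
    return epoll_ctl(epoll_fd, EPOLL_CTL_ADD, conn_fd, &ev);
}

/* Worker side: infinite loop waiting for events on its own epoll queue. */
void worker_loop(int epoll_fd)
{
    struct epoll_event events[MAX_EVENTS];

    for (;;) {
        /* block until the kernel reports activity on a registered socket */
        int n = epoll_wait(epoll_fd, events, MAX_EVENTS, 1000 /* ms */);
        for (int i = 0; i < n; i++) {
            int fd = events[i].data.fd;
            if (events[i].events & (EPOLLHUP | EPOLLERR)) {
                close(fd);        /* peer closed or error: drop the socket */
            } else if (events[i].events & EPOLLIN) {
                /* read and parse the HTTP request for this connection */
            } else if (events[i].events & EPOLLOUT) {
                /* socket is writable again: keep sending the response */
            }
        }
        /* a real worker would also expire timed-out connections here */
    }
}

int main(void)
{
    int epoll_fd = epoll_create1(0);   /* one queue per worker */
    if (epoll_fd < 0) {
        perror("epoll_create1");
        return 1;
    }
    /* in a real server, sockets returned by accept(2) would be registered
     * here via register_connection(epoll_fd, conn_fd) */
    worker_loop(epoll_fd);
    return 0;
}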

Plugins Architecture

Monkey defines four categories of API where plugins can hook: Context, Events, Stages and Networking.

Context
Defines callbacks that can be invoked when the server is starting up; it covers the process and thread contexts described earlier.

Events
For every type of event reported in a worker loop, a plugin can implement a hook to perform specific actions:

api_events-300x106

Stages
Every new connection enters a staged state; for each step of the HTTP cycle it passes through different phases, and each plugin can hook into a specific one:

api_stages-300x108

Networking
Monkey is not aware of networking itself; it intentionally depends on a plugin that provides the transport layer. This approach makes it easy to switch from plain socket communication to an encrypted one such as SSL. The networking plugin only needs to provide the required API functions for communication:

level_networking-300x176

Scaling up

Every time a connection performs a successful request, it is allocated in a worker-scoped global list (implemented through a pthread key). For each event reported, the worker needs to look up the internal data associated with it, so the file descriptor or socket number acts as the primary key for the search. The data structure chosen for Monkey v1.2 is a red-black tree. This algorithm has proven to behave fairly and scale well when handling thousands of active connections per worker, maintaining a good balance between performance and cost.

The cost of each file descriptor lookup is critical to server performance: an O(n) solution will work fine for a few connections, but under high concurrency an O(log(n)) solution ends up providing much better performance.

Memory Management

One key to reducing overhead in a server is to minimize the memory allocation requests made to the system within the main loop. The current Monkey implementation performs only one memory allocation per new connection; if needed, because the incoming request posts a lot of data, it allocates more memory as required. Other web servers implement caching mechanisms to reduce memory allocations even further, but since our focus is Embedded Linux, we aim for speed at low resource usage, and implementing a caching mechanism would increase our costs. So we dropped that common approach so as not to abuse system memory, a decision based on our target.

Linux Kernel system calls

The Linux kernel exposes a useful, non-portable set of system calls for achieving high performance in networking applications. The first one is epoll(7); as described earlier, this interface allows watching a set of file descriptors for certain defined events. Similar solutions such as select(2) or poll(2) do not perform as well as epoll(7).

When sending a static file, the old-fashioned way is to open the file, get the file descriptor and perform multiple read(2)/write(2) calls to write out the file content. This requires the kernel to copy data back and forth between kernel and user space, which obviously generates overhead. As a solution, the Linux kernel implements a zero-copy strategy through the sendfile(2) system call. This system call does not copy data to user space; instead, it sends the data directly to the other file descriptor, achieving good performance and reducing the latency of the old-fashioned approach.
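A hedged sketch of the technique (file and socket handling simplified; this is not Monkey’s implementation):

#include <fcntl.h>
#include <sys/sendfile.h>
#include <sys/stat.h>
#include <unistd.h>

/* Send the whole file at `path` to the connected socket `sock_fd`. */
ssize_t send_static_file(int sock_fd, const char *path)
{
    int file_fd = open(path, O_RDONLY);
    if (file_fd < 0)
        return -1;

    struct stat st;
    if (fstat(file_fd, &st) < 0) {
        close(file_fd);
        return -1;
    }

    off_t offset = 0;
    ssize_t total = 0;
    while (offset < st.st_size) {
        /* the kernel moves file data straight to the socket buffers,
         * with no round trip through user space */
        ssize_t sent = sendfile(sock_fd, file_fd, &offset, st.st_size - offset);
        if (sent <= 0)
            break;
        total += sent;
    }

    close(file_fd);
    return total;
}

In an event-driven server the loop would instead send a chunk, return to the event loop, and resume on the next EPOLLOUT notification rather than blocking until the whole file is sent.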

In our architecture, the Logger plugin needs to transfer data through a pipe(2) (a unidirectional data channel that can be used for interprocess communication). A common mechanism is to use read(2) and write(2) on each end, but similarly to sendfile(2), a newer system call exists for this kind of situation: splice(2). This system call moves data from one point to another without the data-copy overhead. The main difference between sendfile(2) and splice(2) is that splice(2) requires one end to be a pipe(2).
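A minimal sketch of a splice(2) call (the descriptors are assumed to already exist; this is an illustration only, not the Logger plugin’s code):

#define _GNU_SOURCE
#include <fcntl.h>
#include <unistd.h>

/* Move up to `len` bytes from src_fd into the write end of a pipe. */
ssize_t move_to_pipe(int src_fd, int pipe_write_fd, size_t len)
{
    /* at least one end of a splice(2) call must be a pipe */
    return splice(src_fd, NULL, pipe_write_fd, NULL, len, SPLICE_F_MOVE);
}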

In my previous post, I mentioned how to use the newer Linux kernel feature called TCP_FASTOPEN. While very simple to implement, it requires cooperation from both sides: the client and the server. If you have full control of your networking application (client and server), consider using TCP_FASTOPEN; it will increase performance by reducing the TCP handshake round trips.
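On the server side, enabling it comes down to a single setsockopt(2) call on the listening socket before accepting connections; a hedged sketch (the queue length value is arbitrary, and TCP_FASTOPEN must be available in your kernel and libc headers):

#include <netinet/in.h>
#include <netinet/tcp.h>
#include <sys/socket.h>

int enable_tcp_fastopen(int listen_fd)
{
    /* maximum number of pending TFO requests the kernel will queue */
    int qlen = 16;
    return setsockopt(listen_fd, IPPROTO_TCP, TCP_FASTOPEN, &qlen, sizeof(qlen));
}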

Monkey Plugins

Based on the architecture and API described, the following plugins are distributed as part of the core:

Liana: basic sockets connectivity layer

PolarSSL: provides a transport layer based on SSL

Cheetah: plugin that provides a command line interface to query the internals of a running server through a unix socket

Mandril: security layer that restricts access by URI strings or subnetworks.

Dirlisting: directory listing

Logger: log writer

CGI: old fashioned CGI interface

FastCGI: provides FastCGI support

 

Bonus track: Full HTTP Stack for web services implementation
Besides being a common web server serving static or dynamic content, Monkey is a full stack for the development of web applications. In order to provide an easy API for web application or web service development, we have created Duda I/O, an event-driven C framework for rapid development based on the Monkey stack.

Duda implements a core API of pseudo-objects and provides extra features through a package system, all via a friendly C API. The most relevant features supported at the moment are WebSocket, JSON, SQLite3, Redis, Base64 and SHA1.

Due to its high-performance nature and the open source ecosystem around it, it is being used in production in everything from Embedded Linux products to Big Data solutions. The license of Duda allows creating closed-source services or applications and linking them to the Duda I/O stack at zero cost.

For more details please refer to Duda I/O main site.

The Monkey organization believes in open source and is fully committed to creating the best networking technology for different needs. If you are interested in participating as a contributor or testing our stack, feel free to reach us on our mailing lists or the IRC channel #monkey at irc.freenode.net.

(via Edsiper.linuxchile.cl)

AppLovin: Marketing To Mobile Consumers Worldwide By Processing 30 Billion Requests A Day

16442530119_470a4487ee_m

This is a guest post from AppLovin‘s VP of engineering, Basil Shikin, on the infrastructure of its mobile marketing platform. Major brands like Uber, Disney, Yelp and Hotels.com use AppLovin’s mobile marketing platform. It processes 30 billion requests a day and 60 terabytes of data a day.

AppLovin’s marketing platform provides marketing automation and analytics for brands who want to reach their consumers on mobile. The platform enables brands to use real-time data signals to make effective marketing decisions across one billion mobile consumers worldwide.

Core Stats

  • 30 Billion ad requests per day

  • 300,000 ad requests per second, peaking at 500,000 ad requests per second

  • 5ms average response latency

  • 3 Million events per second

  • 60TB of data processed daily

  • ~1000 servers

  • 9 data centers

  • ~40 reporting dimensions

  • 500,000 metrics data points per minute

  • 1 PB Spark cluster

  • 15GB/s peak disk writes across all servers

  • 9GB/s peak disk reads across all servers

  • Founded in 2012, AppLovin is headquartered in Palo Alto, with offices in San Francisco, New York, London and Berlin.

 

Technology Stack

 

Third Party Services

Data Storage

  • Aerospike for user profile storage

  • Vertica for aggregated statistics and real-time reporting

  • Aggregating 350,000 rows per second and writing to Vertica at 34,000 rows per second

  • Peak 12,000 user profiles per second written to Aerospike

  • MySQL for ad data

  • Spark for offline processing and deep data analysis

  • Redis for basic caching

  • Thrift for all data storage and transfers

  • Each data point replicated in 4 data centers

  • Each service is replicated at least in 2 data centers (at most in 8)

  • Amazon Web Services used for long term data storage and backups

Core App And Services

  • Custom C/C++ Nginx module for high performance ad serving

  • Java for data processing and auxiliary services

  • PHP / Javascript for UI

  • Jenkins for continuous integration and deployment

  • Zookeeper for distributed locking

  • HAProxy and IPVS for high availability

  • Coverity for Java/C++ static code analysis

  • Checkstyle and PMD for PHP static code analysis

  • Syslog for DC-centralized log server

  • Hibernate for transaction-based services

Servers And Provisioning

  • Ubuntu

  • Cobbler for bare metal provisioning

  • Chef for configuring servers

  • Berkshelf for Chef dependencies

  • Docker with Test Kitchen for running infrastructure tests

 

Monitoring Stack

 

Server Monitoring

  • Icinga for all servers

  • ~100 custom Nagios plugins for deep server monitoring

  • 550 various probes per server

  • Graphite as data format

  • Grafana for displaying all graphs

  • PagerDuty for issue escalation

  • Smokeping for network mesh monitoring

Application Monitoring

  • VividCortex for MySQL monitoring

  • JSON /health endpoint on each service

  • Cross-DC database consistency monitoring

  • 9 4K 65” TVs for showing all graphs across the office

  • Statistical deviation monitoring

  • Fraudulent users monitoring

  • Third-party systems monitoring

  • Deployments are recorded in all graphs

Intelligent Monitoring

  • Intelligent alerting system with a feedback loop: a system that can introspect anything can learn anything

  • Third-party stats about AppLovin are also monitored

  • Alerting is a cross-team exercise: developers, ops, business, data scientists are involved

 

Architecture Overview

 

General Considerations

  • Store everything in RAM

  • If it does not fit, save it to SSD

  • L2 cache level optimizations matter

  • Use right tool for the right job

  • Architecture allows swapping any component

  • Upgrade only if an alternative is 10x better

  • Write your own components if there is nothing suitable out there

  • Replicate important data at least 3x

  • Make sure every message can be re-played without data corruption

  • Automate everything

  • Zero-copy message passing

Message Processing

  • Custom message processing system that guarantees message delivery

  • 3x replication for each message

  • Sending a message = writing to disk

  • Any service may fail, but no data are lost

  • Message dispatching system connects all components together, provides isolation and extensibility of the system

  • Cross-DC failure tolerance

Ad Serving

  • Nginx is really fast: can serve an ad in less than a millisecond

  • Keep all ad serving data in memory: read only

  • jemalloc gave a 30% speed improvement

  • Use Aerospike for user profiles: less than 1ms to fetch a profile

  • Pre-compute all ad serving data on one box and dispatch across all servers

  • Torrents are used to propagate serving data across all servers. Using Torrents resulted in 83% network load drop on the originating server compared to HTTP-based distribution.

  • mmap is used to share ad serving data across nginx processes (a generic sketch of this technique follows this list)

  • XXHash is the fastest hash function with a low collision rate. 75x faster than SHA-1 for computing checksums

  • 5% of real traffic goes to staging environment

  • Ability to run 3 A/B tests at once (20%/20%/10% of traffic for three separate tests, 50% for control)

  • A/B test results are available in regular reporting
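The mmap point above is a general enough technique to sketch; the code below only illustrates sharing a read-only data file across processes (the file path is hypothetical) and is not AppLovin’s actual implementation:

#include <stdio.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <unistd.h>

int main(void)
{
    /* hypothetical precomputed ad-serving data file */
    int fd = open("/var/lib/adserver/serving-data.bin", O_RDONLY);
    if (fd < 0) {
        perror("open");
        return 1;
    }

    struct stat st;
    if (fstat(fd, &st) < 0) {
        perror("fstat");
        close(fd);
        return 1;
    }

    /* PROT_READ + MAP_SHARED: every process mapping this file shares the
     * same physical pages via the page cache, so N worker processes cost
     * roughly one copy of the data in RAM */
    void *data = mmap(NULL, st.st_size, PROT_READ, MAP_SHARED, fd, 0);
    if (data == MAP_FAILED) {
        perror("mmap");
        close(fd);
        return 1;
    }

    /* ... workers would read ad-serving structures directly from `data` ... */

    munmap(data, st.st_size);
    close(fd);
    return 0;
}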

 

Data Warehouse

  • All data are replicated

  • Running most reports takes under 2 seconds

  • Aggregation is key to allow fast reports on large amounts of data

  • Non-aggregated data for the last 48 hours is usually enough to resolve most issues

  • 7 days of raw logs is usually enough for debug

  • Some reports must be pre-computed

  • Always think multiple data centers: every data point goes to multiple locations

  • Backup in S3 for catastrophic failures

  • All raw data are stored in Spark cluster

 

Team

 

Structure

  • 70 full-time employees

  • 15 developers (platform, ad serving, frontend, mobile)

  • 4 data scientists

  • 5 dev. ops.

  • Engineers in Palo Alto, CA

  • Business in San Francisco, CA

  • Offices in New York, London and Berlin

Interaction

  • HipChat to discuss most issues

  • Asana for project-based communication

  • All code is reviewed

  • Frequent group code reviews

  • Quarterly company outings

  • Regular town hall meetings with CEO

  • All engineers (junior to CTO) write code

  • Interviews are tough: offers are really rare

Development Cycle

  • Developers, business side or data science team comes up with an idea

  • Idea is reviewed and scheduled to be executed on a Monday meeting

  • Feature is implemented in a branch; development environment is used for basic testing

  • A pull request is created

  • Code is reviewed and iterated upon

  • For big features group code reviews are common

  • The feature gets merged to master

  • The feature gets deployed to staging with the next build

  • The feature gets tested on 5% real traffic

  • Statistics are examined

  • If the feature is successful it graduates to production

  • Feature is closely monitored for a couple of days

Avoiding Issues

  • The system is designed to handle failure of any component

  • No failure of a single component can harm ad serving or data consistency

  • Omniscient monitoring

  • Engineers watch and analyze key business reports

  • High quality of code is essential

  • Some features take multiple code reviews and iterations before graduating

  • Alarms are triggered when:

    • Stats for staging are different from production

    • FATAL errors on critical services

    • Error rate exceeds threshold

    • Any irregular activity is detected

  • Data are never dropped

  • Most log lines can be easily parsed

  • Rolling back of any change is easy by design

  • After every failure: fix, make sure same thing does not happen in the future, and add monitoring

 

Lessons Learned

 

Product Development

  • Being able to swap any component easily is key to growth

  • Failures drive innovative solutions

  • Staging environment is essential: always be ready to lose 5%

  • A/B testing is essential

  • Monitor everything

  • Build intelligent alerting system

  • Engineers should be aware of business goals

  • Business people should be aware of limitations of engineering

  • Make builds and continuous integration fast. Jenkins runs on 2 bare-metal servers with 32 CPUs, 128GB RAM and SSD drives

Infrastructure

  • Monitoring all data points is critical

  • Automation is important

  • Every component should support HA by design

  • Kernel optimizations can have up to 25% performance improvement

  • Process and IRQ balancing lead to another 20% performance improvement

  • Power saving features impact performance

  • Use SSDs as much as possible

  • When optimizing, profile everything. Flame graphs are great!

(via HighScalability.com)