POWER8 packs more than twice the big data punch

Last week, in London, the top minds in financial technology came face to face at the STAC Summit to discuss the technology challenges facing the financial industry. The impact big data is having on the financial industry was a hot topic, and many discussions revolved around having the best technology on hand for handling data faster to gain a competitive advantage.

It was in this setting that IBM shared recent benchmark results revealing that an IBM POWER8-based system server can deliver more than twice the performance of the best x86 server when running standard financial industry workloads.


In fact, IBM’s POWER8-based systems set four new performance records for financial workloads. According to the most recent published and certified STAC-A2 benchmarks, IBM POWER8-based systems deliver:

  • 3x performance over the best 2-socket solution using x86 CPUs
  • 1x performance for path scaling over the best 4-socket solution using x86 CPUs (NEW PUBLIC RECORD)
  • 7x performance over the best 2 x86 CPU and one Xeon Phi co-processor
  • 16 percent increase for asset capacity over the best 4-socket solution (NEW PUBLIC RECORD)

Why the financial industry created the STAC benchmark

STAC-A2 is a set of standard benchmarks that help estimate the relative performance of full systems running complete financial applications. This enables clients in the financial industry to evaluate how IBM POWER8-based systems will perform on real applications.

The IBM Power System S 824 delivered more than twice the performance of the best x86 based server measuredSTAC-A2 gives a much more accurate view of the expected performance as compared to micro benchmarks or simple code loops. STAC recently performed STAC-A2 Benchmark tests on a stack consisting of the STAC-A2 Pack for Linux on an IBM Power System S824 server using two IBM POWER8 Processor cards at 3.52 GHz and 1TB of DRAM, with Red Hat Enterprise Linux version 7.

And, as reported above, according to audited STAC results, the IBM Power System S824 delivered more than twice the performance of the best x86 based server measured. Those are the kind of results that matter—real results for real client challenges.

POWER8 processors are based on high performance, multi-threaded cores with each core of the Power System S824 server running up to eight simultaneous threads at 3.5 GHz. Power System S824 also has a very high bandwidth memory interface that runs at 192 GB/s per socket which is almost three times the speed of a typical x86 processor. These factors along with a balanced system structure including a large internal 8MB per core L3 are the primary reasons why financial computing workloads run significantly faster on POWER8-based systems than alternatives.

The STAC-A2 financial industry benchmarks add to the performance data that the Cabot Partners published recently. Cabot evaluated the performance of POWER8-based systems versus x86-based systems, evaluating functionality, performance and price/performance across several industries, including life sciences, financial services, oil and gas and analytics, referencing standard benchmarks as well as application oriented benchmark data.

The findings in the STAC-A2 benchmarking report position POWER8 as the ideal platform for the financial industry. This data, combined with the recently published Cabot Partners report, represents overwhelming proof that IBM POWER8-based systems take the performance lead in the financial services space (and beyond)—clearly packing a stronger punch when compared to the competition.


(via Smartercomputingblog.com)

Socket Sharding in NGINX Release 1.9.1

NGINX release 1.9.1 introduces a new feature that enables use of the SO_REUSEPORT socket option, which is available in newer versions of many operating systems, including DragonFly BSD and Linux (kernel version 3.9 and later). This option allows multiple sockets to listen on the same IP address and port combination. The kernel then load balances incoming connections across the sockets, effectively sharding the socket.

(For NGINX Plus customers, this feature will be available in Release 7, which is scheduled for later this year.)

The SO_REUSEPORT socket option has many real-world applications, such as the potential for easy rolling upgrades of services. For NGINX, it improves performance by more evenly distributing connections across workers.

As depicted in the figure, when the SO_REUSEPORT option is not enabled, a single listening socket by default notifies workers about incoming connections as the workers become available. If you include the accept_mutex off directive in the events context, the single listener instead notifies all workers about a new connection at the same time, putting them in competition to grab it. This is known as the thundering herd problem.


With the SO_REUSEPORT option enabled, there is a separate listening socket for each worker. The kernel determines which available socket (and by implication, which worker) gets the connection. While this does decrease latency and improve performance as workers accept connections, it can also mean that workers are given new connections before they are ready to handle them.


Configuring Socket Sharding

To enable the SO_REUSEPORT socket option, include the new reuseport parameter to the listen directive, as in this example:

http {
     server {
          listen       80 reuseport;
          server_name  localhost;

Including the reuseport parameter disables accept_mutex for the socket, because the lock is redundant with reuseport. It can still be worth setting accept_mutex if there are ports on which you don’t set reuseport.

Benchmarking the Performance Improvement

I ran a wrk benchmark with 4 NGINX workers on a 36-core AWS instance. To eliminate network effects, I ran both client and NGINX on localhost, and also had NGINX return the string OK instead of a file. I compared three NGINX configurations: the default (equivalent to accept_mutex on), with accept_mutex off, and with reuseport. As shown in the figure, reuseport increases requests per second by 2 to 3 times, and reduces both latency and the standard deviation for latency.

I also ran a more real-world benchmark with the client and NGINX on separate hosts and with NGINX returning an HTML file. As shown in the chart, with reuseport the decrease in latency was similar to the previous benchmark, and the standard deviation decreased even more dramatically (almost ten-fold). Other results (not shown in the chart) were also encouraging. With reuseport, the load was spread evenly across the worker processes. In the default condition (equivalent to accept_mutex on), some workers got a higher percentage of the load, and with accept_mutex off all workers experienced high load.

Latency (ms) Latency stdev (ms) CPU Load
Default 15.65 26.59 0.3
accept_mutex off 15.59 26.48 10
reuseport 12.35 3.15 0.3

(Via Nginx’s blog)

Web Crawling & Analytics Case Study – Database Vs Self Hosted Message Queuing Vs Cloud Message Queuing


The Business Problem:

 To build a repository of used car prices and identify trends based on data available from used car dealers. The solution to the problem necessarily involved building large scale crawlers to crawl & parse thousands of used car dealer websites everyday.

1st Solution: Database Driven Solution to Crawling & Parsing

Our initial Infrastructure consisted of a crawling, parsing and database insertion web services all written in Python. When the crawling web service finishes with crawling a web site it pushes the output data to the database & the parsing web service picks it from there & after parsing the data, pushes the structured data into the database.



 Problems with Database driven approach:

  • Bottlenecks: Writing the data into database and reading it back proved be a huge bottleneck and slowed down the entire process & left to high & low capacity issues in the crawling & parsing functions.

  • High Processing Cost: Due to the slow response time of many websites the parsing service would remain mostly idle which lead to a very high cost of servers & processing.

We tried to speed up the process by directly posting the data to the parsing service from crawling service but this resulted in loss of data when the parsing service was busy. Additionally, the approach presented a massive scaling challenge from read & write bottlenecks from the database.

2nd Solution: Self Hosted / Custom Deployment Using RabbitMQ


To overcome the above mentioned problems and to achieve the ability to scale we moved to a new architecture using RabbitMQ. In the new architecture crawlers and parsers were Amazon EC2 micro instances. We used Fabric to push commands to the scripts running in the instances. The crawling instance would pull the used car dealer website from the website queue, crawl the relevant pages and push output the data to a crawled pages queue.The parsing instance would pull the data from the crawled pages queue, parse them and push data into parsed data queue and a data base insertion script would transfer that data into Postgres.

 This approach speeded up the crawling and parsing cycle. Scaling was just a matter of adding more instances created from specialized AMIs.

Problems with RabbitMQ Approach:

  • Setting up, deploying & maintaining this infrastructure across hundreds of servers was a nightmare for a small team

  • We suffered data losses every time there was a deployment & maintenance issues. Due to the tradeoff we were forced to make between speed and persistence of data in RabbitMQ, there was a chance we lost some valuable data if the server hosting RabbitMQ crashed.

3rd Solution: Cloud Messaging Deployment Using IronMQ & IronWorker

The concept of having multiple queues and multiple crawlers and parsers pushing and pulling data from them gave us a chance to scale the infrastructure massively. We were looking for solutions which could help us overcome the above problems using a similar architecture but without the headache of deployment & maintenance management.

The architecture, business logic & processing methods of using Iron.io & Ironworkers were similar to RabbitMQ but without the deployment & maintenance efforts. All our code is written in python and since Iron.io supports python we could set up the crawl & parsing workers and queues within 24 hours with minimal deployment & maintenance efforts. Reading and writing data into IronMQ is fast and all the messages in IronMQ are persistent and the chance of losing data is very less.



Key Variables Database Driven Batch Processing Self Hosted – RabbitMQ Cloud Based – Iron MQ
Speed of processing a batch Slow Fast Fast
Data Loss from Server Crashes & Production Issues Low Risk Medium Risk Low Risk
Custom Programming for Queue Management High Effort Low Effort Low Effort
Set Up for Queue Management NA Medium Effort Low Effort
Deployment & Maintenance of Queues NA High Effort Low Effort

(Via DataScienceCentral.com)

How MySQL Is Able To Scale To 200 Million QPS – MySQL Cluster

This is a guest post by Andrew Morgan, MySQL Principal Product Manager at Oracle.


The purpose of this post is to introduce MySQL Cluster – which is the in-memory, real-time, scalable, highly available version of MySQL. Before addressing the incredible claim in the title of 200 Million Queries Per Second it makes sense to go through an introduction of MySQL Cluster and its architecture in order to understand how it can be achieved.

Introduction To MySQL Cluster

MySQL Cluster is a scalable, real-time in-memory, ACID-compliant transactional database, combining 99.999% availability with the low TCO of open source. Designed around a distributed, multi-master architecture with no single point of failure, MySQL Cluster scales horizontally on commodity hardware with auto-sharding to serve read and write intensive workloads, accessed via SQL and NoSQL interfaces.

Originally designed as an embedded telecoms database for in-network applications demanding carrier-grade availability and real-time performance, MySQL Cluster has been rapidly enhanced with new feature sets that extend use cases into web, mobile and enterprise applications deployed on-premise or in the cloud, including: – High volume OLTP – Real time analytics – E-commerce, inventory management, shopping carts, payment processing, fulfillment tracking, etc. – Online Gaming – Financial trading with fraud detection – Mobile and micro-payments – Session management & caching – Feed streaming, analysis and recommendations – Content management and delivery – Communications and presence services – Subscriber/user profile management and entitlements

MySQL Cluster Architecture

While transparent to the application, under the covers, there are three types of node which collectively provide service to the application. The figure shows a simplified architecture diagram of a MySQL Cluster consisting of twelve Data Nodes split across six node groups.MySQL-Cluster-Architecture

Data Nodes are the main nodes of a MySQL Cluster. They provide the following functionality: – Storage and management of both in-memory and disk-based data – Automatic and user defined partitioning (sharding) of tables – Synchronous replication of data between data nodes – Transactions and data retrieval – Automatic fail over – Automatic resynchronization after failure for self-healing

Tables are automatically sharded across the data nodes and each data node is a master accepting write operations, making it very simple to scale write-intensive workloads across commodity nodes, with complete application transparency.

By storing and distributing data in a shared-nothing architecture, i.e. without the use of a shared-disk, and synchronously replicating data to at least one replica, if a Data Node happens to fail, there will always be another Data Node storing the same information. This allows for requests and transactions to continue to be satisfied without interruption. Any transactions which are aborted during the short (sub-second) failover window following a Data node failure are rolled back and can be re-run.

It is possible to choose how to store data; either all in memory or with some on disk (non-indexed data only). In-memory storage can be especially useful for data that is frequently changing (the active working set). Data stored in-memory is routinely check pointed to disk locally and coordinated across all Data Nodes so that the MySQL Cluster can be recovered in case of a complete system failure – such as a power outage. Disk-based data can be used to store data with less strict performance requirements, where the data set is larger than the available RAM. As with most other database servers, a page-cache is used to cache frequently used disk-based data in the Data Nodes’ memory in order to increase the performance.

Application Nodes provide connectivity from the application logic to the data nodes. Applications can access the database using SQL through one or many MySQL Servers performing the function of SQL interfaces into the data stored within a MySQL Cluster. When going through a MySQL Server, any of the standard MySQL connectors can be used , offering a wide range of access technologies. Alternatively, a high performance (C++ based) interface called NDB API can be used for extra control, better real-time behavior and greater throughput. The NDB API provides a layer through which additional NoSQL interfaces can directly access the cluster, bypassing the SQL layer, allowing for lower latency and improved developer flexibility. Existing interfaces include Java, JPA, Memcached, JavaScript with Node.js and HTTP/REST (via an Apache Module). All Application Nodes can access data from all Data Nodes and so they can fail without causing a loss of service as applications can simply use the remaining nodes.

Management Nodes are responsible for publishing the cluster configuration to all nodes in the cluster and for node management. The Management Nodes are used at startup, when a node wants to join the cluster, and when there is a system reconfiguration. Management Nodes can be stopped and restarted without affecting the ongoing execution of the Data and Application Nodes. By default, the Management Node also provides arbitration services, in the event there is a network failure which leads to a split-brain or a cluster exhibiting network-partitioning.

Achieving Scalability Through Transparent Sharding


The rows from any given table are transparently split into multiple partitions/fragments. For each fragment there will be a single data node that holds all of its data and handles all reads and writes on that data. Each data node also has a buddy and together they form a node group; the buddy holds a secondary copy of the fragment as well as a primary fragment of its own. There is synchronous 2-phase commit protocol used to ensure that when a transaction has been committed the changes are guaranteed to be stored within both data nodes.

By default, a table’s Primary Key is used as the shard key and MySQL Cluster will perform an MD5 hash on that shard key to select which fragment/partition it should be stored in. If a transaction or query needs to access data from multiple data nodes then one of the data nodes takes on the role of the transaction coordinator and delegates work to the other required data nodes; the results are then combined before they’re presented to the application. Note that it is also possible to have transactions or queries that join data from multiple shards and multiple tables – this is a big advantage over typical NoSQL data stores that implement sharding.


The best (linear) scaling is achieved when high running queries/transactions can be satisfied by a single data node (as it reduces the network delays from the inter-data node messaging). To achieve this, the application can be made distribution aware – all this really means is that the person defining the schema can override what column(s) is used for the sharding key. As an example, the figure shows a table with a composite Primary Key made up of a user-id and a service name; by choosing to just use the user-id as the sharding key, all rows for a given user in this table will always be in the same fragment. Even more powerful, is the fact that if the same user-id column is used in your other tables and you designate it as the sharding key for those too then all of the given user’s data from all tables will be in the same fragment and queries/transactions on that user can be handled within a single data node.

Use NoSQL APIs For The Fastest Possible Access To Your Data

MySQL Cluster provides many ways to access the stored data; the most common method is SQL but as can be seen in the figure, there are also many native APIs that allow the application to read and write the data directly from the database without the inefficiency and development complexity of converting to SQL and passing through a MySQL Server. These APIs exist for C++, Java, JPA, JavaScript/Node.js, http and the Memcached protocol.


200 Million Queries Per Second Benchmark

There are two kinds of workloads that MySQL Cluster is designed to handle: – OLTP (On-Line Transaction Processing): Memory-optimized tables provide sub-millisecond low latency and extreme levels of concurrency for OLTP workloads while still providing durability; they can also be used alongside disk-based tables. – Ad-hoc Searches: MySQL Cluster has increased the amount of parallelism that can be used when performing a table scan – providing a significant speed-up when performing searches on un-indexed columns.

Having said that, MySQL Cluster is going to perform at its best with OLTP workloads; in particular when large numbers of queries/transactions are sent in in parallel. To this end, the flexAsynch benchmark has been used to measure how NoSQL performance scales as more data nodes are added to the cluster.


The benchmark was performed with each data node running on a dedicated 56 thread Intel E5-2697 v3 (Haswell) machine. The figure shows how the throughput scaled as the number of data nodes was increased in steps from 2 up to 32 (note that MySQL Cluster currently supports a maximum of 48 data nodes). As you can see, the the scaling is virtually linear and at 32 data nodes, the throughput hits 200 Million NoSQL Queries Per Second.

Note that the latest results and a more complete description of the tests can be found at the MySQL Cluster Benchmark page.

These 200 Million QPS benchmark was run as part of MySQL Cluster 7.4 (currently the latest GA version) – you can find out more of went into that release in this MySQL Cluster 7.4 blog post or this webinar replay.

(Via HighScalability.com)

Designing For Scale – Three Principles And Three Practices From Tapad Engineering

This is a guest post by Toby Matejovsky, Director of Engineering at Tapad (@TapadEng).

Here at Tapad, scaling our technology strategically has been crucial to our immense growth. Over the last four years we’ve scaled our real-time bidding system to handle hundreds of thousands of queries per second. We’ve learned a number of lessons about scalability along that journey.

Here are a few concrete principles and practices we’ve distilled from those experiences:

  • Principle 1: Design for Many
  • Principle 2: Service-Oriented Architecture Beats Monolithic Application
  • Principle 3: Monitor Everything
  • Practice 1: Canary Deployments
  • Practice 2: Distributed Clock
  • Practice 3: Automate To Assist, Not To Control

Principle 1: Design For Many

There are three amounts that matter in software design: none, one, and many. We’ve learned to always design for the “many” case. This makes scaling more of a simple mechanical process, rather than a complicated project requiring re-architecting the entire codebase. The work to get there might not be as easy, but front-loading the effort pays dividends later when the application needs to scale suddenly.

For us– a guiding principle is to always consider the hypothetical ‘10x use case.’ How would our applications respond if they had to suddenly handle 10 times the current traffic? The system is only as good as the sum of its parts. The application layer might scale out easily, but if it fails because of interacting with a single database node then we haven’t achieved true scalability.

Principle 2: Service-Oriented Architecture Beats Monolithic Application

Here at Tapad we leverage a service-based architecture. The main advantages are the ability to allocate resources efficiently, and to make upgrades easier.

Imagine two systems:

  • One requires a lot of compute, not much memory.

  • One requires a lot of memory, not much compute.

If they were combined into a single system, but only the memory-intensive one needed to scale, every additional node would end up overcommitting on compute.

Virtualization solves the problem by making those overcommitted cores available to some other system, but the solution paints a misleading picture. It appears there are N systems available, but it is impossible to run all N at full capacity. If the cluster keeps enough compute available to run all N at full capacity, then money is being wasted – never a good thing.

Principle 3: Monitor Everything

Monitoring is an obvious requirement for any production system. We currently use Zabbix for alerting, and Graphite for tracking metrics over time. A typical Zabbix check looks like:

  • “Is process X running”

  • “Is node N responding to a request within M milliseconds”

  • “Node N’ is using > 80% of its available storage”

We recently switched out our Graphite backend to use Cassandra instead of whisper to better handle the volume of traffic (there are currently about half a million metrics tracked). We aggregate metrics in-memory with a customized version of Twitter’s Ostrich metrics library, and flush them to graphite every 10 seconds.

A example path for a given metric might look like this:


We use Grafana for building real-time dashboards to track key metrics, and display those dashboards on big screens inside our office. When we switch our SOA to a more ephemeral container-based approach (e.g. running containers on Mesos with Marathon) we may have to re-think how these metrics are organized, as the instance-specific names like foo01 will end up looking like foo.43ffbdc8-ef60-11e4-97ce-005056a272f9. Graphite supports wildcards in queries, so something likesumSeries(prd.nj1.foo.*.pipeline.producer.avro_event_bar_count) could work.

Principles lay the groundwork for our decisions, but executing them successfully is equally important. Here are three best practices for working with distributed systems.

Practice 1: Canary Deployments

Some things are very challenging to test rigorously prior to a full-scale production launch. To mitigate risk, we upgrade a single node first and monitor it manually. Assuming it behaves as expected, the rest of the nodes are automatically deployed by Rundeck. Rundeck is a tool that can upgrade systems automatically and in parallel, rolling several instances at a time and moving on to the next set as soon as the upgraded nodes report a healthy status. Monitoring the canary deploy involves more than this single health check, which is why it’s upgraded out-of-band.

Practice 2: Distributed Clock

Because of clock skew and lag, there is no good concept of “now” in a distributed system.

  • Clock skew occurs because clocks are not particularly precise, even with NTP (Network Time Protocol).

  • Lag is a factor when passing messages around. If one server is cut off from the network, buffers messages for a while, then sends them after re-joining, the receiving system will get a batch of messages with relatively old timestamps. A system consuming all messages from these producers cannot be assured it has read 100% of messages up to a given time until it sees that each producer has passed the mark. This assumes that each producer guarantees ordering within its own stream, much like Kafka’s model.

Our solution is to create a sort of distributed clock, where producers record their most recent timestamps as child nodes of a particular Zookeeper “clock” node. The time is resolved by taking the minimum timestamp in that set. We also track lag relative to the resolving node’s system clock.

Practice 3: Automate To Assist, Not To Control

Our devops tools are designed to assist humans, rather than to automatically manage things, as human judgement is often required to respond to a system failure. There is risk in allowing a script to automatically failover a database or spin up new nodes. We have a pager duty rotation with primary, secondary, and tertiary engineers. The engineer can initiate the failover or spin up new nodes based on an alert. This means they are fully aware of the context.

The more well-understood a task is, the more it can be automated. One basic level of automation is using Kickstart/PXE boot. When a new VM starts up, it does a PXE boot and registers itself with Spacewalk, and Puppet handles installation of all required packages. Nothing custom is ever required, which enables us to easily build/rebuild sets of systems just by starting new VMs with particular names.

As we gain better understanding of a given system’s performance we can automate more parts. For example, scaling a shared-nothing system up and down depending on some reasonable change in traffic. For exceptional circumstances, we want a human to make the decision, assisted by all the information at their disposal.

(via HighScalability.com)

Elements Of Scale: Composing And Scaling Data Platforms


This is a guest repost of Ben Stopford‘s epic post on Elements of Scale: Composing and Scaling Data Platforms. A masterful tour through the evolutionary forces that shape how systems adapt to key challenges.

As software engineers we are inevitably affected by the tools we surround ourselves with. Languages, frameworks, even processes all act to shape the software we build.

Likewise databases, which have trodden a very specific path, inevitably affect the way we treat mutability and share state in our applications.

Over the last decade we’ve explored what the world might look like had we taken a different path. Small open source projects try out different ideas. These grow. They are composed with others. The platforms that result utilise suites of tools, with each component often leveraging some fundamental hardware or systemic efficiency. The result, platforms that solve problems too unwieldy or too specific to work within any single tool.

So today’s data platforms range greatly in complexity. From simple caching layers or polyglotic persistence right through to wholly integrated data pipelines. There are many paths. They go to many different places. In some of these places at least, nice things are found.

So the aim for this talk is to explain how and why some of these popular approaches work. We’ll do this by first considering the building blocks from which they are composed. These are the intuitions we’ll need to pull together the bigger stuff later on.


 In a somewhat abstract sense, when we’re dealing with data, we’re really just arranging locality. Locality to the CPU. Locality to the other data we need. Accessing data sequentially is an important component of this. Computers are just good at sequential operations. Sequential operations can be predicted.

If you’re taking data from disk sequentially it’ll be pre-fetched into the disk buffer, the page cache and the different levels of CPU caching. This has a significant effect on performance. But it does little to help the addressing of data at random, be it in main memory, on disk or over the network. In fact pre-fetching actually hinders random workloads as the various caches and frontside bus fill with data which is unlikely to be used.


Whilst disk is somewhat renowned for its slow performance, main memory is often assumed to simply be fast. This is not as ubiquitously true as people often think. There are one to two orders of magnitude between random and sequential main memory workloads. Use a language that manages memory for you and things generally get a whole lot worse.

Streaming data sequentially from disk can actually outperform randomly addressed main memory. So disk may not always be quite the tortoise we think it is, at least not if we can arrange sequential access. SSD’s, particularly those that utilise PCIe, further complicate the picture as they demonstrate different tradeoffs, but the caching benefits of the two access patterns remain, regardless.


So lets imagine, as a simple thought experiment, that we want to create a very simple database. We’ll start with the basics: a file.

We want to keep writes and reads sequential, as it works well with the hardware. We can append writes to the end of the file efficiently. We can read by scanning the the file in its entirety. Any processing we wish to do can happen as the data streams through the CPU. We might filter, aggregate or even do something more complex. The world is our oyster!


So what about data that changes, updates etc?

We have two options. We could update the value in place. We’d need to use fixed width fields for this, but that’s ok for our little thought experiment. But update in place would mean random IO. We know that’s not good for performance.

Alternatively we could just append updates to the end of the file and deal with the superseded values when we read it back.

So we have our first tradeoff. Append to a ‘journal’ or ‘log’, and reap the benefits of sequential access. Alternatively if we use update in place we’ll be back to 300 or so writes per second, assuming we actually flush through to the underlying media.


Now in practice of course reading the file, in its entirety, can be pretty slow. We’ll only need to get into GB’s of data and the fastest disks will take seconds. This is what a database does when it ends up table scanning.

Also we often want something more specific, say customers named “bob”, so scanning the whole file would be overkill. We need an index.


Now there are lots of different types of indexes we could use. The simplest would be an ordered array of fixed-width values, in this case customer names, held with the corresponding offsets in the heap file. The ordered array could be searched with binary search. We could also of course use some form of tree, bitmap index, hash index, term index etc. Here we’re picturing a tree.

The thing with indexes like this is that they impose an overarching structure. The values are deliberately ordered so we can access them quickly when we want to do a read. The problem with the overarching structure is that it necessitates random writes as data flows in. So our wonderful, write optimised, append only file must be augmented by writes that scatter-gun the filesystem. This is going to slow us down.


Anyone who has put lots of indexes on a database table will be familiar with this problem. If we are using a regular rotating hard drive, we might run 1,000s of times slower if we maintain disk integrity of an index in this way.

Luckily there are a few ways around this problem. Here we are going to discuss three. These represent three extremes, and they are in truth simplifications of the real world, but the concepts are useful when we consider larger compositions.


Our first option is simply to place the index in main memory. This will compartmentalise the problem of random writes to RAM. The heap file stays on disk.

This is a simple and effective solution to our random writes problem. It is also one used by many real databases. MongoDB, Cassandra, Riak and many others use this type of optimisation. Often memory mapped files are used.

However, this strategy breaks down if we have far more data than we have main memory. This is particularly noticeable where there are lots of small objects. Our index would get very large. Thus our storage becomes bounded by the amount of main memory we have available. For many tasks this is fine, but if we have very large quantities of data this can be a burden.

A popular solution is to move away from having a single ‘overarching’ index. Instead we use a collection of smaller ones.


This is a simple idea. We batch up writes in main memory, as they come in. Once we have sufficient – say a few MB’s – we sort them and write them to disk as an individual mini-index. What we end up with is a chronology of small, immutable index files.

So what was the point of doing that? Our set of immutable files can be streamed sequentially. This brings us back to a world of fast writes, without us needing to keep the whole index in memory. Nice!

Of course there is a downside to this approach too. When we read, we have to consult the many small indexes individually. So all we have really done is shift the problem of RandomIO from writes onto reads. However this turns out to be a pretty good tradeoff in many cases. It’s easier to optimise random reads than it is to optimise random writes.

Keeping a small meta-index in memory or using a Bloom Filter provides a low-memory way of evaluating whether individual index files need to be consulted during a read operation. This gives us almost the same read performance as we’d get with a single overarching index whilst retaining fast, sequential writes.

In reality we will need to purge orphaned updates occasionally too, but that can be done with nice sequentially reads and writes.


What we have created is termed a Log Structured Merge Tree. A storage approach used in a lot of big data tools such as HBase, Cassandra, Google’s BigTable and many others. It balances write and read performance with comparatively small memory overhead.


So we can get around the ‘random-write penalty’ by storing our indexes in memory or, alternatively, using a write-optimised index structure like LSM. There is a third approach though. Pure brute force.

Think back to our original example of the file. We could read it in its entirety. This gave us many options in terms of how we go about processing the data within it. The brute force approach is simply to hold data by column rather than by row and stream only the columns required for a query, in their entirety, through the CPU. This approach is termed Columnar or Column Oriented.

(It should be noted that there is an unfortunate nomenclature clash between true column stores and those that follow the Big Table pattern. Whilst they share some similarities, in practice they are quite different. It is wise to consider them as different things.)


Column Orientation is another simple idea. Instead of storing data as a set of rows, appended to a single file, we split each row by column. We then store each column in a separate file.

We keep the order of the files the same, so row N has the same position (offset) in each column file. This is important because we will need to read multiple columns to service a single query, all at the same time. This means ‘joining’ columns on the fly. If the columns are in the same order we can do this in a tight loop which is very cache- and cpu-efficient. Many implementations make heavy use of vectorisation to further optimise throughput for simple join and filter operations.

Writes can leverage the benefit of being append-only. The downside is that we now have many files to update, one for every column in every individual write to the database. The most common solution to this is to batch writes in a similar way to the one used in the LSM approach above. Many columnar databases also impose an overall order to the table as a whole to increase their read performance for one chosen key.


By splitting data by column we significantly reduce the amount of data that needs to be brought from disk, so long as our query operates on a subset of all columns.

In addition to this, data in a single column generally compresses well. We can take advantage of the data type of the column to do this, if we have knowledge of it. This means we can often use efficient, low cost encodings such as run-length, delta, bit-packed etc. For some encodings predicates can be used directly on the uncompressed stream too.

The result is a brute force approach that will work particularly well for operations that require large scans. Aggregate functions like average, max, min, group by etc are typical of this.

This is very different to using the ‘heap file & index’ approach we covered earlier. A good way to understand this is to ask yourself: what is the difference between a columnar approach like this vs a ‘heap & index’ where indexes are added to every field?


The answer to this lies in the ordering of the index files. BTrees etc will be ordered by the fields they index. Joining the data in two indexes involves a streaming operation on one side, but on the other side the index lookups have to read random positions in the second index. This is generally less efficient than joining two indexes (columns) that retain the same ordering. Again we’re leveraging sequential access.


So many of the best technologies which we may want to use as components in a data platform will leverage one of these core efficiencies to excel for a certain set of workloads.

Storing indexes in memory, over a heap file, is favoured by many NoSQL stores such as Riak, Couchbase or MongoDB as well as some relational databases. It’s a simple model that works well.

Tools designed to work with larger data sets tend to take the LSM approach. This gives them fast ingestion as well as good read performance using disk based structures. HBase, Cassandra, RocksDB, LevelDB and even Mongo now support this approach.

Column-per-file engines are used heavily in MPP databases like Redshift or Vertica as well as in the Hadoop stack using Parquet. These are engines for data crunching problems that require large traversals. Aggregation is the home ground for these tools.

Other products like Kafka apply the use of a simple, hardware efficient contract to messaging. Messaging, at it’s simplest, is just appending to a file, or reading from a predefined offset. You read messages from an offset. You go away. You come back. You read from the offset you previously finished at. All nice sequential IO.

This is different to most message oriented middleware. Specifications like JMS and AMQP require the addition of indexes like the ones discussed above, to manage selectors and session information. This means they often end up performing more like a database than a file. Jim Gray made this point famously back in his 1995 publication Queue’s are Databases.

So all these approaches favour one tradeoff or other, often keeping things simple, and hardware sympathetic, as a means of scaling.


So we’ve covered some of the core approaches to storage engines. In truth we made some simplifications. The real world is a little more complex. But the concepts are useful nonetheless.

Scaling a data platform is more than just storage engines though. We need to consider parallelism.


When distributing data over many machines we have two core primitives to play with: partitioning and replication. Partitioning, sometimes called sharding, works well both for random access and brute force workloads.

If a hash-based partitioning model is used the data will be spread across a number of machines using a well-known hash function. This is similar to the way a hash table works, with each bucket being held on a different machine.

The result is that any value can be read by going directly to the machine that contains the data, via the hash function. This pattern is wonderfully scalable and is the only pattern that shows linear scalability as the number of client requests increases. Requests are isolated to a single machine. Each one will be served by just a single machine in the cluster.


We can also use partitioning to provide parallelism over batch computations, for example aggregate functions or more complex algorithms such as those we might use for clustering or machine learning. The key difference is that we exercise all machines at the same time, in a broadcast manner. This allows us to solve a large computational problem in a much shorter time, using a divide and conquer approach.

Batch systems work well for large problems, but provide little concurrency as they tend to exhaust the resources on the cluster when they execute.


So the two extremes are pretty simple: directed access at one end, broadcast, divide and conquer at the other. Where we need to be careful is in the middle ground that lies between the two. A good example of this is the use of secondary indexes in NoSQL stores that span many machines.

A secondary index is an index that isn’t on the primary key. This means the data will not be partitioned by the values in the index. Directed routing via a hash function is no longer an option. We have to broadcast requests to all machines. This limits concurrency. Every node must be involved in every query.

For this reason many key value stores have resisted the temptation to add secondary indexes, despite their obvious use. HBase and Voldemort are examples of this. But many others do expose them, MongoDB, Cassandra, Riak etc. This is good as secondary indexes are useful. But it’s important to understand the effect they will have on the overall concurrency of the system.


The route out of this concurrency bottleneck is replication. You’ll probably be familiar with replication either from using async slave databases or from replicated NoSQL stores like Mongo or Cassandra.

In practice replicas can be invisible (used only for recovery), read only (adding read concurrency) or read write (adding partition tolerance). Which of these you choose will trade off against the consistency of the system. This is simply the application of CAP theorem (although cap theorem also may not be as simple as you think).


This tradeoff with consistency* brings us to an important question. When does consistency matter?

Consistency is expensive. In the database world ACID is guaranteed by serialisability. This is essentially ensuring that all operations appear to occur in sequential order. It turns out to be a pretty expensive thing. In fact it’s prohibitive enough that many databases don’t offer it as an isolation level at all. Those that do never set it as the default.

Suffice to say that if you apply strong consistency to a system that does distributed writes you’ll likely end up in tortoise territory.

(* note the term consistency has two common usages. The C in ACID and the C in CAP. They are unfortunately not the same. I’m using the CAP definition: all nodes see the same data at the same time)


The solution to this consistency problem is simple. Avoid it. If you can’t avoid it isolate it to as few writers and as few machines as possible.

Avoiding consistency issues is often quite easy, particularly if your data is an immutable stream of facts. A set of web logs is a good example. They have no consistency concerns as they are just facts that never change.

There are other use cases which do necessitate consistency though. Transferring money between accounts is an oft used example. Non-commutative actions such as applying discount codes is another.

But often things that appear to need consistency, in a traditional sense, may not. For example if an action can be changed from a mutation to a new set of associated facts we can avoid mutable state. Consider marking a transaction as being potentially fraudulent. We could update it directly with the new field. Alternatively we could simply use a separate stream of facts that links back to the original transaction.


So in a data platform it’s useful to either remove the consistency requirement altogether, or at least isolate it. One way to isolate is to use the single writer principal, this gets you some of the way. Datomic is a good example of this. Another is to physically isolate the consistency requirement by splitting mutable and immutable worlds.

Approaches like Bloom/CALM extend this idea further by embracing the concept of disorder by default, imposing order only when necessary.

So those were some of the fundamental tradeoffs we need to consider. Now how to we pull these things together to build a data platform?


A typical application architecture might look something like the below. We have a set of processes which write data to a database and read it back again. This is fine for many simple workloads. Many successful applications have been built with this pattern. But we know it works less well as throughput grows. In the application space this is a problem we might tackle with message-passing, actors, load balancing etc.

The other problem is this approach treats the database as a black box. Databases are clever software. They provide a huge wealth of features. But they provide little mechanism for scaling out of an ACID world. This is a good thing in many ways. We default to safety. But it can become an annoyance when scaling is inhibited by general guarantees which may be overkill for the requirements we have.


The simplest route out of this is CQRS (Command Query Responsibility Segregation).

Another very simple idea. We separate read and write workloads. Writes go into something write-optimised. Something closer to a simple journal file. Reads come from something read-optimised. There are many ways to do this, be it tools like Goldengate for relational technologies or products that integrate replication internally such as Replica Sets in MongoDB.


Many databases do something like this under the hood. Druid is a nice example. Druid is an open source, distributed, time-series, columnar analytics engine. Columnar storage works best if we input data in large blocks, as the data must be spread across many files. To get good write performance Druid stores recent data in a write optimised store. This is gradually ported over to the read optimised store over time.

When Druid is queried the query routes to both the write optimised and read optimised components. The results are combined (‘reduced’) and returned to the user. Druid uses time, marked on each record, to determine ordering.

Composite approaches like this provide the benefits of CQRS behind a single abstraction.


Another similar approach is to use an Operational/Analytic Bridge. Read- and write-optimised views are separated using an event stream. The stream of state is retained indefinitely, so that the async views can be recomposed and augmented at a later date by replaying.

So the front section provides for synchronous reads and writes. This can be as simple as immediately reading data that was written or as complex as supporting ACID transactions.

The back end leverages asynchronicity, and the advantages of immutable state, to scale offline processing through replication, denormalisation or even completely different storage engines. The messaging-bridge, along with joining the two, allows applications to listen to the data flowing through the platform.

As a pattern this is well suited to mid-sized deployments where there is at least a partial, unavoidable requirement for a mutable view.


If we are designing for an immutable world, it’s easier to embrace larger data sets and more complex analytics. The batch pipeline, one almost ubiquitously implemented with the Hadoop stack, is typical of this.

The beauty of the Hadoop stack comes from it’s plethora of tools. Whether you want fast read-write access, cheap storage, batch processing, high throughput messaging or tools for extracting, processing and analysing data, the Hadoop ecosystem has it all.

The batch pipeline architecture pulls data from pretty much any source, push or pull. Ingests it into HDFS then processes it to provide increasingly optimised versions of the original data. Data might be enriched, cleansed, denormalised, aggregated, moved to a read optimised format such as Parquet or loaded into a serving layer or data mart. Data can be queried and processed throughout this process.

This architecture works well for immutable data, ingested and processed in large volume. Think 100’s of TBs plus. The evolution of this architecture will be slow though. Straight-through timings are often measured in hours.


The problem with the Batch Pipeline is that we often don’t want to wait hours to get a result. A common solution is to add a streaming layer aside it. This is sometimes referred to as theLambda Architecture.

The Lambda Architecture retains a batch pipeline, like the one above, but it circumvents it with a fast streaming layer. It’s a bit like building a bypass around a busy town. The streaming layer typically uses a streaming processing tool such as Storm or Samza.

The key insight of the Lambda Architecture is that we’re often happy to have an approximate answer quickly, but we would like an accurate answer in the end.

So the streaming layer bypasses the batch layer providing the best answers it can within a streaming window. These are written to a serving layer. Later the batch pipeline computes an accurate data and overwrites the approximation.

This is a clever way to balance accuracy with responsiveness. Some implementations of this pattern suffer if the two branches end up being dual coded in stream and batch layers. But it is often possible to simply abstract this logic into common libraries that can be reused, particularly as much of this processing is often written in external libraries such as Python or R anyway. Alternatively systems like Spark provide both stream and batch functionality in one system (although the streams in Spark are really micro-batches).

So this pattern again suits high volume data platforms, say in the 100TB+ range, that want to combine streams with existing, rich, batch based analytic function.


There is another approach to this problem of slow data pipelines. It’s sometimes termed theKappa architecture. I actually thought this name was ‘tongue in cheek’ but I’m now not so sure. Whichever it is, I’m going to use the term Stream Data Platform, which is a term in use also.

Stream Data Platform’s flip the batch pattern on its head. Rather than storing data in HDFS, and refining it with incremental batch jobs, the data is stored in a scale out messaging system, or log, such as Kafka. This becomes the system of record and the stream of data is processed in real time to create a set of tertiary views, indexes, serving layers or data marts.

This is broadly similar to the streaming layer of the Lambda architecture but with the batch layer removed. Obviously the requirement for this is that the messaging layer can store and vend very large volumes of data and there is a sufficiently powerful stream processor to handle the processing.

There is no free lunch so, for hard problems, Stream Data Platform’s will likely run no faster than an equivalent batch system, but switching the default approach from ‘store and process’ to ‘stream and process’ can provide greater opportunity for faster results.


Finally, the Stream Data Platform approach can be applied to the problem of ‘application integration’. This is a thorny and difficult problem that has seen focus from big vendors such as Informatica, Tibco and Oracle for many years. For the most part results have been beneficial, but not transformative. Application integration remains a topic looking for a real workable solution.

Stream Data Platform’s provide an interesting potential solution to this problem. They take many of the benefits of an O/A bridge – the variety of asynchronous storage formats and ability to recreate views – but leave the consistency requirement isolated in, often existing sources:


With the system of record being a log it’s easy to enforce immutability. Products like Kafka can retain enough volume internally to be used as a historic record. This means recovery can be a process of replaying and regenerating state, rather than constantly checkpointing.

Similarly styled approaches have been taken before in a number of large institutions with tools such as Goldengate, porting data to enterprise data warehouses or more recently data lakes. They were often thwarted by a lack of throughput in the replication layer and the complexity of managing changing schemas. It seems unlikely the first problem will continue. As for the later problem though, the jury is still out.


So we started with locality. With sequential addressing for both reads and writes. This dominates the tradeoffs inside the components we use. We looked at scaling these components out, leveraging primitives for both sharding and replication. Finally we rebranded consistency as a problem we should isolate in the platforms we build.

But data platforms themselves are really about balancing the sweet-spots of these individual components within a single, holistic form. Incrementally restructuring. Migrating the write-optimised to the read-optimised. Moving from the constraints of consistency to the open plains of streamed, asynchronous, immutable state.

This must be done with a few things in mind. Schemas are one. Time, the peril of the distributed, asynchronous world, is another. But these problems are manageable if carefully addressed. Certainly the future is likely to include more of these things, particularly as tooling, innovated in the big data space, percolates into platforms that address broader problems, both old and new.

(via HighScalability.com)

How And Why Swiftype Moved From EC2 To Real Hardware


This is a guest post by Oleksiy Kovyrin, Head of Technical Operations at Swiftype. Swiftype currently powers search on over 100,000 websites and serves more than 1 billion queries every month.

When Matt and Quin founded Swiftype in 2012, they chose to build the company’s infrastructure using Amazon Web Services. The cloud seemed like the best fit because it was easy to add new servers without managing hardware and there were no upfront costs.

Unfortunately, while some of the services (like Route53 and S3) ended up being really useful and incredibly stable for us, the decision to use EC2 created several major problems that plagued the team during our first year.

Swiftype’s customers demand exceptional performance and always-on availability and our ability to provide that is heavily dependent on how stable and reliable our basic infrastructure is. With Amazon we experienced networking issues, hanging VM instances, unpredictable performance degradation (probably due to noisy neighbors sharing our hardware, but there was no way to know) and numerous other problems. No matter what problems we experienced, Amazon always had the same solution: pay Amazon more money by purchasing redundant or higher-end services.

The more time we spent working around the problems with EC2, the less time we could spend developing new features for our customers. We knew it was possible to make our infrastructure work in the cloud, but the effort, time and resources it would take to do so was much greater than migrating away.

After a year of fighting the cloud, we made a decision to leave EC2 for real hardware. Fortunately, this no longer means buying your own servers and racking them up in a colo. Managed hosting providers facilitate a good balance of physical hardware, virtualized instances, and rapid provisioning. Given our previous experience with hosting providers, we made the decision to choose SoftLayer. Their excellent service and infrastructure quality, provisioning speed, and customer support made them the best choice for us.

After more than a month of hard work preparing the inter-data center migration, we were able to execute the transition with zero downtime and no negative impact on our customers.The migration to real hardware resulted in enormous improvements in service stability from day one, provided a huge (~2x) performance boost to all key infrastructure components, and reduced our monthly hosting bill by ~50%.

This article will explain how we planned for and implemented the migration process, detail the performance improvements we saw after the transition, and offer insight for younger companies about when it might make sense to do the same.

Preparing For The Switch

Before the migration, we had around 40 instances on Amazon EC2. We would experience a serious production issue (instance outage, networking issue, etc) at least 2-3 times a week, sometimes daily. Once we decided to move to real hardware, we knew we had our work cut out for us because we needed to switch data centers without bringing down the service. The preparation process involved two major steps, each of which has a dedicated explanation in their own sections below:

  1. Connecting EC2 and SoftLayer. First, we built a skeleton of our new infrastructure (the smallest subset of servers to be able to run all key production services with development-level load) in SoftLayer’s data center. Once the new data center was set up, we built a system of VPN tunnels between our old and our new data centers to ensure transparent network connectivity between components in both data centers.

  2. Architectural changes to our applications. Next, we needed to make changes to our applications to make them work both in the cloud and on our new infrastructure. Once the application could live in both data centers simultaneously, we built a data-replication pipeline to make sure both the cloud infrastructure and the SoftLayer deployment (databases, search indexes, etc) were always in-sync.

Step 1: Connecting EC2 And Softlayer

One of the first things we had to do to prepare for our migration was figure out how to connect our EC2 and our SoftLayer networks together. Unfortunately the “proper” way of connecting a set of EC2 servers to another private network – using the Virtual Private Cloud (VPC) feature of EC2 – was not an option for us since we could not convert our existing set of instances into a VPC without downtime. After some consideration and careful planning, we realized that the only servers that really needed to be able to connect to each other across the data center boundary were our MongoDB nodes. Everything else we could make data center-local (Redis clusters, search servers, application clusters, etc).


Since the number of instances we needed to interconnect was relatively small, we implemented a very simple solution that proved to be stable and effective for our needs:

  • Each data center had a dedicated OpenVPN server deployed in it that NAT’ed all client traffic to its private network address.

  • Each node that needed to be able to connect to another data center would set up a VPN channel there and set up local routing to properly forward all connections directed at the other DC into that tunnel.

Here are some features that made this configuration very convenient for us:

  • Since we did not control network infrastructure on either side, we could not really force all servers on either end to funnel their traffic through a central router connected to the other DC. In our solution, each VPN server decided (with the help of some automation) which traffic to route through the tunnel to ensure complete inter-DC connectivity for all of its clients.

  • Even if a VPN tunnel collapsed (surprisingly, this only happened a few times during the weeks of the project), it would only mean one server lost its outgoing connectivity to the other DC (one node dropped out of MongoDB cluster, some worker server would lose connectivity to the central Resque box, etc). None of those one-off connectivity losses would affect our infrastructure since all important infrastructure components had redundant servers on both sides.

Step 2: Architectural Changes To Our Applications

There were many small changes we had to make in our infrastructure in the weeks of preparation for the migration, but having deep understanding of each and every component of it helped us make appropriate decisions reducing a chance of a disaster during the transitional period. I would argue that infrastructure of almost any complexity could be migrated with enough time and engineering resources to carefully consider each and every network connection established between applications and backend services.


Here are the main steps we had to take to ensure smooth and transparent migration:

  • All stateless services (caches, application clusters, web layer) were independently deployed on each side.

  • For each stateful backend service (database, search cluster, async queues, etc) we had to consider if we wanted (or could afford to) replicate the data to the other side or if we had to incur inter-data center latency for all connections. Relying on the VPN was always considered the last resort option and eventually we were able to reduce the amount of traffic between data centers to a few small streams of replication (mostly MongoDB) and connections to primary/main copies of services that could not be replicated.

  • If a service could be replicated, we would do that and then make application servers always use or prefer the local copy of the service instead of going to the other side.

  • For services that we could not replicate with their internal replication capabilities (like our search backends) we made the changes in our application to implement replication between data centers where asynchronous workers on each side would pull the data from their respective queues and we would always write all asynchronous jobs into queues for both data centers.

Step 3: Flipping The Switch

When both sides were ready to serve 100% of our traffic, we prepared for the final switchover by reducing our DNS TTL down to a few seconds to ensure fast traffic change.

Finally, we switched traffic to the new data center. Requests switched to the new infrastructure with zero impact on our customers. Once traffic to EC2 had drained, we disabled the old data center and forwarded all remaining connections from the old infrastructure to the new one. DNS updates take time, so some residual traffic was visible on our old servers for at least a week after the cut-off time.

A Clear Improvement: Results After Moving From EC2 To Real Hardware

Stability improved. We went from 2-3 serious outages a week (most of these were not customer-visible, since we did our best to make the system resilient to failures, but many outages would wake someone up or force someone to abandon family time) down to 1-2 outages a month, which we were able to handle more thoroughly by spending engineering resources on increasing system resilience to failures and reducing a chance of them making any impact on our customer-visible availability.

Performance improved. Thanks to the modern hardware available from SoftLayer we have seen a consistent performance increase for all of our backend services (especially IO-bound ones like databases and search clusters, but for CPU-bound app servers as well) and, what is more important, the performance was much more predictable: no sudden dips or spikes unrelated to our own software’s activity. This allowed us to start working on real capacity planning instead of throwing more slow instances at all performance problems.

Costs decreased. Last, but certainly not least for a young startup, the monthly cost of our infrastructure dropped by at least 50%, which allowed us to over-provision some of the services to improve performance and stability even further, greatly benefiting our customers.

Provisioning flexibility improved, but provisioning time increased. We are now able to exactly specify servers to meet their workload (lots of disk doesn’t mean we need a powerful CPU). However, we can no longer start new servers in minutes with an API call. SoftLayer generally can add a new server to our fleet within 1-2 hours. This is a big trade-off for some companies, but it was one that works well for Swiftype.


Since switching to real hardware, we’ve grown considerably – our data and query volume is up 20x – but our API performance is better than ever. Knowing exactly how our servers will perform lets us plan for growth in a way we couldn’t before.

In our experience, the cloud may be a good idea when you need to rapidly spin up new hardware, but it only works well when you’re making a huge (Netflix-level) effort to survive in it. If your goal is to build a business from day one and you do not have spare engineering resources to spend on paying the “cloud tax”, using real hardware may be a much better idea.
(via HighScalability.com)