WhatsApp Partners With Open WhisperSystems To End-To-End Encrypt Billions Of Messages A Day

Good news for people who love privacy and security! Bad news for black-hat hackers and government surveillance agencies. WhatsApp, the wildly popular messaging app, has partnered with the crypto gurus at Open WhisperSystems to implement strong end-to-end encryption on all WhatsApp text messages–meaning not even Zuck himself will be able to pry into your conversations, even if a court order demands it.

WhatsApp, of course, has hundreds of millions of daily users worldwide, and was purchased by Facebook earlier this year for an eye-popping $19 billion. Open WhisperSystems creates state-of-the-art end-to-end encryption systems such as Signal, for voice, and TextSecure, for text. (They have nothing to do with the messaging app Whisper.) As per the EFF’s Secure Messaging Scorecard, TextSecure is a huge upgrade from WhatsApp’s previous encryption.

And I do mean huge. Right now the TextSecure protocol has only rolled out to WhatsApp for Android–other clients, and group/media messaging, are coming soon–but already “billions of encrypted messages are being exchanged daily … we believe this already represents the largest deployment of end-to-end encrypted communication in history,” to quote OWS founder Moxie Marlinspike.

This comes in the wake of Apple, similarly, encrypting iOS 8 devices such that Apple cannot retrieve data stored on them (albeit in a closed-source, unverifiable way.) FBI director James Comey subsequently claimed “the post-Snowden pendulum” has “gone too far” amid concerns that the world is “going dark” to wiretaps and surveillance. These and other three-letter agencies–along with black-hat hackers and governments worldwide, thanks to WhatsApp’s immense global reach–won’t be happy about their new inability to pry into the contents of WhatsApp text messages.

However, Comey’s claims appear to suffer from the slight disadvantage of being completely false:

and I think the tech industry’s response to the FBI and NSA’s dismay at the widespread use of end-to-end encryption by more and more companies, and the notion that the tech industry is “picking a fight” with them, can be summed up as:

And it’s fair to say that, in a world where surveillance technology seems to grow more powerful and pervasive every week, any meaningful blow that can be struck for privacy and anonymity is a welcome rebalancing. Kudos to WhatsApp for making this happen–Marlinspike, who approached them with the idea, stresses how impressed he’s been by their eagerness, dedication, and thoroughness–and to Open WhisperSystems for making it possible.

Marlinspike says OWS’s own goal is to keep producing strong, open-source privacy and security tools that companies (and individuals) can easily incorporate into their own services, without having to do their own crypto research and/or protocol design. They’ll keep working on Signal and TextSecure as reference implementations, using them to push the envelope and prove new ideas; and in the meantime, in one fell swoop, their TextSecure protocol suddenly has hundreds of millions of daily users, more than all but a tiny handful of companies.

Logging for Unikernels

Unikernels are the next step in virtualized computing. There is a lot of hubbub right now in the tech-o-sphere about unikernels. In fact, in many circles unikernels are thought to be the Next Big Thing. There is a lot of recent news to justify this perspective. Docker, a key player in Linux Containers, has acquired Unikernel Systems. MirageOS, the library that you use to create unikernels, has solid backing from Xen and the Linux Foundation as an incubator project.

Unikernels are not going anywhere soon. Having a clear understanding of unikernel technology is important to anybody working in large scale, distributed computing. Thus, I share. In this article I am going to explain the basic concepts behind unikernels and describe how they sit in the landscape of virtualized computing. I am also going to talk about an approach to logging from within unikernels that I learned from industry expert Adam Wick, Ph.D.

So let’s begin.

The Path From Virtual Machines to Unikernels.

A unikernel is a unit of binary code that runs directly on a hypervisor, very much in the same way that a virtual machine (VM) runs on a hypervisor. However, there is a difference. When you create a virtual machine, you are for all intents and purposes creating an abstract computer. As with any computer, your virtual machine will have RAM, disk storage, CPUs and network I/O. (Please see Figure 1, below.)


When you create a virtual machine, the resources you allocate from the host computer are static. For example, as shown above, when you create a VM with 16GB of RAM, a 4 core CPU and 1 TB of storage, that configuration is set. The VM owns the resources and will not give them back. Also, the operating system on a VM can be different than the OS on the host. For example, if your host computer is using Ubuntu, creating a VM that uses Windows is no problem at all.

The important thing to remember is that a VM is a full blown computer with dedicated resources. However, there is a drawback. At runtime, should your 4 core, 16 GB VM use only 1 CPU, 1 GB of RAM, the other resources will sit around unused. Remember, there’s no giving back. Your VM owns the resources whether it uses them or not. Thus, there is the possibility that you can be paying for computing that is never used. Inefficient use of resources is a very real hazard on the Virtual Machine landscape.

Enter Unikernels

While full blown VMs may be useful for some work, there are situations where you don’t need all the overhead that a VM requires. Let’s say you publish a web service that makes a recommendation of stock to buy given hourly activity on the Internet. What do you need? You can get by with threading capability, a web client, a web server and some code that provides analysis intelligence. That’s it. This is where a unikernel comes in.

As mentioned above, a unikernel is a unit of binary code that runs directly on a hypervisor. Think of a unikernel as an application that has been pulled into the operating system’s kernel. However, a unikernel is a special type of application. It’s compiled code that contains not only application code, but also the parts of the operating system that are essential to its purpose. A unikernel thinks it is the only show in town. And because it is compiled, you don’t have to go through a whole lot of installation hocus pocus. It’s a single file. (Please see Figure 2, below.)


Unikernels are very small. For example, a DNS server weighs in at around 449 KB; a web server at ~674 KB. A unikernel contains only the pieces of the operating system required, no more, no less. There are no unnecessary drivers, utilities or graphical rendering components.

Unikernels load into memory very fast. Start times can be measured in milliseconds. Thus, they are well suited to provide transient microservice functionality. A transient microservice is a microservice that is loaded and unloaded into memory to meet an immediate need. The service is not meant to have a long lifespan. It’s loaded, work gets done and then the service is terminated.

Given the small size of a unikernel and the fast loading speed, you can deploy thousands of them on a single machine, bringing them on and offline as the need requires. And because they are compiled, binary artifacts, they are less susceptible to security breaches, provided the base compilation is clean.

A unikernel can be assigned a unique IP address. A unique IP address means that the unikernel is discoverable on the Internet. The implication is that you can have thousands of unikernels running on a machine, all communicating with one another via standard network protocols such as HTTP.

What about Containers?

A container is a virtualization technology in which components are assembled into a single deployment unit called a container. You do not have to use yum or apt-get to install application dependencies. Everything the container needs exists in the container.

A container is an isolated process. Thus, conceptually a container is like a VM in that it thinks that it’s the only show in town.

A container leverages the operating system of the host computer. Hence, there is no mixing and matching. You cannot have a Windows host computer running a Linux container.

Similar to a unikernel, a container uses only the resources it needs, which makes containers highly efficient.

Unlike a VM and a unikernel which are managed by a hypervisor, containers are managed by a container manager. Docker and Kubernetes are popular container managers.

You can read more about containers here.


As mentioned above, a microservice lives behind an IP address that makes it available to other services. Typically the microservice provides a single piece of functionality that is stateless. Unlike VMs and Containers that can provide their own storage and authentication mechanisms, the architectural practice for unikernels is to have a strong boundary around the unikernel’s area of concern. The best practice is to have a unikernel use other services to meet needs beyond its concern boundary. For example, going back to the stock analysis service we described above, the purpose of that unikernel is to recommend stocks. If we want to make sure that only authorized parties are using the service we’d make it so our Stock Recommender unikernel uses an authorization/authentication service to allow users access to the service. (Please see Figure 3, below.)


Unikernel technology is best suited for immutable microservices. The internals of a unikernel are written in stone. You do not update parts; there are no external parts. When it comes time to change a unikernel, you create a new binary and deploy that binary.

So far, so good? Excellent! Now that you have an understanding of the purpose and nature of unikernels, let’s move on to logging from within a unikernel.

The Importance of Logging

A unikernel is a compiled binary, similar to a low-level compilation unit such as a C++ executable. Therefore, as of this writing, debugging is difficult. In languages that compile to an intermediate form, such as Java and C#, you can decompile the deployment unit (class/jar for Java, dll/exe for C#) to take a look at the code and debug it if necessary. In a unikernel, things are locked up tight. If anything goes wrong, there is no way to peek inside. The only way you have to get a sense of what is going on or what’s gone wrong is to read the logs. So, the question becomes how do you log from within a unikernel?

The answer is, carefully. I know, it’s a smartass answer. But it’s an answer that’s appropriate. Unikernel technology is still emerging. There is a lot of activity around unikernel loggers, but as of this writing no off-the-shelf technology on the order of Log4J, Log4Net or NLog has come forward. However, there are some people working in the space and I had the privilege to talk to one of them, Adam Wick, Ph.D.

Adam is Research Lead at Galois, Inc. The folks at Adam’s company solve big problems for organizations such as DARPA and the Department of Homeland Security. Galois does work that is strategic and complex.

Adam has been working with unikernels for a while and shared some wisdom with me. The first thing that Adam told me is that they stick to the Separation of Concerns mantra that I described above. The unikernels they implement do only one thing, and when it comes to logging they write loggers that do no more than send log entries to a log service. All their loggers do is create log data. Storing and analyzing log data is done by a third party service. (Logentries is one of the logging services that Adam tests his logger against.) (Please see Figure 4, below.)


In terms of creating the logger for the unikernel, Adam acknowledges that at the moment, it’s still pretty much a roll your own affair. Nonetheless, Adam recommends that no matter what logger you create, you will do well to support the syslog protocol. To quote Adam,

“syslog is a tremendously easy protocol to implement. You basically just need to prepare a simple string in the right format and send it to the right place. Not a lot of tricky bits.”

The syslog protocol defines a log entry format that supports a predefined set of fields. The protocol has gone through a revision from RFC 3164 to RFC 5424, and there are some alternative versions out there. So there is a bit of confusion around the implementation. Still, there are usual fields that you can plan to support when implementing log emission. Table 1 below describes these usual fields.

Table 1: The fields of the Syslog protocol

Field | Description | Example
Facility | Refers to the source of the message, such as a hardware device, a protocol, or a module of the system software. | Log Alert, Line Printer subsystem
Severity | The usual severity tags | Debug, Informational, Notice, Warning, Error, Critical, Alert, Emergency
Hostname | The host generating the entry | MyHostName
Timestamp | The timestamp is the local time, in MMM DD HH:MM:SS format |
Message | A free form message that provides additional information relevant to the log entry | “I am starting up now.”
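To make Adam’s point about simplicity concrete, here is a minimal Python sketch that assembles a BSD-style (RFC 3164) entry from the fields in Table 1 and sends it over UDP. The tag `stockrec`, the sample date and the helper names are made up for illustration; the facility and severity numbers are the standard syslog codes. Treat it as a starting point under those assumptions, not as anyone’s production logger.

```python
import socket
from datetime import datetime

# Numeric codes defined by the syslog RFCs.
FACILITY_USER = 1  # "user-level messages"
SEVERITY = {
    "emergency": 0, "alert": 1, "critical": 2, "error": 3,
    "warning": 4, "notice": 5, "informational": 6, "debug": 7,
}
MONTHS = ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
          "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"]


def format_syslog(severity, hostname, tag, message, when, facility=FACILITY_USER):
    """Build one RFC 3164 style line: <PRI>TIMESTAMP HOSTNAME TAG: MESSAGE."""
    pri = facility * 8 + SEVERITY[severity]  # priority combines facility and severity
    # RFC 3164 pads single-digit days with a space, not a zero.
    stamp = f"{MONTHS[when.month - 1]} {when.day:2d} {when:%H:%M:%S}"
    return f"<{pri}>{stamp} {hostname} {tag}: {message}"


def send_syslog(line, host, port=514):
    """Send one entry over UDP; host and port are placeholders for your log service."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(line.encode("utf-8"), (host, port))


# Hypothetical tag and date, chosen only for the example.
line = format_syslog("informational", "MyHostName", "stockrec",
                     "I am starting up now.", datetime(2016, 3, 7, 9, 5, 0))
print(line)  # <14>Mar  7 09:05:00 MyHostName stockrec: I am starting up now.
```

A hosted log service will usually expect its own endpoint, token or TLS transport, so check the service’s documentation before pointing `send_syslog` at it.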

Once you get logging implemented in your unikernel, the question becomes what to log. Of course you’ll log typical events that are important to the service your unikernel represents. For example, if your microservice does image manipulation, you’ll want to log when and how the images are processed. If your unikernel is providing data analysis, you’ll want to log the what and when events relevant to the analysis.

However, remember that your unikernel will be one of thousands sitting in a single computer. More is required. To quote Adam Wick again:

“… the problem that we run into is not logging enough information.

I suspect this is true of many systems — in fact, I know I’ve seen at least a few conference talks on it — but understanding complex distributed systems is just a very hard problem. The more information you have to understand why something failed, or why something bogged down, the better. For example, in some of our early work, we didn’t log a very simple message: a “hello, I’m booted and awake!” message. This turned out to be a problem, later, because we were seeing some weird behavior, and it wasn’t clear if it was because of some sort of deep logic problem, or if it was just because our DHCP server was slow.

In hindsight this is probably obvious, but having information like ‘I’m up and my IP configuration is … ‘ and  ‘I’m about to crash.’ have proved to be very useful in a couple different situations. Most high-level programming languages have a mechanism to catch exceptions that occur. Catching and sending them out on a log seems obvious, but we did have to remember to make it a practice.”
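The practice Adam describes can be captured in a small wrapper. The Python sketch below is only an illustration: `run_with_lifecycle_logging`, the `emit` callback and the sample IP configuration are hypothetical names and values, and a real unikernel would do the equivalent in its own implementation language.

```python
def run_with_lifecycle_logging(service, emit, ip_config):
    """Run `service`, emitting a boot entry first and a crash entry on failure."""
    emit(f"I'm up and my IP configuration is {ip_config}")
    try:
        service()
    except Exception as exc:
        # Report the failure before the instance goes away, then re-raise.
        emit(f"I'm about to crash: {type(exc).__name__}: {exc}")
        raise


entries = []


def failing_service():
    # Stand-in for the real work; fails on purpose to show the crash entry.
    raise RuntimeError("DHCP lease never arrived")


try:
    run_with_lifecycle_logging(failing_service, entries.append, "10.0.0.7/24")
except RuntimeError:
    pass

print(entries)
```

In practice `emit` would be whatever function ships an entry to the log service, so the boot and crash messages travel the same path as every other log line.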

Another area that Adam covered is ensuring storage availability of your log data. Remember, your unikernel will not store data, only emit it. Thus, as we’ve mentioned, we need to rely upon a service to store the data for later analysis. Logentries allows you to store your data for days or months so you can quickly analyze it and create alerts on your unikernel events. If you need to store the data for longer, you can use Logentries S3 archiving features to maintain a backup of your log data for years (or indefinitely). And not only does Logentries provide virtually unlimited storage for an unlimited amount of time, but the service is highly available. It’s always there at the IP endpoint. Internally, if a node within Logentries goes down, the system is designed to transfer traffic to another node immediately without any impact to the service consumer. High availability of the logging service is critical. Remember, the logging service is all your unikernel has to store log data.

Given the closed, compiled nature of unikernels, logging within a unikernel is an important practice. In fact, it’s a ripe opportunity for an entrepreneurial developer. It’s not unreasonable to think that as the technology spreads, someone will publish an easy to use unikernel logger. Who knows, that someone might be you.

Putting it all together

Unikernels are going to be with us for a while. They are small and load fast, making them a good choice for implementing transient microservices. And, because they are small, you can load thousands into a machine. Each unikernel has a unique IP address, which means that you can cluster them to a common purpose behind a load balancer. Also, the IP address allows you to use standard communication protocols to have the unikernels talk to each other.

But, for all the activity going on in the space, unikernels are still an emerging technology. The toolset to create unikernels is growing. Players such as Docker and Xen are active in unikernel development.

Given that a unikernel is a compiled binary that is nearly impossible to debug, logging information out of your unikernel implementation becomes critical. However, unikernel loggers are still a roll your own undertaking, for now anyway. Those in the know suggest that when you make a logging client, you emit log data according to the Syslog protocol and use logging services such as Logentries to store log data for later analysis.

As I’ve mentioned over and over, unikernels are going to be with us for a while. Those who understand the details of unikernel technology and have made a few mission critical microservices using unikernels will be greatly in demand in no time at all. Unikernels are indeed the next big thing and the next big opportunity.

(Via Blog.logentries.com)

Design Of A Modern Cache

Caching is a common approach for improving performance, yet most implementations use strictly classical techniques. In this article we will explore the modern methods used by Caffeine, an open-source Java caching library, that yield high hit rates and excellent concurrency. These ideas can be translated to your favorite language and hopefully some readers will be inspired to do just that.

Eviction Policy

A cache’s eviction policy tries to predict which entries are most likely to be used again in the near future, thereby maximizing the hit ratio. The Least Recently Used (LRU) policy is perhaps the most popular due to its simplicity, good runtime performance, and a decent hit rate in common workloads. Its ability to predict the future is limited to the history of the entries residing in the cache, preferring to give the last access the highest priority by guessing that it is the most likely to be reused again soon.

Modern caches extend the usage history to include the recent past and give preference to entries based on recency and frequency. One approach for retaining history is to use a popularity sketch (a compact, probabilistic data structure) to identify the “heavy hitters” in a large stream of events. Take for example CountMin Sketch, which uses a matrix of counters and multiple hash functions. The addition of an entry increments a counter in each row and the frequency is estimated by taking the minimum value observed. This approach lets us trade off between space, efficiency, and the error rate due to collisions by adjusting the matrix’s width and depth.
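As a rough illustration of the idea, here is a small CountMin Sketch in Python. It is a teaching sketch rather than Caffeine’s implementation: the per-row hashing with a salted BLAKE2 digest and the default width and depth are assumptions made for the example.

```python
import hashlib


class CountMinSketch:
    """A matrix of counters with one hash function per row."""

    def __init__(self, width=1024, depth=4):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _index(self, row, key):
        # Derive a different hash per row by salting the digest with the row number.
        digest = hashlib.blake2b(key.encode("utf-8"), digest_size=8,
                                 salt=row.to_bytes(8, "little")).digest()
        return int.from_bytes(digest, "little") % self.width

    def add(self, key):
        # An addition increments one counter in every row.
        for row in range(self.depth):
            self.table[row][self._index(row, key)] += 1

    def estimate(self, key):
        # Collisions can only inflate a counter, so the minimum is the best estimate.
        return min(self.table[row][self._index(row, key)]
                   for row in range(self.depth))


sketch = CountMinSketch()
for _ in range(5):
    sketch.add("hot")
sketch.add("cold")
print(sketch.estimate("hot"), sketch.estimate("cold"))
```

The estimate never falls below the true count; a wider matrix lowers the chance of collisions, and a deeper one lowers the chance that every row collides at once.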


Window TinyLFU (W-TinyLFU) uses the sketch as a filter, admitting a new entry if it has a higher frequency than the entry that would have to be evicted to make room for it. Instead of filtering immediately, an admission window gives an entry a chance to build up its popularity. This avoids consecutive misses, especially in cases like sparse bursts where an entry may not be deemed suitable for long-term retention. To keep the history fresh an aging process is performed periodically or incrementally to halve all of the counters.
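The admission test and the aging step can be sketched in a few lines of Python. To keep the example short, a plain `Counter` stands in for the compact sketch, the admission window is left out, and `sample_size` is an assumed knob for how often aging runs; none of these names come from Caffeine.

```python
from collections import Counter


class FrequencyFilter:
    """Admission filter with periodic aging; a Counter stands in for the sketch."""

    def __init__(self, sample_size):
        self.sample_size = sample_size  # events between aging passes (assumed knob)
        self.counts = Counter()
        self.events = 0

    def record(self, key):
        self.counts[key] += 1
        self.events += 1
        if self.events >= self.sample_size:
            self._age()

    def _age(self):
        # Halve every counter so that old popularity fades over time.
        self.counts = Counter({k: c // 2 for k, c in self.counts.items() if c // 2 > 0})
        self.events //= 2

    def admit(self, candidate, victim):
        # Admit only if the newcomer is more popular than the entry it would replace.
        return self.counts[candidate] > self.counts[victim]


freq = FrequencyFilter(sample_size=10)
for _ in range(4):
    freq.record("hot")
freq.record("new")
rejected = not freq.admit("new", "hot")  # one access does not beat four

for _ in range(5):
    freq.record("x")  # the tenth event triggers an aging pass
print(rejected, dict(freq.counts))
```

After the aging pass the counter for "hot" has dropped from 4 to 2, so an entry that stops being accessed gradually loses its advantage over newcomers.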



W-TinyLFU uses the Segmented LRU (SLRU) policy for long term retention. An entry starts in the probationary segment and on a subsequent access it is promoted to the protected segment (capped at 80% capacity). When the protected segment is full it evicts into the probationary segment, which may trigger a probationary entry to be discarded. This ensures that entries with a small reuse interval (the hottest) are retained and those that are less often reused (the coldest) become eligible for eviction.
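A simplified Segmented LRU might look like the Python sketch below. It follows the description above (promotion on a second access, a protected segment capped at 80%, demotion back into probation) but is not taken from Caffeine’s source; the class and attribute names are invented for the example.

```python
from collections import OrderedDict


class SegmentedLRU:
    def __init__(self, capacity, protected_ratio=0.8):
        self.capacity = capacity
        self.protected_cap = int(capacity * protected_ratio)
        self.probation = OrderedDict()  # oldest entry first
        self.protected = OrderedDict()  # oldest entry first

    def get(self, key):
        if key in self.protected:
            self.protected.move_to_end(key)
            return self.protected[key]
        if key in self.probation:
            value = self.probation.pop(key)
            self.protected[key] = value  # promote on a subsequent access
            if len(self.protected) > self.protected_cap:
                old_key, old_value = self.protected.popitem(last=False)
                self.probation[old_key] = old_value  # demote instead of discarding
            return value
        return None

    def put(self, key, value):
        if key in self.protected:
            self.protected[key] = value
            self.protected.move_to_end(key)
            return
        self.probation[key] = value
        self.probation.move_to_end(key)
        while len(self.probation) + len(self.protected) > self.capacity:
            self.probation.popitem(last=False)  # discard the coldest probationary entry


cache = SegmentedLRU(capacity=5)
for name in ["a", "b", "c", "d", "e"]:
    cache.put(name, name.upper())
cache.get("a")       # second access: "a" moves to the protected segment
cache.put("f", "F")  # over capacity: the oldest probationary entry ("b") is discarded
print(list(cache.protected), list(cache.probation))
```

Because "a" was touched twice it survives the insertion of "f", while "b", which was written once and never read, is the one that gets discarded.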



As the database and search traces show, there is a lot of opportunity to improve upon LRU by taking into account recency and frequency. More advanced policies such as ARC, LIRS, and W-TinyLFU narrow the gap to provide a near optimal hit rate. For additional workloads see the research papers and try our simulator if you have your own traces to experiment with.

Expiration Policy

Expiration is often implemented as variable per entry and expired entries are evicted lazily due to a capacity constraint. This pollutes the cache with dead items, so sometimes a scavenger thread is used to periodically sweep the cache and reclaim free space. This strategy tends to work better than ordering entries by their expiration time on an O(lg n) priority queue, because it hides the cost from the user instead of incurring a penalty on every read or write operation.

Caffeine takes a different approach by observing that most often a fixed duration is preferred. This constraint allows for organizing entries on O(1) time ordered queues. A time to live duration is a write order queue and a time to idle duration is an access order queue. The cache can reuse the eviction policy’s queues and the concurrency mechanism described below, so that expired entries are discarded during the cache’s maintenance phase.
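Here is a minimal Python sketch of the fixed-duration idea for the time to live case. The `ExpiringCache` class and the injectable `clock` are assumptions made so the example can be run deterministically; an `OrderedDict` plays the role of the write order queue.

```python
from collections import OrderedDict


class ExpiringCache:
    """Time to live cache in which every entry shares one fixed duration."""

    def __init__(self, ttl_seconds, clock):
        self.ttl = ttl_seconds
        self.clock = clock             # callable returning the current time
        self.entries = OrderedDict()   # key -> (value, write_time), oldest write first

    def put(self, key, value):
        self.entries.pop(key, None)    # re-inserting moves the key to the tail
        self.entries[key] = (value, self.clock())

    def get(self, key):
        self.expire()
        item = self.entries.get(key)
        return item[0] if item else None

    def expire(self):
        # With a fixed duration the head is always the next entry to expire,
        # so the sweep stops at the first live entry.
        now = self.clock()
        while self.entries:
            key, (_, written) = next(iter(self.entries.items()))
            if now - written < self.ttl:
                break
            del self.entries[key]


now = [0]  # fake clock so the example does not need to sleep
cache = ExpiringCache(ttl_seconds=10, clock=lambda: now[0])
cache.put("a", 1)
now[0] = 5
cache.put("b", 2)
now[0] = 10
print(cache.get("a"), cache.get("b"))  # None 2
```

A time to idle variant would also move an entry to the tail and refresh its timestamp on every read, which turns the same structure into an access order queue.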


Concurrency

Concurrent access to a cache is viewed as a difficult problem because in most policies every access is a write to some shared state. The traditional solution is to guard the cache with a single lock. This might then be improved through lock striping by splitting the cache into many smaller independent regions. Unfortunately that tends to have a limited benefit due to hot entries causing some locks to be more contended than others. When contention becomes a bottleneck the next classic step has been to update only per entry metadata and use either a random sampling or a FIFO-based eviction policy. Those techniques can have great read performance, poor write performance, and difficulty in choosing a good victim.

An alternative is to borrow an idea from database theory where writes are scaled by using a commit log. Instead of mutating the data structures immediately, the updates are written to a log and replayed in asynchronous batches. This same idea can be applied to a cache by performing the hash table operation, recording the operation to a buffer, and scheduling the replay activity against the policy when deemed necessary. The policy is still guarded by a lock, or a try lock to be more precise, but shifts contention onto appending to the log buffers instead.
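The shape of this approach can be shown with a toy Python cache. Everything here is illustrative: a plain dict stands in for the concurrent hash table, a `deque` is the log buffer, an `OrderedDict` is a simple LRU policy, and `drain_threshold` is an assumed trigger for when a replay is worthwhile.

```python
import threading
from collections import OrderedDict, deque


class BufferedLRU:
    def __init__(self, drain_threshold=16):
        self.data = {}                       # stand-in for a concurrent hash table
        self.policy = OrderedDict()          # LRU order, guarded by policy_lock
        self.policy_lock = threading.Lock()
        self.log = deque()                   # append and popleft are thread-safe in CPython
        self.drain_threshold = drain_threshold

    def get(self, key):
        value = self.data.get(key)
        if value is not None:
            self.log.append(key)             # record the access instead of reordering now
            if len(self.log) >= self.drain_threshold:
                self.try_drain()
        return value

    def put(self, key, value):
        self.data[key] = value
        self.log.append(key)
        self.try_drain()

    def try_drain(self):
        # Try lock: if another thread is already replaying, skip instead of waiting.
        if not self.policy_lock.acquire(blocking=False):
            return
        try:
            while self.log:
                key = self.log.popleft()
                self.policy[key] = True
                self.policy.move_to_end(key)
        finally:
            self.policy_lock.release()


cache = BufferedLRU(drain_threshold=4)
cache.put("a", 1)
cache.put("b", 2)
for _ in range(3):
    cache.get("a")
before = list(cache.policy)  # reads are still sitting in the log
cache.get("a")               # the fourth read reaches the threshold and triggers a replay
after = list(cache.policy)
print(before, after)
```

Readers only pay for an append to the log; the reordering work is batched and done by whichever thread wins the try lock.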

In Caffeine separate buffers are used for cache reads and writes. An access is recorded into a striped ring buffer where the stripe is chosen by a thread specific hash and the number of stripes grows when contention is detected. When a ring buffer is full an asynchronous drain is scheduled and subsequent additions to that buffer are discarded until space becomes available. When the access is not recorded due to a full buffer the cached value is still returned to the caller. The loss of policy information does not have a meaningful impact because W-TinyLFU is able to identify the hot entries that we wish to retain. By using a thread-specific hash instead of the key’s hash, the cache spreads the load more evenly and prevents popular entries from causing contention.
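The lossy, striped behavior can be approximated in Python as follows. This is a simplification under stated assumptions: the stripes are plain lists guarded by locks rather than lock-free ring buffers, the number of stripes is fixed instead of growing under contention, and the class name is invented for the example.

```python
import threading


class LossyReadBuffer:
    def __init__(self, stripes=4, stripe_capacity=16):
        self.stripes = [[] for _ in range(stripes)]
        self.locks = [threading.Lock() for _ in range(stripes)]
        self.stripe_capacity = stripe_capacity

    def record(self, key):
        """Return True if the access was recorded, False if it was dropped."""
        # The stripe is picked from the calling thread, not from the key,
        # so a single hot key is spread across several stripes.
        index = threading.get_ident() % len(self.stripes)
        with self.locks[index]:
            stripe = self.stripes[index]
            if len(stripe) >= self.stripe_capacity:
                return False  # full: drop the event, the read itself still succeeds
            stripe.append(key)
            return True

    def drain(self, consumer):
        """Hand every buffered access to `consumer` and empty the stripes."""
        for index, lock in enumerate(self.locks):
            with lock:
                pending, self.stripes[index] = self.stripes[index], []
            for key in pending:
                consumer(key)


buffer = LossyReadBuffer(stripes=2, stripe_capacity=3)
results = [buffer.record("k") for _ in range(5)]  # one thread, so one stripe fills up
drained = []
buffer.drain(drained.append)
print(results, drained)
```

Dropping the fourth and fifth events loses a little frequency information, which is tolerable here because a key that is hot enough to fill a buffer will be recorded again soon.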


In the case of a write a more traditional concurrent queue is used and every change schedules an immediate drain. While data loss is unacceptable, there are still ways to optimize the write buffer. Both types of buffers are written to by multiple threads but only consumed by a single one at a given time. This multiple producer / single consumer behavior allows for simpler, more efficient algorithms to be employed.

The buffers and fine grained writes introduce a race condition where operations for an entry may be recorded out of order. An insertion, read, update, and removal can be replayed in any order and if improperly handled the policy could retain dangling references. The solution to this is a state machine defining the lifecycle of an entry.
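One way to picture such a state machine is the Python sketch below. The state names, the transition table and the `replay_access` helper are assumptions chosen for illustration; the article does not spell out the states, so do not read this as Caffeine’s actual design.

```python
from enum import Enum


class State(Enum):
    ALIVE = "alive"      # in the hash table and known to the policy
    RETIRED = "retired"  # removed from the hash table, still referenced by the policy
    DEAD = "dead"        # removed from both


# Transitions only move forward, so a stale event can never resurrect an entry.
ALLOWED = {
    State.ALIVE: {State.RETIRED, State.DEAD},
    State.RETIRED: {State.DEAD},
    State.DEAD: set(),
}


class Entry:
    def __init__(self, key):
        self.key = key
        self.state = State.ALIVE

    def transition(self, new_state):
        if new_state not in ALLOWED[self.state]:
            return False  # event replayed out of order: ignore it
        self.state = new_state
        return True


def replay_access(policy_order, entry):
    """Apply a buffered read; skipped if the entry has already been removed."""
    if entry.state is not State.ALIVE:
        return False
    if entry.key in policy_order:
        policy_order.remove(entry.key)
    policy_order.append(entry.key)
    return True


entry = Entry("k")
order = ["k", "j"]
first = replay_access(order, entry)   # normal read: "k" becomes most recent
entry.transition(State.RETIRED)       # removal is recorded
late = replay_access(order, entry)    # a read replayed after the removal is ignored
entry.transition(State.DEAD)
revived = entry.transition(State.ALIVE)
print(first, late, revived, order)
```

Because every replayed operation checks the entry’s state first, a read that arrives after a removal cannot put a dangling reference back into the policy.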


In benchmarks the cost of the buffers is relatively cheap and scales with the underlying hash table. Reads scale linearly with the number of CPUs at about 33% of the hash table’s throughput. Writes have a 10% penalty, but only because contention when updating the hash table is the dominant cost.



There are many pragmatic topics that have not been covered. This could include tricks to minimize the memory overhead, testing techniques to retain quality as complexity grows, and ways to analyze performance to determine whether an optimization is worthwhile. These are areas that practitioners must keep an eye on, because once neglected it can be difficult to restore confidence in one’s own ability to manage the ensuing complexity.

The design and implementation of Caffeine is the result of numerous insights and the hard work of many contributors. Its evolution over the years wouldn’t have been possible without the help from the following people: Charles Fry, Adam Zell, Gil Einziger, Roy Friedman, Kevin Bourrillion, Bob Lee, Doug Lea, Josh Bloch, Bob Lane, Nitsan Wakart, Thomas Müeller, Dominic Tootell, Louis Wasserman, and Vladimir Blagojevic.

(via HighScalability.com)

ejabberd Massive Scalability: 1 Node — 2+ Million Concurrent Users

How Far Can You Push ejabberd?

From our experience, we all get the idea that ejabberd is massively scalable. However, we wanted to provide benchmark results and hard figures to demonstrate our outstanding performance level and give a baseline about what to expect in simple cases.

That’s how we ended up with the challenge of fitting a very large number of concurrent users on a single ejabberd node.

It turns out you can get very far with ejabberd.

Scenario and Platforms

Here is our benchmark scenario: the target was to reach 2,000,000 concurrent users, each with 18 contacts on the roster and a session lasting around 1h. The scenario involves 2.2M registered users, so almost all contacts are online at peak load. This means that presence packets were broadcast for those users, so there was some traffic in addition to the packets handling user connections and managing sessions. In that situation, the scenario produced 550 connections/second and thus 550 logins per second.
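The connection rate follows from the scenario’s own numbers. A quick back-of-the-envelope check in Python, assuming sessions last exactly one hour and arrivals are spread evenly:

```python
target_users = 2_000_000
session_seconds = 60 * 60  # sessions last around 1h

# In steady state, arrivals per second = concurrent users / session length.
logins_per_second = target_users / session_seconds
print(round(logins_per_second))  # 556, in line with the roughly 550 quoted above
```

The small gap between 556 and 550 is within what "around 1h" allows for.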

Database for authentication and roster storage was MySQL, running on the same node as ejabberd.

For the benchmark itself, we used Tsung, a tool dedicated to generating large loads to test server performance. We used a single large instance to generate the load.

Both ejabberd and the test platform were running on Amazon EC2 instances. ejabberd was running on a single node of instance type m4.10xlarge (40 vCPU, 160 GiB). Tsung instance was identical.

Regarding ejabberd software itself, the test was made with ejabberd Community Server version 16.01. This is the standard open source version that is widely available and widely used across the world.

The connections were not using TLS to make sure we were focusing on testing ejabberd itself and not OpenSSL performance.

Code snippets and comments regarding the Tsung scenario are available for download: tsung_snippets.md

Overall Benchmark Results


We managed to surpass the target and we support more than 2 million concurrent users on a single ejabberd node.

For XMPP servers, the main limitation to handle a massive number of online users is usually memory consumption. With proper tuning, we managed to handle the traffic with a memory footprint of 28KB per online user.
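To put the per-user figure in perspective, here is the total it implies at the target load. The calculation treats 1 KB as 1024 bytes, which is an assumption about how the figure was measured:

```python
users = 2_000_000
kb_per_user = 28

total_kb = users * kb_per_user
total_gib = total_kb / (1024 * 1024)  # KB -> GiB, assuming 1 KB = 1024 bytes
print(f"{total_gib:.1f} GiB")  # 53.4 GiB
```

That is roughly a third of the 160 GiB available on the m4.10xlarge instance used for the test.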

The 40 CPUs were almost evenly used, with the exception of the first core that was handling all the network interrupts. It was more loaded by the Operating System and thus less loaded by the Erlang VM.

In the process, we also optimized our XML parser, released now as Fast XML, a high-performance, memory efficient Expat-based Erlang and Elixir XML parser.

Detailed Results

ejabberd Performance


Benchmark shows that we reached 2 million concurrent users after one hour. We were logging in about 33k users per minute, producing session traffic of a bit more than 210k XMPP packets per minute (this includes the stanzas to do the SASL authentication, binding, roster retrieval, etc). Maximum number of concurrent users is reached shortly after the 2 million concurrent users mark, by design in the scenario. At this point, we still connect new users but, as the first users start disconnecting, the number of concurrent users gets stable.

As we try to reproduce common client behavior, we set up Tsung to send “keepalive pings” on the connections. Since each session sends one such whitespace ping each minute, the number of such requests grows proportionately with the number of connected users. And while idle connections consume few resources on the server, it is important to note that at this scale they start to be noticeable. Once you have 2M users, you will be handling 33K/sec of such pings just from idle connections. They are not represented on the graphs, but are an important part of the real life traffic we were generating.
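The ping rate is simple arithmetic on the figures above, shown here in Python for completeness:

```python
connected_users = 2_000_000
ping_interval_seconds = 60  # one whitespace ping per session per minute

pings_per_second = connected_users / ping_interval_seconds
print(round(pings_per_second))  # 33333, i.e. about 33K/sec
```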

ejabberd Health


At all times, ejabberd health was fine. Typically, when ejabberd is overloaded, TCP connection establishment time and authentication time tend to grow to an unacceptable level. In our case, both of those operations performed very fast throughout the benchmark, in under 10 milliseconds. There were almost no errors (the rare occurrences are artefacts of the benchmark process).

Platform Behavior


Good health and performance are confirmed by the state of the platform. CPU and memory consumption were totally under control, as shown in the graph. CPU consumption stays far from system limits. Memory grows proportionally to the number of concurrent users.

We also need to mention that CPU values are slightly overestimated as seen by the OS, because Erlang schedulers spend some time busy waiting when they run out of work.

Challenge: The Hardest Part

The hardest part is definitely tuning the Linux system for ejabberd and for the benchmark tool, to overcome the default limitations. By default, Linux servers are not configured to allow you to handle, nor even generate, 2 million TCP sockets. It required quite a bit of network setup not to have problems with exhausted ports on the Tsung side.

On a similar topic, we worked with the Amazon server team, as we have been pushing the limits of their infrastructure like no one before. For example, we had to use a second Ethernet adapter with multiple IP addresses (2 x 15 IP, spread across 2 NICs). It also helped a lot to use latest Enhanced Networking drivers from Intel.

All in all, it was a very interesting process that helped make progress on Amazon Web Services by testing and tuning the platform itself.

What’s Next?

This benchmark was intended to demonstrate that ejabberd can scale to a large volume and serve as a baseline reference for more complex and full-featured platforms.

Next step is to keep on going with our benchmark and optimization iteration work. Our next target is to benchmark Multi-User Chat performance and Pubsub performance. The goal is to find the limits, optimize and demonstrate that massive internet scale can be done with these ejabberd components as well.

(via Blog.process-one.net)

How Wistia Handles Millions Of Requests Per Hour And Processes Rich Video Analytics

This is a guest repost from Christophe Limpalair of his interview with Max Schnur, Web Developer at Wistia.

Wistia is video hosting for business. They offer video analytics like heatmaps, and they give you the ability to add calls to action, for example. I was really interested in learning how all the different components work and how they’re able to stream so much video content, so that’s what this episode focuses on.

What Does Wistia’s Stack Look Like?

As you will see, Wistia is made up of different parts. Here are some of the technologies powering these different parts:

What Scale Are You Running At?

Wistia has three main parts to it:

  1. The Wistia app (The ‘Hub’ where users log in and interact with the app)
  2. The Distillery (stats processing)
  3. The Bakery (transcoding and serving)

Here are some of their stats:

  • 1.5 million player loads per hour (loading a page with a player on it counts as one. Two players counts as two, etc…)
  • 18.8 million peak requests per hour to their Fastly CDN
  • 740,000 app requests per hour
  • 12,500 videos transcoded per hour
  • 150,000 plays per hour
  • 8 million stats pings per hour

They are running on Rackspace.

What Challenges Come From Receiving Videos, Processing Them, Then Serving Them?

  1. They want to balance quality and deliverability, which has two sides to it:
    1. Encoding derivatives of the original video
    2. Knowing when to play which derivative


    Derivatives, in this context, are different versions of a video. Having different quality versions is important, because it allows you to have smaller file sizes. These smaller video files can be served to users with less bandwidth. Otherwise, they would have to constantly buffer. Have enough bandwidth? Great! You can enjoy higher quality (larger) files.

    Having these different versions and knowing when to play which version is crucial for a good user experience.

  2. Lots of I/O. When you have users uploading videos at the same time, you end up having to move many heavy files across clusters. A lot of things can go wrong here.
  3. Volatility. There is volatility in both the number of requests and the number of videos to process. They need to be able to sustain these changes.
  4. Of course, serving videos is also a major challenge. Thankfully, CDNs have different kinds of configurations to help with this.
  5. On the client side, there are a ton of different browsers, device sizes, etc… If you’ve ever had to deal with making websites responsive or work on older IE versions, you know exactly what we’re talking about here.

How Do You Handle Big Changes In The Number Of Video Uploads?

They have boxes which they call Primes. These Prime boxes are in charge of receiving files from user uploads.

If uploads start eating up all the available disk space, they can just spin up new Primes using Chef recipes. It’s a manual job right now, but they don’t usually get anywhere close to the limit. They have lots of space.


What Transcoding System Do You Use?

The transcoding part is what they call the Bakery.

The Bakery comprises the Primes we just looked at, which receive and serve files, and a cluster of workers. Workers process tasks and create derivative files from uploaded videos.

Beefy systems are required for this part, because it’s very resource intensive. How beefy?

We’re talking about several hundred workers. Each worker usually performs two tasks at one time. They all have 8 GB of RAM and a pretty powerful CPU.

What kinds of tasks and processing go on with workers? They encode video primarily with x264, which is a fast H.264 encoder. A video can usually be encoded in about a half to a third of its running time.

Videos must also be resized and they need different bitrate versions.

In addition, there are different encoding profiles for different devices–like HLS for iPhones. These encodings need to be doubled for Flash derivatives that are also x264 encoded.

What Does The Whole Process Of Uploading A Video And Transcoding It Look Like?

Once a user uploads a video, it is queued and sent to workers for transcoding, and then slowly transferred over to S3.

Instead of sending a video to S3 right away, it is pushed to Primes in the clusters so that customers can serve videos right away. Then, over a matter of a few hours, the file is pushed to S3 for permanent storage and cleared from Prime to make space.

How Do You Serve The Video To A Customer After It Is Processed?

When you’re hitting a video hosted on Wistia, you make a request to embed.wistia.com/video-url, which is actually hitting a CDN (they use a few different ones. I just tried it and mine went through Akamai). That CDN goes back to Origin, which is their Primes we just talked about.

Primes run HAProxy with a special balancer called Breadroute. Breadroute is a routing layer that sits in front of Primes and balances traffic.

Wistia might have the file stored locally in the cluster (which would be faster to serve), and Breadroute is smart enough to know that. If that is the case, then Nginx will serve the file directly from the file system. Otherwise, they proxy S3.

How Can You Tell Which Video Quality To Load?

That’s primarily decided on the client side.

Wistia will receive data about the user’s bandwidth immediately after they hit play. Before the user even hits play, Wistia receives data about the device, the size of the video embed, and other heuristics to determine the best asset for that particular request.

The decision of whether to go HD or not is only debated when the user goes into full screen. This gives Wistia a chance to not interrupt playback when a user first clicks play.
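As a rough illustration of this kind of client-side heuristic, here is a minimal Python sketch. The derivative ladder, the bitrates, and the 0.8 headroom factor are all invented for the example; the interview does not describe Wistia’s actual rules.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Derivative:
    name: str
    width: int         # pixels
    bitrate_kbps: int  # average bitrate


# Hypothetical ladder, ordered from lowest to highest quality.
LADDER = [
    Derivative("low", 400, 300),
    Derivative("medium", 640, 800),
    Derivative("high", 960, 1500),
    Derivative("hd", 1280, 3000),
]


def pick_derivative(bandwidth_kbps, embed_width, fullscreen=False, headroom=0.8):
    """Pick a derivative that fits the measured bandwidth and the embed size."""
    budget = bandwidth_kbps * headroom
    # HD is only considered once the player is in full screen.
    affordable = [
        d for d in LADDER
        if d.bitrate_kbps <= budget and (fullscreen or d.name != "hd")
    ]
    if not affordable:
        return LADDER[0]       # always fall back to the smallest file
    if fullscreen:
        return affordable[-1]  # best quality the connection can sustain
    for d in affordable:
        if d.width >= embed_width:
            return d           # smallest derivative that fills the embed
    return affordable[-1]
```

With these made-up numbers, a 640-pixel embed on a 5 Mbps connection gets the "medium" file, and only switches to "hd" when the viewer goes full screen.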

How Do You Know When Videos Don’t Load? How Do You Monitor That?

They have a service they call pipedream, which they use within their app and within embed codes to constantly send data back.

If a user clicks play, Wistia gets information about the size of the user’s window, where it was, and whether the video buffers (that is, it’s in a playing state but the time hasn’t changed after a few seconds).

A known problem with videos is slow load times. Wistia wanted to know when a video was slow to load, so they tried tracking metrics for that. Unfortunately, they only knew if it was slow if the user actually waited around for the load to finish. If users have really slow connections, they might just leave before that happens.

The answer to this problem? Heartbeats. If they receive heartbeats and they never receive a play, then the user probably bailed.
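The heartbeat idea can be sketched in a few lines of Python. The event names and the shape of the input are assumptions made for the example, not pipedream’s real format.

```python
def find_abandoned_loads(events):
    """Return the sessions that sent heartbeats but never reported a play.

    `events` is an iterable of (session_id, event_type) pairs, where
    event_type is "heartbeat" or "play" (hypothetical event names).
    """
    heartbeats = set()
    played = set()
    for session_id, event_type in events:
        if event_type == "heartbeat":
            heartbeats.add(session_id)
        elif event_type == "play":
            played.add(session_id)
    # Sessions that were alive but never started playing probably bailed.
    return heartbeats - played
```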

What Other Analytics Do You Collect?

They also collect analytics for customers.

Analytics like play, pause, seeks, conversions (entering an email, or clicking a call to action). Some of these are shown in heatmaps. They also aggregate those heatmaps into graphs that show engagement, like the areas that people watched and rewatched.


How Do You Get Such Detailed Data?

When the video is playing, they use a video tracker which is an object bound to plays, pauses, and seeks. This tracker collects events into a data structure. Once every 10-20 seconds, it pings back to the Distillery which in turn figures out how to turn data into heatmaps.
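To make the heatmap step concrete, here is a small Python sketch of one way to turn watched segments into per-second counts. The input format (whole-second ranges) is an assumption; the Distillery’s real data model is not described in the interview.

```python
def build_heatmap(duration_s, watched_ranges):
    """Count how many times each second of a video was watched.

    `watched_ranges` is a list of (start, end) pairs in whole seconds,
    with `end` exclusive. Rewatched sections simply count more than once.
    """
    heat = [0] * duration_s
    for start, end in watched_ranges:
        # Clamp to the video length so bad pings cannot index out of range.
        for second in range(max(start, 0), min(end, duration_s)):
            heat[second] += 1
    return heat
```

A viewer who watches seconds 0-3 and then seeks back and rewatches 1-4 produces the counts 1, 2, 2, 1 for the first four seconds.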

Why doesn’t everyone do this? I asked him that, because I don’t even get this kind of data from YouTube. The reason, Max says, is because it’s pretty intensive. The Distillery processes a ton of stuff and has a huge database.

What Is Scaling Ease Of Use?

Wistia has about 60 employees. Once you start scaling up customer base, it can be really hard to make sure you keep having good customer support.

Their highest touch points are playback and embeds. These have a lot of factors that are out of their control, so they must make a choice:

  1. Don’t help customers
  2. Help customers, but it takes a lot of time

Those options weren’t good enough. Instead, they went after the source that caused the most customer support issues–embeds.

Embeds have gone through multiple iterations. Why? Because they often broke. Whether it was WordPress doing something weird to an embed, or Markdown breaking the embed, Wistia has had a lot of issues with this over time. To solve this problem, they ended up dramatically simplifying embed codes.

These simpler embed codes run into fewer problems on the customer’s end. While it does add more complexity behind the scenes, it results in fewer support tickets.

This is what Max means by scaling ease of use. Make it easier for customers, so that they don’t have to contact customer support as often. Even if that means more engineering challenges, it’s worth it to them.

Another example of scaling ease of use is with playbacks. Max is really interested in implementing a kind of client-side CDN load balancer that determines the lowest latency server to serve content from (similar to what Netflix does).

What Projects Are You Working On Right Now?

Something Max is planning on starting soon is what they’re calling the upload server. This upload server is going to give them the ability to do a lot of cool stuff around uploads.

As we talked about, and with most upload services, you have to wait for the file to get to a server before you can start transcoding and doing things with it.

The upload server will make it possible to transcode while uploading. This means customers could start playing their video before it’s even completely uploaded to their system. They could also get the still capture and embed almost immediately.


I have to try and fit as much information in this summary as possible without making it too long. This means I cut out some information that could really help you out one day. I highly recommend you view the full interview if you found this interesting, as there are more hidden gems.

You can watch it, read it, or even listen to it. There’s also an option to download the MP3 so you can listen on your way to work. Of course, the show is also on iTunes and Stitcher.

Thanks for reading, and please let me know if you have any feedback!
– Christophe Limpalair (@ScaleYourCode)

(via HighScalability.com)

Designing For Scale – Three Principles And Three Practices From Tapad Engineering

This is a guest post by Toby Matejovsky, Director of Engineering at Tapad (@TapadEng).

Here at Tapad, scaling our technology strategically has been crucial to our immense growth. Over the last four years we’ve scaled our real-time bidding system to handle hundreds of thousands of queries per second. We’ve learned a number of lessons about scalability along that journey.

Here are a few concrete principles and practices we’ve distilled from those experiences:

  • Principle 1: Design for Many
  • Principle 2: Service-Oriented Architecture Beats Monolithic Application
  • Principle 3: Monitor Everything
  • Practice 1: Canary Deployments
  • Practice 2: Distributed Clock
  • Practice 3: Automate To Assist, Not To Control

Principle 1: Design For Many

There are three amounts that matter in software design: none, one, and many. We’ve learned to always design for the “many” case. This makes scaling more of a simple mechanical process, rather than a complicated project requiring re-architecting the entire codebase. The work to get there might not be as easy, but front-loading the effort pays dividends later when the application needs to scale suddenly.

For us, a guiding principle is to always consider the hypothetical ‘10x use case.’ How would our applications respond if they had to suddenly handle 10 times the current traffic? The system is only as good as the sum of its parts. The application layer might scale out easily, but if it fails because of interacting with a single database node then we haven’t achieved true scalability.

Principle 2: Service-Oriented Architecture Beats Monolithic Application

Here at Tapad we leverage a service-based architecture. The main advantages are the ability to allocate resources efficiently, and to make upgrades easier.

Imagine two systems:

  • One requires a lot of compute, not much memory.

  • One requires a lot of memory, not much compute.

If they were combined into a single system, but only the memory-intensive one needed to scale, every additional node would end up overcommitting on compute.

Virtualization solves the problem by making those overcommitted cores available to some other system, but the solution paints a misleading picture. It appears there are N systems available, but it is impossible to run all N at full capacity. If the cluster keeps enough compute available to run all N at full capacity, then money is being wasted – never a good thing.

Principle 3: Monitor Everything

Monitoring is an obvious requirement for any production system. We currently use Zabbix for alerting, and Graphite for tracking metrics over time. A typical Zabbix check looks like:

  • “Is process X running”

  • “Is node N responding to a request within M milliseconds”

  • “Node N’ is using > 80% of its available storage”

We recently switched out our Graphite backend to use Cassandra instead of Whisper to better handle the volume of traffic (there are currently about half a million metrics tracked). We aggregate metrics in-memory with a customized version of Twitter’s Ostrich metrics library, and flush them to Graphite every 10 seconds.

An example path for a given metric might look like this:


We use Grafana for building real-time dashboards to track key metrics, and display those dashboards on big screens inside our office. When we switch our SOA to a more ephemeral container-based approach (e.g. running containers on Mesos with Marathon) we may have to re-think how these metrics are organized, as the instance-specific names like foo01 will end up looking like foo.43ffbdc8-ef60-11e4-97ce-005056a272f9. Graphite supports wildcards in queries, so something like sumSeries(prd.nj1.foo.*.pipeline.producer.avro_event_bar_count) could work.
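To illustrate what such a wildcard query does, here is a simplified Python sketch. It is not Graphite’s implementation: the real sumSeries adds whole time series point by point, while this version only sums one latest value per metric. The instance ids in the usage example are made up.

```python
from fnmatch import fnmatchcase


def path_matches(pattern, path):
    """Match a dotted metric path; '*' only matches within a single node."""
    pattern_nodes = pattern.split(".")
    path_nodes = path.split(".")
    if len(pattern_nodes) != len(path_nodes):
        return False
    return all(fnmatchcase(node, pat) for pat, node in zip(pattern_nodes, path_nodes))


def sum_series(pattern, latest_values):
    """Sum the latest value of every metric whose path matches the pattern."""
    return sum(
        value for path, value in latest_values.items()
        if path_matches(pattern, path)
    )


# Hypothetical container ids "a1" and "b2" standing in for generated names.
latest = {
    "prd.nj1.foo.a1.pipeline.producer.avro_event_bar_count": 3,
    "prd.nj1.foo.b2.pipeline.producer.avro_event_bar_count": 4,
    "prd.nj1.bar.a1.pipeline.producer.avro_event_bar_count": 100,
}
total = sum_series("prd.nj1.foo.*.pipeline.producer.avro_event_bar_count", latest)
```

Matching node by node keeps the wildcard from swallowing extra path segments, which is the behaviour you want when instance names become opaque ids.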

Principles lay the groundwork for our decisions, but executing them successfully is equally important. Here are three best practices for working with distributed systems.

Practice 1: Canary Deployments

Some things are very challenging to test rigorously prior to a full-scale production launch. To mitigate risk, we upgrade a single node first and monitor it manually. Assuming it behaves as expected, the rest of the nodes are automatically deployed by Rundeck. Rundeck is a tool that can upgrade systems automatically and in parallel, rolling several instances at a time and moving on to the next set as soon as the upgraded nodes report a healthy status. Monitoring the canary deploy involves more than this single health check, which is why it’s upgraded out-of-band.
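The overall flow can be sketched as below. This is a generic Python illustration, not Rundeck’s API: the upgrade, is_healthy and approve_canary callables are placeholders you would supply, and approve_canary stands in for the manual monitoring step.

```python
def canary_deploy(nodes, upgrade, is_healthy, approve_canary, batch_size=3):
    """Upgrade one canary node, wait for approval, then roll the rest in batches.

    Returns the list of nodes that were upgraded. The rollout stops early if
    the canary is not approved or if any node in a batch reports unhealthy.
    """
    if not nodes:
        return []
    canary, rest = nodes[0], nodes[1:]
    upgrade(canary)
    if not approve_canary(canary):  # a human decision, made out-of-band
        return [canary]
    upgraded = [canary]
    for i in range(0, len(rest), batch_size):
        batch = rest[i:i + batch_size]
        for node in batch:
            upgrade(node)
        upgraded.extend(batch)
        if not all(is_healthy(node) for node in batch):
            break  # stop before touching the next batch
    return upgraded
```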

Practice 2: Distributed Clock

Because of clock skew and lag, there is no good concept of “now” in a distributed system.

  • Clock skew occurs because clocks are not particularly precise, even with NTP (Network Time Protocol).

  • Lag is a factor when passing messages around. If one server is cut off from the network, buffers messages for a while, then sends them after re-joining, the receiving system will get a batch of messages with relatively old timestamps. A system consuming all messages from these producers cannot be assured it has read 100% of messages up to a given time until it sees that each producer has passed the mark. This assumes that each producer guarantees ordering within its own stream, much like Kafka’s model.

Our solution is to create a sort of distributed clock, where producers record their most recent timestamps as child nodes of a particular Zookeeper “clock” node. The time is resolved by taking the minimum timestamp in that set. We also track lag relative to the resolving node’s system clock.
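The resolution rule itself is tiny. In this Python sketch a plain dictionary stands in for the children of the Zookeeper “clock” node; the producer names and timestamps are invented for the example.

```python
def resolve_clock(producer_timestamps):
    """Return the time up to which every producer is known to have reported."""
    if not producer_timestamps:
        raise ValueError("no producers registered")
    # The slowest producer defines "now" for the whole system.
    return min(producer_timestamps.values())


def lag_seconds(producer_timestamps, local_now):
    """Lag of the resolved clock relative to the resolving node's own clock."""
    return local_now - resolve_clock(producer_timestamps)


# Hypothetical producers and timestamps (seconds).
stamps = {"producer-1": 100, "producer-2": 95, "producer-3": 102}
resolved = resolve_clock(stamps)
```

Taking the minimum is what makes it safe for a consumer to treat everything up to the resolved time as complete, given that each producer keeps its own stream in order.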

Practice 3: Automate To Assist, Not To Control

Our devops tools are designed to assist humans, rather than to automatically manage things, as human judgement is often required to respond to a system failure. There is risk in allowing a script to automatically fail over a database or spin up new nodes. We have a pager duty rotation with primary, secondary, and tertiary engineers. The engineer can initiate the failover or spin up new nodes based on an alert. This means they are fully aware of the context.

The more well-understood a task is, the more it can be automated. One basic level of automation is using Kickstart/PXE boot. When a new VM starts up, it does a PXE boot and registers itself with Spacewalk, and Puppet handles installation of all required packages. Nothing custom is ever required, which enables us to easily build/rebuild sets of systems just by starting new VMs with particular names.

As we gain better understanding of a given system’s performance we can automate more parts. For example, scaling a shared-nothing system up and down depending on some reasonable change in traffic. For exceptional circumstances, we want a human to make the decision, assisted by all the information at their disposal.

(via HighScalability.com)

Elements Of Scale: Composing And Scaling Data Platforms


This is a guest repost of Ben Stopford’s epic post on Elements of Scale: Composing and Scaling Data Platforms. A masterful tour through the evolutionary forces that shape how systems adapt to key challenges.

As software engineers we are inevitably affected by the tools we surround ourselves with. Languages, frameworks, even processes all act to shape the software we build.

Likewise databases, which have trodden a very specific path, inevitably affect the way we treat mutability and share state in our applications.

Over the last decade we’ve explored what the world might look like had we taken a different path. Small open source projects try out different ideas. These grow. They are composed with others. The platforms that result utilise suites of tools, with each component often leveraging some fundamental hardware or systemic efficiency. The result: platforms that solve problems too unwieldy or too specific to work within any single tool.

So today’s data platforms range greatly in complexity. From simple caching layers or polyglot persistence right through to wholly integrated data pipelines. There are many paths. They go to many different places. In some of these places at least, nice things are found.

So the aim for this talk is to explain how and why some of these popular approaches work. We’ll do this by first considering the building blocks from which they are composed. These are the intuitions we’ll need to pull together the bigger stuff later on.


In a somewhat abstract sense, when we’re dealing with data, we’re really just arranging locality. Locality to the CPU. Locality to the other data we need. Accessing data sequentially is an important component of this. Computers are just good at sequential operations. Sequential operations can be predicted.

If you’re taking data from disk sequentially it’ll be pre-fetched into the disk buffer, the page cache and the different levels of CPU caching. This has a significant effect on performance. But it does little to help the addressing of data at random, be it in main memory, on disk or over the network. In fact pre-fetching actually hinders random workloads as the various caches and frontside bus fill with data which is unlikely to be used.


Whilst disk is somewhat renowned for its slow performance, main memory is often assumed to simply be fast. This is not as ubiquitously true as people often think. There are one to two orders of magnitude between random and sequential main memory workloads. Use a language that manages memory for you and things generally get a whole lot worse.

Streaming data sequentially from disk can actually outperform randomly addressed main memory. So disk may not always be quite the tortoise we think it is, at least not if we can arrange sequential access. SSDs, particularly those that utilise PCIe, further complicate the picture as they demonstrate different tradeoffs, but the caching benefits of the two access patterns remain, regardless.


So let’s imagine, as a simple thought experiment, that we want to create a very simple database. We’ll start with the basics: a file.

We want to keep writes and reads sequential, as it works well with the hardware. We can append writes to the end of the file efficiently. We can read by scanning the file in its entirety. Any processing we wish to do can happen as the data streams through the CPU. We might filter, aggregate or even do something more complex. The world is our oyster!


So what about data that changes, updates etc?

We have two options. We could update the value in place. We’d need to use fixed width fields for this, but that’s ok for our little thought experiment. But update in place would mean random IO. We know that’s not good for performance.

Alternatively we could just append updates to the end of the file and deal with the superseded values when we read it back.

So we have our first tradeoff. Append to a ‘journal’ or ‘log’, and reap the benefits of sequential access. Alternatively if we use update in place we’ll be back to 300 or so writes per second, assuming we actually flush through to the underlying media.
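The append-only option is easy to sketch in Python. An in-memory io.StringIO stands in for the file here, and the one-line "key=value" record format is an assumption made for the example (keys must not contain "=" or newlines).

```python
import io


def append_record(log, key, value):
    """Append one 'key=value' line to the end of the log (sequential write)."""
    log.seek(0, io.SEEK_END)
    log.write(f"{key}={value}\n")


def scan_latest(log):
    """Scan the whole log in order; later entries supersede earlier ones."""
    log.seek(0)
    latest = {}
    for line in log:
        key, _, value = line.rstrip("\n").partition("=")
        latest[key] = value
    return latest


log = io.StringIO()
append_record(log, "bob", "london")
append_record(log, "alice", "paris")
append_record(log, "bob", "berlin")  # an update is just another append
current = scan_latest(log)
```

Nothing is ever overwritten: the old value for "bob" is still in the log, and the reader resolves the superseded entry while scanning.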


Now in practice of course reading the file, in its entirety, can be pretty slow. We’ll only need to get into GBs of data and the fastest disks will take seconds. This is what a database does when it ends up table scanning.

Also we often want something more specific, say customers named “bob”, so scanning the whole file would be overkill. We need an index.


Now there are lots of different types of indexes we could use. The simplest would be an ordered array of fixed-width values, in this case customer names, held with the corresponding offsets in the heap file. The ordered array could be searched with binary search. We could also of course use some form of tree, bitmap index, hash index, term index etc. Here we’re picturing a tree.
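The simplest of these, the ordered array searched with binary search, might look like this in Python. It is a sketch only: it keeps names and offsets in two parallel lists rather than a fixed-width on-disk layout, and the sample offsets are made up.

```python
from bisect import bisect_left


def build_index(records):
    """Build a sorted index from (name, heap_file_offset) pairs."""
    entries = sorted(records)
    names = [name for name, _ in entries]
    offsets = [offset for _, offset in entries]
    return names, offsets


def lookup(names, offsets, name):
    """Return every heap-file offset stored for `name`, via binary search."""
    i = bisect_left(names, name)
    found = []
    while i < len(names) and names[i] == name:
        found.append(offsets[i])
        i += 1
    return found


names, offsets = build_index([("bob", 0), ("alice", 40), ("bob", 80), ("carol", 120)])
bob_offsets = lookup(names, offsets, "bob")
```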

The thing with indexes like this is that they impose an overarching structure. The values are deliberately ordered so we can access them quickly when we want to do a read. The problem with the overarching structure is that it necessitates random writes as data flows in. So our wonderful, write optimised, append only file must be augmented by writes that scatter-gun the filesystem. This is going to slow us down.


Anyone who has put lots of indexes on a database table will be familiar with this problem. If we are using a regular rotating hard drive, we might run 1,000s of times slower if we maintain disk integrity of an index in this way.

Luckily there are a few ways around this problem. Here we are going to discuss three. These represent three extremes, and they are in truth simplifications of the real world, but the concepts are useful when we consider larger compositions.


Our first option is simply to place the index in main memory. This will compartmentalise the problem of random writes to RAM. The heap file stays on disk.

This is a simple and effective solution to our random writes problem. It is also one used by many real databases. MongoDB, Cassandra, Riak and many others use this type of optimisation. Often memory mapped files are used.

However, this strategy breaks down if we have far more data than we have main memory. This is particularly noticeable where there are lots of small objects. Our index would get very large. Thus our storage becomes bounded by the amount of main memory we have available. For many tasks this is fine, but if we have very large quantities of data this can be a burden.

A popular solution is to move away from having a single ‘overarching’ index. Instead we use a collection of smaller ones.


This is a simple idea. We batch up writes in main memory, as they come in. Once we have sufficient – say a few MBs – we sort them and write them to disk as an individual mini-index. What we end up with is a chronology of small, immutable index files.

So what was the point of doing that? Our set of immutable files can be streamed sequentially. This brings us back to a world of fast writes, without us needing to keep the whole index in memory. Nice!

Of course there is a downside to this approach too. When we read, we have to consult the many small indexes individually. So all we have really done is shift the problem of random IO from writes onto reads. However this turns out to be a pretty good tradeoff in many cases. It’s easier to optimise random reads than it is to optimise random writes.

Keeping a small meta-index in memory or using a Bloom Filter provides a low-memory way of evaluating whether individual index files need to be consulted during a read operation. This gives us almost the same read performance as we’d get with a single overarching index whilst retaining fast, sequential writes.

In reality we will need to purge orphaned updates occasionally too, but that can be done with nice sequential reads and writes.


What we have created is termed a Log Structured Merge Tree, a storage approach used in a lot of big data tools such as HBase, Cassandra, Google’s BigTable and many others. It balances write and read performance with comparatively small memory overhead.
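A toy version of the idea fits in a few dozen lines of Python. This sketch keeps the “files” as in-memory sorted lists and leaves out the Bloom filter and meta-index; the class name and the tiny flush threshold are invented for illustration.

```python
from bisect import bisect_left


class TinyLSM:
    def __init__(self, memtable_limit=2):
        self.memtable = {}    # recent writes, batched in memory
        self.segments = []    # immutable sorted runs, oldest first
        self.memtable_limit = memtable_limit

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self._flush()

    def _flush(self):
        # Sort the batch and write it out as one immutable mini-index.
        self.segments.append(sorted(self.memtable.items()))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        for segment in reversed(self.segments):  # newest segment wins
            i = bisect_left(segment, (key,))
            if i < len(segment) and segment[i][0] == key:
                return segment[i][1]
        return None

    def compact(self):
        """Merge all segments into one, dropping superseded values."""
        merged = {}
        for segment in self.segments:  # oldest to newest, so newer values win
            merged.update(segment)
        self.segments = [sorted(merged.items())] if merged else []
```

Reads have to check each segment in turn, which is exactly the cost that Bloom filters and compaction are there to reduce.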


So we can get around the ‘random-write penalty’ by storing our indexes in memory or, alternatively, using a write-optimised index structure like LSM. There is a third approach though. Pure brute force.

Think back to our original example of the file. We could read it in its entirety. This gave us many options in terms of how we go about processing the data within it. The brute force approach is simply to hold data by column rather than by row and stream only the columns required for a query, in their entirety, through the CPU. This approach is termed Columnar or Column Oriented.

(It should be noted that there is an unfortunate nomenclature clash between true column stores and those that follow the Big Table pattern. Whilst they share some similarities, in practice they are quite different. It is wise to consider them as different things.)


Column Orientation is another simple idea. Instead of storing data as a set of rows, appended to a single file, we split each row by column. We then store each column in a separate file.

We keep the order of the files the same, so row N has the same position (offset) in each column file. This is important because we will need to read multiple columns to service a single query, all at the same time. This means ‘joining’ columns on the fly. If the columns are in the same order we can do this in a tight loop which is very cache- and cpu-efficient. Many implementations make heavy use of vectorisation to further optimise throughput for simple join and filter operations.
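Joining columns by position can be shown with a small Python sketch, where each list stands in for one column file and the column names and values are made up. Real engines do this over compressed blocks with vectorised code; this only shows the shape of the loop.

```python
def sum_where(columns, filter_column, predicate, sum_column):
    """Sum `sum_column` for the rows whose `filter_column` passes `predicate`.

    Row N sits at position N in every column, so the two columns can be
    walked in lockstep without any lookups.
    """
    total = 0
    for filter_value, amount in zip(columns[filter_column], columns[sum_column]):
        if predicate(filter_value):
            total += amount
    return total


# Hypothetical table stored column by column.
table = {
    "name": ["bob", "alice", "carol"],
    "country": ["uk", "fr", "uk"],
    "spend": [10, 20, 5],
}
uk_spend = sum_where(table, "country", lambda c: c == "uk", "spend")
```

Only the "country" and "spend" columns are touched; the "name" column is never read.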

Writes can leverage the benefit of being append-only. The downside is that we now have many files to update, one for every column in every individual write to the database. The most common solution to this is to batch writes in a similar way to the one used in the LSM approach above. Many columnar databases also impose an overall order to the table as a whole to increase their read performance for one chosen key.


By splitting data by column we significantly reduce the amount of data that needs to be brought from disk, so long as our query operates on a subset of all columns.

In addition to this, data in a single column generally compresses well. We can take advantage of the data type of the column to do this, if we have knowledge of it. This means we can often use efficient, low cost encodings such as run-length, delta, bit-packed etc. For some encodings predicates can be used directly on the compressed stream too.
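Run-length encoding is the easiest of these to illustrate. The Python sketch below encodes a column as (value, run length) pairs and then evaluates a predicate once per run instead of once per row; the function names are invented for the example.

```python
def rle_encode(values):
    """Encode a column as a list of (value, run_length) pairs."""
    runs = []
    for value in values:
        if runs and runs[-1][0] == value:
            runs[-1][1] += 1
        else:
            runs.append([value, 1])
    return [(value, length) for value, length in runs]


def count_matching(runs, predicate):
    """Count matching rows by testing each run once, without decoding it."""
    return sum(length for value, length in runs if predicate(value))


encoded = rle_encode(["uk", "uk", "uk", "fr", "uk"])
uk_rows = count_matching(encoded, lambda v: v == "uk")
```

This pays off most when the column is sorted or has few distinct values, since runs are then long.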

The result is a brute force approach that will work particularly well for operations that require large scans. Aggregate functions like average, max, min, group by etc are typical of this.

This is very different to using the ‘heap file & index’ approach we covered earlier. A good way to understand this is to ask yourself: what is the difference between a columnar approach like this vs a ‘heap & index’ where indexes are added to every field?


The answer to this lies in the ordering of the index files. BTrees etc will be ordered by the fields they index. Joining the data in two indexes involves a streaming operation on one side, but on the other side the index lookups have to read random positions in the second index. This is generally less efficient than joining two indexes (columns) that retain the same ordering. Again we’re leveraging sequential access.


So many of the best technologies which we may want to use as components in a data platform will leverage one of these core efficiencies to excel for a certain set of workloads.

Storing indexes in memory, over a heap file, is favoured by many NoSQL stores such as Riak, Couchbase or MongoDB as well as some relational databases. It’s a simple model that works well.

Tools designed to work with larger data sets tend to take the LSM approach. This gives them fast ingestion as well as good read performance using disk based structures. HBase, Cassandra, RocksDB, LevelDB and even Mongo now support this approach.

Column-per-file engines are used heavily in MPP databases like Redshift or Vertica as well as in the Hadoop stack using Parquet. These are engines for data crunching problems that require large traversals. Aggregation is the home ground for these tools.

Other products like Kafka apply the use of a simple, hardware efficient contract to messaging. Messaging, at its simplest, is just appending to a file, or reading from a predefined offset. You read messages from an offset. You go away. You come back. You read from the offset you previously finished at. All nice sequential IO.
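That contract is small enough to sketch in Python. This is an illustration of the offset idea only, with an in-memory list standing in for the file; it is not Kafka’s API.

```python
class OffsetLog:
    def __init__(self):
        self._messages = []

    def append(self, message):
        """Append a message and return the offset it was written at."""
        self._messages.append(message)
        return len(self._messages) - 1

    def read_from(self, offset, max_messages=100):
        """Read a batch starting at `offset`; return it with the next offset."""
        batch = self._messages[offset:offset + max_messages]
        return batch, offset + len(batch)


log = OffsetLog()
for text in ["m0", "m1", "m2"]:
    log.append(text)

first_batch, next_offset = log.read_from(0, max_messages=2)
# ...go away, come back later, and carry on where we left off.
second_batch, next_offset = log.read_from(next_offset)
```

The consumer, not the log, remembers the offset, so the log itself needs no per-consumer index.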

This is different to most message oriented middleware. Specifications like JMS and AMQP require the addition of indexes like the ones discussed above, to manage selectors and session information. This means they often end up performing more like a database than a file. Jim Gray made this point famously back in his 1995 publication Queues Are Databases.

So all these approaches favour one tradeoff or other, often keeping things simple, and hardware sympathetic, as a means of scaling.


So we’ve covered some of the core approaches to storage engines. In truth we made some simplifications. The real world is a little more complex. But the concepts are useful nonetheless.

Scaling a data platform is more than just storage engines though. We need to consider parallelism.


When distributing data over many machines we have two core primitives to play with: partitioning and replication. Partitioning, sometimes called sharding, works well both for random access and brute force workloads.

If a hash-based partitioning model is used the data will be spread across a number of machines using a well-known hash function. This is similar to the way a hash table works, with each bucket being held on a different machine.

The result is that any value can be read by going directly to the machine that contains the data, via the hash function. This pattern is wonderfully scalable and is the only pattern that shows linear scalability as the number of client requests increases. Requests are isolated to a single machine. Each one will be served by just a single machine in the cluster.
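Directed routing via a hash function might be sketched as follows in Python. SHA-256 is used here only because it gives the same answer in every process (Python’s built-in hash() of a string does not); real stores choose their own hash function, and the class below is hypothetical.

```python
import hashlib


def partition_for(key, num_partitions):
    """Map a key to a partition number using a stable hash."""
    digest = hashlib.sha256(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % num_partitions


class PartitionedStore:
    """Each dict stands in for the data held on one machine."""

    def __init__(self, num_partitions):
        self.partitions = [{} for _ in range(num_partitions)]

    def put(self, key, value):
        self.partitions[partition_for(key, len(self.partitions))][key] = value

    def get(self, key):
        # Only one partition is ever consulted for a given key.
        return self.partitions[partition_for(key, len(self.partitions))].get(key)
```

Because any client can compute the partition on its own, no central lookup is needed to find the machine that holds a key.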


We can also use partitioning to provide parallelism over batch computations, for example aggregate functions or more complex algorithms such as those we might use for clustering or machine learning. The key difference is that we exercise all machines at the same time, in a broadcast manner. This allows us to solve a large computational problem in a much shorter time, using a divide and conquer approach.

Batch systems work well for large problems, but provide little concurrency as they tend to exhaust the resources on the cluster when they execute.


So the two extremes are pretty simple: directed access at one end, broadcast, divide and conquer at the other. Where we need to be careful is in the middle ground that lies between the two. A good example of this is the use of secondary indexes in NoSQL stores that span many machines.

A secondary index is an index that isn’t on the primary key. This means the data will not be partitioned by the values in the index. Directed routing via a hash function is no longer an option. We have to broadcast requests to all machines. This limits concurrency. Every node must be involved in every query.
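The broadcast pattern looks like this as a Python sketch. Each dictionary stands in for one machine’s data, the record layout is invented, and a linear scan replaces the per-node secondary index lookup a real store would do.

```python
def find_by_field(partitions, field, value):
    """Query a non-key field: ask every partition, then merge the answers."""
    matches = []
    for partition in partitions:  # every node has to take part
        for key, record in partition.items():
            if record.get(field) == value:
                matches.append(key)
    return sorted(matches)


# Hypothetical customers spread over two machines by primary key.
cluster = [
    {"c1": {"name": "bob"}},
    {"c2": {"name": "alice"}, "c3": {"name": "bob"}},
]
bobs = find_by_field(cluster, "name", "bob")
```

Every query costs work on every machine, so adding machines does not add capacity for this kind of request.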

For this reason many key value stores have resisted the temptation to add secondary indexes, despite their obvious use. HBase and Voldemort are examples of this. But many others do expose them: MongoDB, Cassandra, Riak, etc. This is good as secondary indexes are useful. But it’s important to understand the effect they will have on the overall concurrency of the system.


The route out of this concurrency bottleneck is replication. You’ll probably be familiar with replication either from using async slave databases or from replicated NoSQL stores like Mongo or Cassandra.

In practice replicas can be invisible (used only for recovery), read only (adding read concurrency) or read write (adding partition tolerance). Which of these you choose will trade off against the consistency of the system. This is simply the application of the CAP theorem (although the CAP theorem also may not be as simple as you think).


This tradeoff with consistency* brings us to an important question. When does consistency matter?

Consistency is expensive. In the database world ACID is guaranteed by serialisability. This is essentially ensuring that all operations appear to occur in sequential order. It turns out to be a pretty expensive thing. In fact it’s prohibitive enough that many databases don’t offer it as an isolation level at all. Those that do never set it as the default.

Suffice to say that if you apply strong consistency to a system that does distributed writes you’ll likely end up in tortoise territory.

(* note the term consistency has two common usages. The C in ACID and the C in CAP. They are unfortunately not the same. I’m using the CAP definition: all nodes see the same data at the same time)


The solution to this consistency problem is simple. Avoid it. If you can’t avoid it, isolate it to as few writers and as few machines as possible.

Avoiding consistency issues is often quite easy, particularly if your data is an immutable stream of facts. A set of web logs is a good example. They have no consistency concerns as they are just facts that never change.

There are other use cases which do necessitate consistency though. Transferring money between accounts is an oft-used example. Non-commutative actions, such as applying discount codes, are another.

But often things that appear to need consistency, in a traditional sense, may not. For example if an action can be changed from a mutation to a new set of associated facts we can avoid mutable state. Consider marking a transaction as being potentially fraudulent. We could update it directly with the new field. Alternatively we could simply use a separate stream of facts that links back to the original transaction.
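The fraud example might look something like this in practice. It is only a sketch: the two lists stand in for append-only streams, and the field and function names are invented for illustration.

```python
# Two append-only streams; entries are never updated in place.
transactions = []
fraud_flags = []


def record_transaction(tx_id, amount):
    transactions.append({"tx_id": tx_id, "amount": amount})


def flag_as_suspicious(tx_id, reason):
    # A new fact that links back to the transaction, rather than a mutation of it.
    fraud_flags.append({"tx_id": tx_id, "reason": reason})


def suspicious_transactions():
    # The 'is it suspicious?' view is derived by joining the two streams.
    flagged = {flag["tx_id"] for flag in fraud_flags}
    return [tx for tx in transactions if tx["tx_id"] in flagged]


record_transaction("t1", 20)
record_transaction("t2", 9000)
flag_as_suspicious("t2", "unusual amount")

print(suspicious_transactions())
```

The original transaction record is untouched, so nothing that already read it needs to be told about a change.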


So in a data platform it’s useful to either remove the consistency requirement altogether, or at least isolate it. One way to isolate it is to use the single writer principle, which gets you some of the way. Datomic is a good example of this. Another is to physically isolate the consistency requirement by splitting mutable and immutable worlds.

Approaches like Bloom/CALM extend this idea further by embracing the concept of disorder by default, imposing order only when necessary.

So those were some of the fundamental tradeoffs we need to consider. Now how do we pull these things together to build a data platform?


A typical application architecture might look something like the below. We have a set of processes which write data to a database and read it back again. This is fine for many simple workloads. Many successful applications have been built with this pattern. But we know it works less well as throughput grows. In the application space this is a problem we might tackle with message-passing, actors, load balancing etc.

The other problem is this approach treats the database as a black box. Databases are clever software. They provide a huge wealth of features. But they provide little mechanism for scaling out of an ACID world. This is a good thing in many ways. We default to safety. But it can become an annoyance when scaling is inhibited by general guarantees which may be overkill for the requirements we have.


The simplest route out of this is CQRS (Command Query Responsibility Segregation).

Another very simple idea. We separate read and write workloads. Writes go into something write-optimised. Something closer to a simple journal file. Reads come from something read-optimised. There are many ways to do this, be it tools like Goldengate for relational technologies or products that integrate replication internally such as Replica Sets in MongoDB.
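A minimal sketch of the split, assuming a toy account-balance domain: the write side is nothing more than a journal, and the read side is a precomputed view that catches up from it. The class and method names are hypothetical, and the explicit catch_up call stands in for whatever replication mechanism is actually used.

```python
class WriteSide:
    """Write-optimised: commands are only ever appended to a journal."""

    def __init__(self):
        self.journal = []

    def handle(self, account, delta):
        self.journal.append((account, delta))


class ReadSide:
    """Read-optimised: a precomputed balance per account."""

    def __init__(self):
        self.balances = {}
        self.applied = 0  # how much of the journal has been folded in

    def catch_up(self, journal):
        for account, delta in journal[self.applied:]:
            self.balances[account] = self.balances.get(account, 0) + delta
        self.applied = len(journal)

    def balance(self, account):
        return self.balances.get(account, 0)


writes, reads = WriteSide(), ReadSide()
writes.handle("acc-1", 100)
writes.handle("acc-1", -30)
stale = reads.balance("acc-1")  # the read side has not caught up yet
reads.catch_up(writes.journal)
fresh = reads.balance("acc-1")
print(stale, fresh)
```

The gap between the stale and fresh reads is the price of the separation: the read side lags the write side by however long replication takes.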


Many databases do something like this under the hood. Druid is a nice example. Druid is an open source, distributed, time-series, columnar analytics engine. Columnar storage works best if we input data in large blocks, as the data must be spread across many files. To get good write performance Druid stores recent data in a write optimised store. This is gradually ported over to the read optimised store over time.

When Druid is queried the query routes to both the write optimised and read optimised components. The results are combined (‘reduced’) and returned to the user. Druid uses time, marked on each record, to determine ordering.
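The shape of such a composite query can be sketched as follows. To be clear, this is not Druid’s API or storage format; the row list, the column dictionary and the function names are all assumptions made to show the idea of querying both stores and reducing the results.

```python
# Write-optimised store: recent rows, appended in arrival order.
recent_events = [{"ts": 105, "clicks": 2}, {"ts": 103, "clicks": 1}]

# Read-optimised store: older data held column by column.
historical_columns = {"ts": [100, 101, 102], "clicks": [5, 3, 4]}


def query_clicks(start, end):
    # Route the query to both components, then combine the partial results.
    historical = sum(
        clicks
        for ts, clicks in zip(historical_columns["ts"], historical_columns["clicks"])
        if start <= ts < end
    )
    recent = sum(e["clicks"] for e in recent_events if start <= e["ts"] < end)
    return historical + recent


def hand_off():
    # Gradually port recent rows into the columnar store, ordered by time.
    for event in sorted(recent_events, key=lambda e: e["ts"]):
        historical_columns["ts"].append(event["ts"])
        historical_columns["clicks"].append(event["clicks"])
    recent_events.clear()


before = query_clicks(101, 104)
hand_off()
after = query_clicks(101, 104)
print(before, after)
```

The caller gets the same answer before and after the hand-off, which is the point of hiding both stores behind one abstraction.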

Composite approaches like this provide the benefits of CQRS behind a single abstraction.


Another similar approach is to use an Operational/Analytic Bridge. Read- and write-optimised views are separated using an event stream. The stream of state is retained indefinitely, so that the async views can be recomposed and augmented at a later date by replaying.
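Retaining the stream is what makes views cheap to add later. A small sketch, assuming a made-up order event format and an in-memory list in place of a real event stream:

```python
# The retained stream of immutable events (hypothetical format).
event_log = [
    {"type": "order_placed", "customer": "alice", "total": 30},
    {"type": "order_placed", "customer": "bob", "total": 15},
    {"type": "order_placed", "customer": "alice", "total": 5},
]


def build_view(log, apply):
    # Replay the whole stream from the start to (re)build a view.
    view = {}
    for event in log:
        apply(view, event)
    return view


def spend_per_customer(view, event):
    view[event["customer"]] = view.get(event["customer"], 0) + event["total"]


def orders_per_customer(view, event):
    # A view thought of later; it needs no change to the write path.
    view[event["customer"]] = view.get(event["customer"], 0) + 1


spend = build_view(event_log, spend_per_customer)
order_counts = build_view(event_log, orders_per_customer)
print(spend, order_counts)
```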

So the front section provides for synchronous reads and writes. This can be as simple as immediately reading data that was written or as complex as supporting ACID transactions.

The back end leverages asynchronicity, and the advantages of immutable state, to scale offline processing through replication, denormalisation or even completely different storage engines. The messaging-bridge, along with joining the two, allows applications to listen to the data flowing through the platform.

As a pattern this is well suited to mid-sized deployments where there is at least a partial, unavoidable requirement for a mutable view.


If we are designing for an immutable world, it’s easier to embrace larger data sets and more complex analytics. The batch pipeline, one almost ubiquitously implemented with the Hadoop stack, is typical of this.

The beauty of the Hadoop stack comes from its plethora of tools. Whether you want fast read-write access, cheap storage, batch processing, high throughput messaging or tools for extracting, processing and analysing data, the Hadoop ecosystem has it all.

The batch pipeline architecture pulls data from pretty much any source, push or pull. It ingests the data into HDFS, then processes it to provide increasingly optimised versions of the original data. Data might be enriched, cleansed, denormalised, aggregated, moved to a read optimised format such as Parquet or loaded into a serving layer or data mart. Data can be queried and processed throughout this process.

This architecture works well for immutable data, ingested and processed in large volume. Think 100s of TBs plus. The evolution of this architecture will be slow though. Straight-through timings are often measured in hours.


The problem with the Batch Pipeline is that we often don’t want to wait hours to get a result. A common solution is to add a streaming layer alongside it. This is sometimes referred to as the Lambda Architecture.

The Lambda Architecture retains a batch pipeline, like the one above, but it circumvents it with a fast streaming layer. It’s a bit like building a bypass around a busy town. The streaming layer typically uses a stream processing tool such as Storm or Samza.

The key insight of the Lambda Architecture is that we’re often happy to have an approximate answer quickly, but we would like an accurate answer in the end.

So the streaming layer bypasses the batch layer, providing the best answers it can within a streaming window. These are written to a serving layer. Later the batch pipeline computes an accurate answer and overwrites the approximation.
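Here is a toy version of that hand-over, counting unique users. The event list, the window size and the dictionary used as a serving layer are all invented for the example; the streaming side can only see one window at a time, so it over-counts visitors who return in a later window.

```python
# Hypothetical page views: one user id per view, in arrival order.
events = ["ann", "bob", "ann", "cy", "bob", "ann"]
WINDOW = 3

serving_layer = {}


def streaming_estimate(events, window):
    # Each window only knows its own users, so repeat visitors
    # seen in more than one window are counted more than once.
    estimate = 0
    for i in range(0, len(events), window):
        estimate += len(set(events[i:i + window]))
        serving_layer["unique_users"] = ("approximate", estimate)
    return estimate


def batch_recompute(events):
    # The batch layer sees everything, so it can give the exact answer.
    exact = len(set(events))
    serving_layer["unique_users"] = ("accurate", exact)
    return exact


quick = streaming_estimate(events, WINDOW)
served_early = serving_layer["unique_users"]
slow = batch_recompute(events)
served_late = serving_layer["unique_users"]
print(served_early, served_late)
```

Readers of the serving layer see the quick estimate first and the corrected figure once the batch run lands.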

This is a clever way to balance accuracy with responsiveness. Some implementations of this pattern suffer if the two branches end up being dual coded in stream and batch layers. But it is often possible to simply abstract this logic into common libraries that can be reused, particularly as much of this processing is often written in external languages such as Python or R anyway. Alternatively systems like Spark provide both stream and batch functionality in one system (although the streams in Spark are really micro-batches).

So this pattern again suits high volume data platforms, say in the 100TB+ range, that want to combine streams with existing, rich, batch based analytic function.


There is another approach to this problem of slow data pipelines. It’s sometimes termed the Kappa architecture. I actually thought this name was ‘tongue in cheek’ but I’m now not so sure. Whichever it is, I’m going to use the term Stream Data Platform, which is a term in use also.

Stream Data Platforms flip the batch pattern on its head. Rather than storing data in HDFS, and refining it with incremental batch jobs, the data is stored in a scale out messaging system, or log, such as Kafka. This becomes the system of record and the stream of data is processed in real time to create a set of tertiary views, indexes, serving layers or data marts.

This is broadly similar to the streaming layer of the Lambda architecture but with the batch layer removed. Obviously the requirement for this is that the messaging layer can store and vend very large volumes of data and there is a sufficiently powerful stream processor to handle the processing.

There is no free lunch so, for hard problems, Stream Data Platforms will likely run no faster than an equivalent batch system, but switching the default approach from ‘store and process’ to ‘stream and process’ can provide greater opportunity for faster results.


Finally, the Stream Data Platform approach can be applied to the problem of ‘application integration’. This is a thorny and difficult problem that has seen focus from big vendors such as Informatica, Tibco and Oracle for many years. For the most part results have been beneficial, but not transformative. Application integration remains a topic looking for a real workable solution.

Stream Data Platforms provide an interesting potential solution to this problem. They take many of the benefits of an O/A bridge – the variety of asynchronous storage formats and ability to recreate views – but leave the consistency requirement isolated in, often existing, sources:


With the system of record being a log it’s easy to enforce immutability. Products like Kafka can retain enough volume internally to be used as a historic record. This means recovery can be a process of replaying and regenerating state, rather than constantly checkpointing.

Similarly styled approaches have been taken before in a number of large institutions with tools such as Goldengate, porting data to enterprise data warehouses or more recently data lakes. They were often thwarted by a lack of throughput in the replication layer and the complexity of managing changing schemas. It seems unlikely the first problem will continue. As for the latter problem though, the jury is still out.


So we started with locality. With sequential addressing for both reads and writes. This dominates the tradeoffs inside the components we use. We looked at scaling these components out, leveraging primitives for both sharding and replication. Finally we rebranded consistency as a problem we should isolate in the platforms we build.

But data platforms themselves are really about balancing the sweet-spots of these individual components within a single, holistic form. Incrementally restructuring. Migrating the write-optimised to the read-optimised. Moving from the constraints of consistency to the open plains of streamed, asynchronous, immutable state.

This must be done with a few things in mind. Schemas are one. Time, the peril of the distributed, asynchronous world, is another. But these problems are manageable if carefully addressed. Certainly the future is likely to include more of these things, particularly as tooling, innovated in the big data space, percolates into platforms that address broader problems, both old and new.

(via HighScalability.com)