Design Of A Modern Cache

Caching is a common approach for improving performance, yet most implementations use strictly classical techniques. In this article we will explore the modern methods used by Caffeine, an open-source Java caching library, that yield high hit rates and excellent concurrency. These ideas can be translated to your favorite language and hopefully some readers will be inspired to do just that.

Eviction Policy

A cache’s eviction policy tries to predict which entries are most likely to be used again in the near future, thereby maximizing the hit ratio. The Least Recently Used (LRU) policy is perhaps the most popular due to its simplicity, good runtime performance, and a decent hit rate in common workloads. Its ability to predict the future is limited to the history of the entries residing in the cache, giving the last access the highest priority on the guess that it is the most likely to be reused soon.

Modern caches extend the usage history to include the recent past and give preference to entries based on recency and frequency. One approach for retaining history is to use a popularity sketch (a compact, probabilistic data structure) to identify the “heavy hitters” in a large stream of events. Take, for example, the Count-Min Sketch, which uses a matrix of counters and multiple hash functions. The addition of an entry increments a counter in each row and the frequency is estimated by taking the minimum value observed. This approach lets us trade off space, efficiency, and the error rate due to collisions by adjusting the matrix’s width and depth.
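
To make the idea concrete, here is a minimal Count-Min sketch in Java. The width, depth, and hashing scheme are illustrative choices for this article rather than Caffeine's internals (Caffeine uses a more compact variant with small counters), but the add/estimate/halve operations are the ones the rest of this section relies on.

import java.util.Random;

/** Illustrative Count-Min sketch: a matrix of counters with one hash per row. */
final class CountMinSketch {
  private final long[][] table; // depth rows x width counters
  private final int[] seeds;    // one hash seed per row

  CountMinSketch(int depth, int width) {
    table = new long[depth][width];
    seeds = new Random(1234).ints(depth).toArray();
  }

  /** Adding an entry increments one counter in each row. */
  void add(Object item) {
    for (int row = 0; row < table.length; row++) {
      table[row][index(item, row)]++;
    }
  }

  /** The frequency estimate is the minimum counter observed across the rows. */
  long estimate(Object item) {
    long min = Long.MAX_VALUE;
    for (int row = 0; row < table.length; row++) {
      min = Math.min(min, table[row][index(item, row)]);
    }
    return min;
  }

  /** Aging: halve every counter so that the history stays fresh. */
  void halve() {
    for (long[] row : table) {
      for (int i = 0; i < row.length; i++) {
        row[i] >>>= 1;
      }
    }
  }

  private int index(Object item, int row) {
    int hash = item.hashCode() * (seeds[row] | 1);
    hash ^= hash >>> 16;
    return Math.floorMod(hash, table[row].length);
  }
}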

Window TinyLFU (W-TinyLFU) uses the sketch as a filter, admitting a new entry if it has a higher frequency than the entry that would have to be evicted to make room for it. Instead of filtering immediately, an admission window gives an entry a chance to build up its popularity. This avoids consecutive misses, especially in cases like sparse bursts where an entry may not be deemed suitable for long-term retention. To keep the history fresh an aging process is performed periodically or incrementally to halve all of the counters.
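
Putting the pieces together, a TinyLFU-style admission decision can be sketched as follows, reusing the CountMinSketch above. The reset threshold and the shape of the class are assumptions made for illustration, not Caffeine's actual code.

/**
 * Illustrative TinyLFU-style admission: the candidate is admitted only if its
 * estimated frequency beats the eviction victim's, and the counters are halved
 * after a fixed number of additions so that stale history ages out.
 */
final class TinyLfuAdmittor {
  private final CountMinSketch sketch = new CountMinSketch(4, 1 << 16);
  private final int resetThreshold = 10 * (1 << 16); // arbitrary for the sketch
  private int additions;

  /** Record every access, whether or not the key is resident in the cache. */
  void recordAccess(Object key) {
    sketch.add(key);
    if (++additions == resetThreshold) {
      sketch.halve(); // the periodic aging step described above
      additions = 0;
    }
  }

  /** Called on eviction: keep the candidate only if it is the more popular. */
  boolean admit(Object candidate, Object victim) {
    return sketch.estimate(candidate) > sketch.estimate(victim);
  }
}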

W-TinyLFU uses the Segmented LRU (SLRU) policy for long term retention. An entry starts in the probationary segment and on a subsequent access it is promoted to the protected segment (capped at 80% capacity). When the protected segment is full it evicts into the probationary segment, which may trigger a probationary entry to be discarded. This ensures that entries with a small reuse interval (the hottest) are retained and those that are less often reused (the coldest) become eligible for eviction.
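
A minimal, single-threaded sketch of the SLRU idea follows. The 80% protected split mirrors the description above, while the class itself is an illustration rather than Caffeine's implementation.

import java.util.LinkedHashMap;

/**
 * Minimal Segmented LRU sketch. New entries start in the probationary segment
 * and are promoted to the protected segment when accessed again; the protected
 * segment is capped at 80% of capacity.
 */
final class SegmentedLru<K, V> {
  private final int protectedCapacity;
  private final LinkedHashMap<K, V> probation = new LinkedHashMap<>(16, 0.75f, true);
  private final LinkedHashMap<K, V> protectedSegment = new LinkedHashMap<>(16, 0.75f, true);

  SegmentedLru(int capacity) {
    this.protectedCapacity = (int) (capacity * 0.8);
  }

  void put(K key, V value) {
    probation.put(key, value); // new arrivals always enter probation
  }

  V get(K key) {
    V value = protectedSegment.get(key);
    if (value != null) {
      return value; // already hot; the access-ordered map refreshed its recency
    }
    value = probation.remove(key);
    if (value != null) {
      protectedSegment.put(key, value); // promote on the second access
      demoteIfNeeded();
    }
    return value;
  }

  /** When the protected segment is full, its coldest entry falls back to probation. */
  private void demoteIfNeeded() {
    while (protectedSegment.size() > protectedCapacity) {
      K eldestKey = protectedSegment.keySet().iterator().next();
      V eldestValue = protectedSegment.remove(eldestKey);
      probation.put(eldestKey, eldestValue);
    }
  }

  /** The coldest probationary entry is the victim that W-TinyLFU judges against. */
  K victim() {
    return probation.isEmpty() ? null : probation.keySet().iterator().next();
  }
}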

[Figures: hit rate comparison of eviction policies on the database and search traces]

As the database and search traces show, there is a lot of opportunity to improve upon LRU by taking into account recency and frequency. More advanced policies such as ARC, LIRS, and W-TinyLFU narrow the gap to provide a near optimal hit rate. For additional workloads see the research papers and try our simulator if you have your own traces to experiment with.

Expiration Policy

Expiration is often implemented with a variable duration per entry, and expired entries are evicted lazily when the capacity constraint forces them out. This pollutes the cache with dead items, so sometimes a scavenger thread is used to periodically sweep the cache and reclaim free space. This strategy tends to work better than ordering entries by their expiration time on an O(lg n) priority queue because it hides the cost from the user instead of incurring a penalty on every read or write operation.

Caffeine takes a different approach by observing that a fixed duration is most often preferred. This constraint allows entries to be organized on O(1) time-ordered queues: a time-to-live duration maps to a write-order queue and a time-to-idle duration maps to an access-order queue. The cache can reuse the eviction policy’s queues and the concurrency mechanism described below, so that expired entries are discarded during the cache’s maintenance phase.
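
From the caller's side this shows up as fixed durations on the builder. A typical configuration, assuming the Caffeine library is on the classpath, might look like this (the key and value types and the durations are arbitrary):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;

public class ExpirationExample {
  public static void main(String[] args) {
    Cache<String, String> cache = Caffeine.newBuilder()
        .maximumSize(10_000)
        // fixed time-to-live: entries sit on the write-order queue
        .expireAfterWrite(10, TimeUnit.MINUTES)
        // fixed time-to-idle: entries sit on the access-order queue
        .expireAfterAccess(5, TimeUnit.MINUTES)
        .build();

    cache.put("session-42", "some value");
    String value = cache.getIfPresent("session-42"); // null once expired
  }
}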

Concurrency

Concurrent access to a cache is viewed as a difficult problem because in most policies every access is a write to some shared state. The traditional solution is to guard the cache with a single lock. This might then be improved through lock striping by splitting the cache into many smaller independent regions. Unfortunately that tends to have a limited benefit because hot entries cause some locks to be more contended than others. When contention becomes a bottleneck the next classic step has been to update only per-entry metadata and use either a random sampling or a FIFO-based eviction policy. Those techniques offer great read performance, but suffer from poor write performance and difficulty in choosing a good victim.

An alternative is to borrow an idea from database theory where writes are scaled by using a commit log. Instead of mutating the data structures immediately, the updates are written to a log and replayed in asynchronous batches. This same idea can be applied to a cache by performing the hash table operation, recording the operation to a buffer, and scheduling the replay activity against the policy when deemed necessary. The policy is still guarded by a lock, or a try lock to be more precise, but shifts contention onto appending to the log buffers instead.

In Caffeine separate buffers are used for cache reads and writes. An access is recorded into a striped ring buffer, where the stripe is chosen by a thread-specific hash and the number of stripes grows when contention is detected. When a ring buffer is full an asynchronous drain is scheduled and subsequent additions to that buffer are discarded until space becomes available. When the access is not recorded due to a full buffer the cached value is still returned to the caller. The loss of policy information does not have a meaningful impact because W-TinyLFU is able to identify the hot entries that we wish to retain. By using a thread-specific hash instead of the key’s hash, the cache spreads the load more evenly and prevents popular entries from becoming points of contention.
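
The lossy read buffer can be pictured with a small sketch like the one below: producers record access events when there is room and silently drop them when there is not, while a single drain task consumes the events later. This is a simplified illustration, not Caffeine's actual ring buffer, which adds striping, padding, and more careful memory ordering.

import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;
import java.util.function.Consumer;

/** Simplified lossy multi-producer / single-consumer read buffer. */
final class ReadBuffer<E> {
  private final AtomicReferenceArray<E> buffer;
  private final AtomicLong writeCounter = new AtomicLong();
  private long readCounter; // touched only by the single drain thread

  ReadBuffer(int capacity) {
    buffer = new AtomicReferenceArray<>(capacity);
  }

  /** Returns false, dropping the event, when the buffer appears full. */
  boolean offer(E event) {
    long head = readCounter; // racy read; at worst we drop a few extra events
    long tail = writeCounter.get();
    if (tail - head >= buffer.length()) {
      return false; // full: lose the policy hint but keep serving the read
    }
    if (writeCounter.compareAndSet(tail, tail + 1)) {
      buffer.set((int) (tail % buffer.length()), event);
      return true;
    }
    return false; // lost the race to another producer; dropping is fine too
  }

  /** Called by the maintenance task while holding the policy's try-lock. */
  void drainTo(Consumer<E> consumer) {
    long tail = writeCounter.get();
    while (readCounter < tail) {
      int index = (int) (readCounter % buffer.length());
      E event = buffer.get(index);
      if (event == null) {
        break; // slot claimed but not yet published by its producer
      }
      buffer.set(index, null);
      consumer.accept(event);
      readCounter++;
    }
  }
}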

In the case of a write, a more traditional concurrent queue is used and every change schedules an immediate drain. While data loss is unacceptable here, there are still ways to optimize the write buffer. Both types of buffers are written to by multiple threads but consumed by only a single one at a given time. This multiple producer / single consumer behavior allows for simpler, more efficient algorithms to be employed.

The buffers and fine grained writes introduce a race condition where operations for an entry may be recorded out of order. An insertion, read, update, and removal can be replayed in any order and if improperly handled the policy could retain dangling references. The solution to this is a state machine defining the lifecycle of an entry.
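
A stripped-down version of such a lifecycle might look like the following enum. The state names and transition rules here are assumptions for illustration; the point is simply that each replayed event checks the entry's current state before being applied, so out-of-order replay cannot leave dangling references in the policy.

/** Sketch of an entry lifecycle state machine. */
enum EntryState {
  ALIVE,    // present in the hash table and in the policy
  RETIRED,  // removed from the hash table, awaiting removal from the policy
  DEAD;     // removed from both

  /** Removal from the hash table retires a live entry. */
  EntryState onRemoval() {
    return this == ALIVE ? RETIRED : this;
  }

  /** The policy finishes the removal during its maintenance phase. */
  EntryState onPolicyDiscard() {
    return DEAD;
  }

  /** A replayed read or update is only applied while the entry is still alive. */
  boolean shouldApplyEvent() {
    return this == ALIVE;
  }
}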

In benchmarks the buffers are relatively cheap and scale with the underlying hash table. Reads scale linearly with the number of CPUs at about 33% of the hash table’s throughput. Writes see a 10% penalty, only because contention when updating the hash table is the dominant cost.

Conclusion

There are many pragmatic topics that have not been covered. These include tricks to minimize the memory overhead, testing techniques to retain quality as complexity grows, and ways to analyze performance to determine whether an optimization is worthwhile. These are areas that practitioners must keep an eye on, because once they are neglected it can be difficult to restore confidence in one’s own ability to manage the ensuing complexity.

The design and implementation of Caffeine is the result of numerous insights and the hard work of many contributors. Its evolution over the years wouldn’t have been possible without the help from the following people: Charles Fry, Adam Zell, Gil Einziger, Roy Friedman, Kevin Bourrillion, Bob Lee, Doug Lea, Josh Bloch, Bob Lane, Nitsan Wakart, Thomas Müeller, Dominic Tootell, Louis Wasserman, and Vladimir Blagojevic.

(via HighScalability.com)

ejabberd Massive Scalability: 1 Node — 2+ Million Concurrent Users

How Far Can You Push ejabberd?

From our experience, we all get the idea that ejabberd is massively scalable. However, we wanted to provide benchmark results and hard figures to demonstrate our outstanding performance level and give a baseline about what to expect in simple cases.

That’s how we ended up with the challenge of fitting a very large number of concurrent users on a single ejabberd node.

It turns out you can get very far with ejabberd.

Scenario and Platforms

Here is our benchmark scenario: the target was to reach 2,000,000 concurrent users, each with 18 contacts on the roster and a session lasting around one hour. The scenario involves 2.2M registered users, so almost all contacts are online at peak load. This means presence packets were broadcast for those users, adding traffic on top of the packets handling user connections and session management. In that situation, the scenario produced 550 connections per second and thus 550 logins per second.

The database for authentication and roster storage was MySQL, running on the same node as ejabberd.

For the benchmark itself, we used Tsung, a tool dedicated to generating large loads to test server performance. We used a single large instance to generate the load.

Both ejabberd and the test platform were running on Amazon EC2 instances. ejabberd was running on a single node of instance type m4.10xlarge (40 vCPU, 160 GiB). Tsung instance was identical.

Regarding ejabberd software itself, the test was made with ejabberd Community Server version 16.01. This is the standard open source version that is widely available and widely used across the world.

The connections were not using TLS to make sure we were focusing on testing ejabberd itself and not openSSL performance.

Code snippets and comments regarding the Tsung scenario are available for download: tsung_snippets.md

Overall Benchmark Results

We managed to surpass the target and support more than 2 million concurrent users on a single ejabberd node.

For XMPP servers, the main limitation to handle a massive number of online users is usually memory consumption. With proper tuning, we managed to handle the traffic with a memory footprint of 28KB per online user.

The 40 CPUs were almost evenly used, with the exception of the first core, which was handling all the network interrupts. It was more loaded by the operating system and thus less loaded by the Erlang VM.

In the process, we also optimized our XML parser, released now as Fast XML, a high-performance, memory efficient Expat-based Erlang and Elixir XML parser.

Detailed Results

ejabberd Performance

The benchmark shows that we reached 2 million concurrent users after one hour. We were logging in about 33k users per minute, producing session traffic of a bit more than 210k XMPP packets per minute (this includes the stanzas for SASL authentication, binding, roster retrieval, etc.). The maximum number of concurrent users is reached shortly after the 2 million mark, by design in the scenario. At this point we still connect new users, but as the first users start disconnecting the number of concurrent users becomes stable.

As we try to reproduce common client behavior, we set up Tsung to send “keepalive pings” on the connections. Since each session sends one such whitespace ping per minute, the number of these requests grows proportionally with the number of connected users. And while idle connections consume few resources on the server, at this scale they start to be noticeable: with 2 million users you are handling roughly 33K such pings per second from idle connections alone. They are not represented on the graphs, but they are an important part of the real-life traffic we were generating.

ejabberd Health

At all times, ejabberd health was fine. Typically, when ejabberd is overloaded, TCP connection establishment time and authentication tend to grow to an unacceptable level. In our case, both of those operations performed very fast throughout the benchmark, in under 10 milliseconds. There were almost no errors (the rare occurrences are artefacts of the benchmark process).

Platform Behavior

Good health and performance are confirmed by the state of the platform. CPU and memory consumption were totally under control, as shown in the graph. CPU consumption stays far from system limits. Memory grows proportionally to the number of concurrent users.

We also need to mention that the CPU values are slightly overestimated as seen by the OS, as Erlang schedulers do a bit of busy waiting when they run out of work.

Challenge: The Hardest Part

The hardest part is definitely tuning the Linux system for ejabberd and for the benchmark tool, to overcome the default limitations. By default, Linux servers are not configured to handle, or even generate, 2 million TCP sockets. It required quite a bit of network setup to avoid exhausting ports on the Tsung side.

On a similar topic, we worked with the Amazon server team, as we have been pushing the limits of their infrastructure like no one before. For example, we had to use a second Ethernet adapter with multiple IP addresses (2 x 15 IPs, spread across 2 NICs). It also helped a lot to use the latest Enhanced Networking drivers from Intel.

All in all, it was a very interesting process that helped make progress on Amazon Web Services by testing and tuning the platform itself.

What’s Next?

This benchmark was intended to demonstrate that ejabberd can scale to a large volume and serve as a baseline reference for more complex and full-featured platforms.

Next step is to keep on going with our benchmark and optimization iteration work. Our next target is to benchmark Multi-User Chat performance and Pubsub performance. The goal is to find the limits, optimize and demonstrate that massive internet scale can be done with these ejabberd components as well.

(via Blog.process-one.net)

How Wistia Handles Millions Of Requests Per Hour And Processes Rich Video Analytics

This is a guest repost from Christophe Limpalair of his interview with Max Schnur, Web Developer at Wistia.

Wistia is video hosting for business. They offer video analytics like heatmaps, and they give you the ability to add calls to action, for example. I was really interested in learning how all the different components work and how they’re able to stream so much video content, so that’s what this episode focuses on.

What Does Wistia’s Stack Look Like?

As you will see, Wistia is made up of different parts. Here are some of the technologies powering these different parts:

What Scale Are You Running At?

Wistia has three main parts to it:

  1. The Wistia app (The ‘Hub’ where users log in and interact with the app)
  2. The Distillery (stats processing)
  3. The Bakery (transcoding and serving)

Here are some of their stats:

  • 1.5 million player loads per hour (loading a page with a player on it counts as one, two players count as two, etc.)
  • 18.8 million peak requests per hour to their Fastly CDN
  • 740,000 app requests per hour
  • 12,500 videos transcoded per hour
  • 150,000 plays per hour
  • 8 million stats pings per hour

They are running on Rackspace.

What Challenges Come From Receiving Videos, Processing Them, Then Serving Them?

  1. They want to balance quality and deliverability, which has two sides to it:
    1. Encoding derivatives of the original video
    2. Knowing when to play which derivative

     

    Derivatives, in this context, are different versions of a video. Having different quality versions is important, because it allows you to have smaller file sizes. These smaller video files can be served to users with less bandwidth. Otherwise, they would have to constantly buffer. Have enough bandwidth? Great! You can enjoy higher quality (larger) files.

    Having these different versions and knowing when to play which version is crucial for a good user experience.

  2. Lots of I/O. When you have users uploading videos at the same time, you end up having to move many heavy files across clusters. A lot of things can go wrong here.
  3. Volatility. There is volatility in both the number of requests and the number of videos to process. They need to be able to sustain these changes.
  4. Of course, serving videos is also a major challenge. Thankfully, CDNs have different kinds of configurations to help with this.
  5. On the client side, there are a ton of different browsers, device sizes, etc… If you’ve ever had to deal with making websites responsive or work on older IE versions, you know exactly what we’re talking about here.

How Do You Handle Big Changes In The Number Of Video Uploads?

They have boxes which they call Primes. These Prime boxes are in charge of receiving files from user uploads.

If uploads start eating up all the available disk space, they can just spin up new Primes using Chef recipes. It’s a manual job right now, but they don’t usually get anywhere close to the limit. They have lots of space.

What Transcoding System Do You Use?

The transcoding part is what they call the Bakery.

The Bakery is made up of the Primes we just looked at, which receive and serve files. They also have a cluster of workers. Workers process tasks and create derivative files from uploaded videos.

Beefy systems are required for this part, because it’s very resource intensive. How beefy?

We’re talking about several hundred workers. Each worker usually performs two tasks at one time. They all have 8 GB of RAM and a pretty powerful CPU.

What kinds of tasks and processing go on with workers? They encode video primarily with x264, a fast encoding scheme. Videos can usually be encoded in about half or a third of the video length.

Videos must also be resized and they need different bitrate versions.

In addition, there are different encoding profiles for different devices, like HLS for iPhones. These encodings need to be doubled for Flash derivatives that are also x264 encoded.

What Does The Whole Process Of Uploading A Video And Transcoding It Look Like?

Once a user uploads a video, it is queued and sent to workers for transcoding, and then slowly transferred over to S3.

Instead of sending a video to S3 right away, it is pushed to Primes in the clusters so that customers can serve videos right away. Then, over a matter of a few hours, the file is pushed to S3 for permanent storage and cleared from Prime to make space.

How Do You Serve The Video To A Customer After It Is Processed?

When you’re hitting a video hosted on Wistia, you make a request to embed.wistia.com/video-url, which is actually hitting a CDN (they use a few different ones. I just tried it and mine went through Akamai). That CDN goes back to Origin, which is their Primes we just talked about.

Primes run HAProxy with a special balancer called Breadroute. Breadroute is a routing layer that sits in front of Primes and balances traffic.

Wistia might have the file stored locally in the cluster (which would be faster to serve), and Breadroute is smart enough to know that. If that is the case, then Nginx will serve the file directly from the file system. Otherwise, they proxy S3.

How Can You Tell Which Video Quality To Load?

That’s primarily decided on the client side.

Wistia will receive data about the user’s bandwidth immediately after they hit play. Before the user even hits play, Wistia receives data about the device, the size of the video embed, and other heuristics to determine the best asset for that particular request.

The decision of whether to go HD or not is only debated when the user goes into full screen. This gives Wistia a chance to not interrupt playback when a user first clicks play.

How Do You Know When Videos Don’t Load? How Do You Monitor That?

They have a service they call pipedream, which they use within their app and within embed codes to constantly send data back.

If a user clicks play, they get information about the size of their window, where it was, and if it buffers (if it’s in a playing state but the time hasn’t changed after a few seconds).

A known problem with videos is slow load times. Wistia wanted to know when a video was slow to load, so they tried tracking metrics for that. Unfortunately, they only knew if it was slow if the user actually waited around for the load to finish. If users have really slow connections, they might just leave before that happens.

The answer to this problem? Heartbeats. If they receive heartbeats and they never receive a play, then the user probably bailed.

What Other Analytics Do You Collect?

They also collect analytics for customers.

Analytics like play, pause, seeks, conversions (entering an email, or clicking a call to action). Some of these are shown in heatmaps. They also aggregate those heatmaps into graphs that show engagement, like the areas that people watched and rewatched.

How Do You Get Such Detailed Data?

When the video is playing, they use a video tracker which is an object bound to plays, pauses, and seeks. This tracker collects events into a data structure. Once every 10-20 seconds, it pings back to the Distillery which in turn figures out how to turn data into heatmaps.

Why doesn’t everyone do this? I asked him that, because I don’t even get this kind of data from YouTube. The reason, Max says, is because it’s pretty intensive. The Distillery processes a ton of stuff and has a huge database.

What Is Scaling Ease Of Use?

Wistia has about 60 employees. Once you start scaling up customer base, it can be really hard to make sure you keep having good customer support.

Their highest touch points are playback and embeds. These have a lot of factors that are out of their control, so they must make a choice:

  1. Don’t help customers
  2. Help customers, but it takes a lot of time

Those options weren’t good enough. Instead, they went after the source that caused the most customer support issues: embeds.

Embeds have gone through multiple iterations. Why? Because they often broke. Whether it was WordPress doing something weird to an embed, or Markdown breaking the embed, Wistia has had a lot of issues with this over time. To solve this problem, they ended up dramatically simplifying embed codes.

These simpler embed codes run into fewer problems on the customer’s end. While it does add more complexity behind the scenes, it results in fewer support tickets.

This is what Max means by scaling ease of use. Make it easier for customers, so that they don’t have to contact customer support as often. Even if that means more engineering challenges, it’s worth it to them.

Another example of scaling ease of use is with playbacks. Max is really interested in implementing a kind of client-side CDN load balancer that determines the lowest latency server to serve content from (similar to what Netflix does).

What Projects Are You Working On Right Now?

Something Max is planning on starting soon is what they’re calling the upload server. This upload server is going to give them the ability to do a lot of cool stuff around uploads.

As we talked about, and with most upload services, you have to wait for the file to get to a server before you can start transcoding and doing things with it.

The upload server will make it possible to transcode while uploading. This means customers could start playing their video before it’s even completely uploaded to their system. They could also get the still capture and embed almost immediately.

Conclusion

I tried to fit as much information into this summary as possible without making it too long. This means I cut out some information that could really help you out one day. I highly recommend you view the full interview if you found this interesting, as there are more hidden gems.

You can watch it, read it, or even listen to it. There’s also an option to download the MP3 so you can listen on your way to work. Of course, the show is also on iTunes and Stitcher.

Thanks for reading, and please let me know if you have any feedback!
– Christophe Limpalair (@ScaleYourCode)

(via HighScalability.com)

Designing For Scale – Three Principles And Three Practices From Tapad Engineering

This is a guest post by Toby Matejovsky, Director of Engineering at Tapad (@TapadEng).

Here at Tapad, scaling our technology strategically has been crucial to our immense growth. Over the last four years we’ve scaled our real-time bidding system to handle hundreds of thousands of queries per second. We’ve learned a number of lessons about scalability along that journey.

Here are a few concrete principles and practices we’ve distilled from those experiences:

  • Principle 1: Design for Many
  • Principle 2: Service-Oriented Architecture Beats Monolithic Application
  • Principle 3: Monitor Everything
  • Practice 1: Canary Deployments
  • Practice 2: Distributed Clock
  • Practice 3: Automate To Assist, Not To Control

Principle 1: Design For Many

There are three amounts that matter in software design: none, one, and many. We’ve learned to always design for the “many” case. This makes scaling more of a simple mechanical process, rather than a complicated project requiring re-architecting the entire codebase. The work to get there might not be as easy, but front-loading the effort pays dividends later when the application needs to scale suddenly.

For us, a guiding principle is to always consider the hypothetical ‘10x use case.’ How would our applications respond if they had to suddenly handle 10 times the current traffic? The system is only as good as the sum of its parts. The application layer might scale out easily, but if it fails because of interacting with a single database node then we haven’t achieved true scalability.

Principle 2: Service-Oriented Architecture Beats Monolithic Application

Here at Tapad we leverage a service-based architecture. The main advantages are the ability to allocate resources efficiently, and to make upgrades easier.

Imagine two systems:

  • One requires a lot of compute, not much memory.

  • One requires a lot of memory, not much compute.

If they were combined into a single system, but only the memory-intensive one needed to scale, every additional node would end up overcommitting on compute.

Virtualization solves the problem by making those overcommitted cores available to some other system, but the solution paints a misleading picture. It appears there are N systems available, but it is impossible to run all N at full capacity. If the cluster keeps enough compute available to run all N at full capacity, then money is being wasted – never a good thing.

Principle 3: Monitor Everything

Monitoring is an obvious requirement for any production system. We currently use Zabbix for alerting, and Graphite for tracking metrics over time. A typical Zabbix check looks like:

  • “Is process X running”

  • “Is node N responding to a request within M milliseconds”

  • “Node N’ is using > 80% of its available storage”

We recently switched out our Graphite backend to use Cassandra instead of whisper to better handle the volume of traffic (there are currently about half a million metrics tracked). We aggregate metrics in-memory with a customized version of Twitter’s Ostrich metrics library, and flush them to graphite every 10 seconds.
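
As an illustration of that pattern, the sketch below aggregates counters in memory and flushes them on a fixed schedule to Graphite's plaintext protocol ("path value timestamp" sent to the carbon port). It is a generic example, not Tapad's customized Ostrich reporter; the class name and host are made up.

import java.io.IOException;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

/** Sketch of in-memory metric aggregation with a periodic flush to Graphite. */
final class MetricsFlusher {
  private final Map<String, LongAdder> counters = new ConcurrentHashMap<>();
  private final String host;
  private final int port;

  MetricsFlusher(String host, int port) {
    this.host = host;
    this.port = port;
  }

  void increment(String metricPath) {
    counters.computeIfAbsent(metricPath, k -> new LongAdder()).increment();
  }

  void start() {
    // flush every 10 seconds, mirroring the interval described above
    Executors.newSingleThreadScheduledExecutor()
        .scheduleAtFixedRate(this::flush, 10, 10, TimeUnit.SECONDS);
  }

  private void flush() {
    long now = System.currentTimeMillis() / 1000;
    try (Socket socket = new Socket(host, port);
         Writer out = new OutputStreamWriter(socket.getOutputStream(), StandardCharsets.UTF_8)) {
      for (Map.Entry<String, LongAdder> entry : counters.entrySet()) {
        long value = entry.getValue().sumThenReset();
        out.write(entry.getKey() + " " + value + " " + now + "\n");
      }
    } catch (IOException e) {
      // in this sketch we simply drop the batch; a real reporter would retry or buffer
    }
  }
}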

An example path for a given metric might look like this:

prd.nj1.foo01.pipeline.producer.avro_event_bar_count

We use Grafana for building real-time dashboards to track key metrics, and display those dashboards on big screens inside our office. When we switch our SOA to a more ephemeral container-based approach (e.g. running containers on Mesos with Marathon) we may have to re-think how these metrics are organized, as the instance-specific names like foo01 will end up looking like foo.43ffbdc8-ef60-11e4-97ce-005056a272f9. Graphite supports wildcards in queries, so something like sumSeries(prd.nj1.foo.*.pipeline.producer.avro_event_bar_count) could work.

Principles lay the groundwork for our decisions, but executing them successfully is equally important. Here are three best practices for working with distributed systems.

Practice 1: Canary Deployments

Some things are very challenging to test rigorously prior to a full-scale production launch. To mitigate risk, we upgrade a single node first and monitor it manually. Assuming it behaves as expected, the rest of the nodes are automatically deployed by Rundeck. Rundeck is a tool that can upgrade systems automatically and in parallel, rolling several instances at a time and moving on to the next set as soon as the upgraded nodes report a healthy status. Monitoring the canary deploy involves more than this single health check, which is why it’s upgraded out-of-band.

Practice 2: Distributed Clock

Because of clock skew and lag, there is no good concept of “now” in a distributed system.

  • Clock skew occurs because clocks are not particularly precise, even with NTP (Network Time Protocol).

  • Lag is a factor when passing messages around. If one server is cut off from the network, buffers messages for a while, then sends them after re-joining, the receiving system will get a batch of messages with relatively old timestamps. A system consuming all messages from these producers cannot be assured it has read 100% of messages up to a given time until it sees that each producer has passed the mark. This assumes that each producer guarantees ordering within its own stream, much like Kafka’s model.

Our solution is to create a sort of distributed clock, where producers record their most recent timestamps as child nodes of a particular Zookeeper “clock” node. The time is resolved by taking the minimum timestamp in that set. We also track lag relative to the resolving node’s system clock.
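
The resolution step itself is tiny once the producer timestamps have been read (in our case from the children of the ZooKeeper clock node; the sketch below abstracts that away and just takes the collection of timestamps):

import java.util.Collection;

/** Sketch of resolving a distributed clock from per-producer timestamps. */
final class DistributedClock {

  /** Returns the resolved time, or Long.MIN_VALUE if no producer has reported. */
  static long resolve(Collection<Long> producerTimestamps) {
    long resolved = Long.MIN_VALUE;
    boolean first = true;
    for (long timestamp : producerTimestamps) {
      resolved = first ? timestamp : Math.min(resolved, timestamp);
      first = false;
    }
    return resolved;
  }

  /** Lag relative to the resolving node's own clock, as mentioned above. */
  static long lagMillis(Collection<Long> producerTimestamps) {
    return System.currentTimeMillis() - resolve(producerTimestamps);
  }
}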

Practice 3: Automate To Assist, Not To Control

Our devops tools are designed to assist humans, rather than to automatically manage things, as human judgement is often required to respond to a system failure. There is risk in allowing a script to automatically failover a database or spin up new nodes. We have a pager duty rotation with primary, secondary, and tertiary engineers. The engineer can initiate the failover or spin up new nodes based on an alert. This means they are fully aware of the context.

The more well-understood a task is, the more it can be automated. One basic level of automation is using Kickstart/PXE boot. When a new VM starts up, it does a PXE boot and registers itself with Spacewalk, and Puppet handles installation of all required packages. Nothing custom is ever required, which enables us to easily build/rebuild sets of systems just by starting new VMs with particular names.

As we gain better understanding of a given system’s performance we can automate more parts. For example, scaling a shared-nothing system up and down depending on some reasonable change in traffic. For exceptional circumstances, we want a human to make the decision, assisted by all the information at their disposal.

(via HighScalability.com)

Elements Of Scale: Composing And Scaling Data Platforms

This is a guest repost of Ben Stopford‘s epic post on Elements of Scale: Composing and Scaling Data Platforms. A masterful tour through the evolutionary forces that shape how systems adapt to key challenges.

As software engineers we are inevitably affected by the tools we surround ourselves with. Languages, frameworks, even processes all act to shape the software we build.

Likewise databases, which have trodden a very specific path, inevitably affect the way we treat mutability and share state in our applications.

Over the last decade we’ve explored what the world might look like had we taken a different path. Small open source projects try out different ideas. These grow. They are composed with others. The platforms that result utilise suites of tools, with each component often leveraging some fundamental hardware or systemic efficiency. The result, platforms that solve problems too unwieldy or too specific to work within any single tool.

So today’s data platforms range greatly in complexity. From simple caching layers or polyglotic persistence right through to wholly integrated data pipelines. There are many paths. They go to many different places. In some of these places at least, nice things are found.

So the aim for this talk is to explain how and why some of these popular approaches work. We’ll do this by first considering the building blocks from which they are composed. These are the intuitions we’ll need to pull together the bigger stuff later on.

In a somewhat abstract sense, when we’re dealing with data, we’re really just arranging locality. Locality to the CPU. Locality to the other data we need. Accessing data sequentially is an important component of this. Computers are just good at sequential operations. Sequential operations can be predicted.

If you’re taking data from disk sequentially it’ll be pre-fetched into the disk buffer, the page cache and the different levels of CPU caching. This has a significant effect on performance. But it does little to help the addressing of data at random, be it in main memory, on disk or over the network. In fact pre-fetching actually hinders random workloads as the various caches and frontside bus fill with data which is unlikely to be used.

Whilst disk is somewhat renowned for its slow performance, main memory is often assumed to simply be fast. This is not as ubiquitously true as people often think. There are one to two orders of magnitude between random and sequential main memory workloads. Use a language that manages memory for you and things generally get a whole lot worse.

Streaming data sequentially from disk can actually outperform randomly addressed main memory. So disk may not always be quite the tortoise we think it is, at least not if we can arrange sequential access. SSDs, particularly those that utilise PCIe, further complicate the picture as they demonstrate different tradeoffs, but the caching benefits of the two access patterns remain, regardless.

So let’s imagine, as a simple thought experiment, that we want to create a very simple database. We’ll start with the basics: a file.

We want to keep writes and reads sequential, as it works well with the hardware. We can append writes to the end of the file efficiently. We can read by scanning the file in its entirety. Any processing we wish to do can happen as the data streams through the CPU. We might filter, aggregate or even do something more complex. The world is our oyster!

So what about data that changes, updates etc?

We have two options. We could update the value in place. We’d need to use fixed width fields for this, but that’s ok for our little thought experiment. But update in place would mean random IO. We know that’s not good for performance.

Alternatively we could just append updates to the end of the file and deal with the superseded values when we read it back.

So we have our first tradeoff. Append to a ‘journal’ or ‘log’, and reap the benefits of sequential access. Alternatively if we use update in place we’ll be back to 300 or so writes per second, assuming we actually flush through to the underlying media.
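
The append-only side of that tradeoff is easy to sketch. The toy store below only ever appends key=value records and resolves the latest value at read time by scanning; the file layout and record format are arbitrary choices for the thought experiment.

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

/** A key-value "database" that only ever appends to a journal file. */
final class AppendOnlyStore {
  private final Path journal;

  AppendOnlyStore(Path journal) {
    this.journal = journal;
  }

  /** Sequential write: just append a record to the end of the file. */
  void put(String key, String value) throws IOException {
    String record = key + "=" + value + System.lineSeparator();
    Files.writeString(journal, record, StandardCharsets.UTF_8,
        StandardOpenOption.CREATE, StandardOpenOption.APPEND);
  }

  /** Sequential read: scan the journal; the last record for a key wins. */
  String get(String key) throws IOException {
    if (!Files.exists(journal)) {
      return null;
    }
    Map<String, String> latest = new HashMap<>();
    for (String line : Files.readAllLines(journal, StandardCharsets.UTF_8)) {
      int split = line.indexOf('=');
      if (split > 0) {
        latest.put(line.substring(0, split), line.substring(split + 1));
      }
    }
    return latest.get(key);
  }
}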

Now in practice of course reading the file, in its entirety, can be pretty slow. We only need to get into gigabytes of data before even the fastest disks take seconds. This is what a database does when it ends up table scanning.

Also we often want something more specific, say customers named “bob”, so scanning the whole file would be overkill. We need an index.

Now there are lots of different types of indexes we could use. The simplest would be an ordered array of fixed-width values, in this case customer names, held with the corresponding offsets in the heap file. The ordered array could be searched with binary search. We could also of course use some form of tree, bitmap index, hash index, term index etc. Here we’re picturing a tree.
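
The simplest of those options, the ordered array searched with binary search, might be sketched like this (fixed-width string keys and long offsets into the heap file are assumed):

import java.util.Arrays;

/** A sorted array of keys held alongside their offsets in the heap file. */
final class SortedOffsetIndex {
  private final String[] keys;   // kept sorted
  private final long[] offsets;  // offsets[i] is the heap-file position of keys[i]

  SortedOffsetIndex(String[] sortedKeys, long[] offsets) {
    this.keys = sortedKeys;
    this.offsets = offsets;
  }

  /** Returns the heap-file offset for the key, or -1 if it is not indexed. */
  long lookup(String key) {
    int position = Arrays.binarySearch(keys, key);
    return position >= 0 ? offsets[position] : -1;
  }
}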

The thing with indexes like this is that they impose an overarching structure. The values are deliberately ordered so we can access them quickly when we want to do a read. The problem with the overarching structure is that it necessitates random writes as data flows in. So our wonderful, write optimised, append only file must be augmented by writes that scatter-gun the filesystem. This is going to slow us down.

Anyone who has put lots of indexes on a database table will be familiar with this problem. If we are using a regular rotating hard drive, we might run 1,000s of times slower if we maintain disk integrity of an index in this way.

Luckily there are a few ways around this problem. Here we are going to discuss three. These represent three extremes, and they are in truth simplifications of the real world, but the concepts are useful when we consider larger compositions.

Our first option is simply to place the index in main memory. This will compartmentalise the problem of random writes to RAM. The heap file stays on disk.

This is a simple and effective solution to our random writes problem. It is also one used by many real databases. MongoDB, Cassandra, Riak and many others use this type of optimisation. Often memory mapped files are used.

However, this strategy breaks down if we have far more data than we have main memory. This is particularly noticeable where there are lots of small objects. Our index would get very large. Thus our storage becomes bounded by the amount of main memory we have available. For many tasks this is fine, but if we have very large quantities of data this can be a burden.

A popular solution is to move away from having a single ‘overarching’ index. Instead we use a collection of smaller ones.

This is a simple idea. We batch up writes in main memory as they come in. Once we have sufficient – say a few MBs – we sort them and write them to disk as an individual mini-index. What we end up with is a chronology of small, immutable index files.

So what was the point of doing that? Our set of immutable files can be streamed sequentially. This brings us back to a world of fast writes, without us needing to keep the whole index in memory. Nice!

Of course there is a downside to this approach too. When we read, we have to consult the many small indexes individually. So all we have really done is shift the problem of random IO from writes onto reads. However this turns out to be a pretty good tradeoff in many cases. It’s easier to optimise random reads than it is to optimise random writes.

Keeping a small meta-index in memory or using a Bloom Filter provides a low-memory way of evaluating whether individual index files need to be consulted during a read operation. This gives us almost the same read performance as we’d get with a single overarching index whilst retaining fast, sequential writes.
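
A toy version of this write path and read path is sketched below: a sorted in-memory batch is "flushed" as an immutable mini-index once it reaches a threshold, and reads consult the newest segments first. Real LSM stores write the segments to disk and use Bloom filters and compaction; the in-memory maps here just stand in for those files, and the threshold is arbitrary.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Map;
import java.util.TreeMap;

/** Tiny illustration of the Log Structured Merge idea. */
final class TinyLsm {
  private static final int FLUSH_THRESHOLD = 4;

  private TreeMap<String, String> memtable = new TreeMap<>();
  private final Deque<Map<String, String>> segments = new ArrayDeque<>(); // newest first

  void put(String key, String value) {
    memtable.put(key, value);
    if (memtable.size() >= FLUSH_THRESHOLD) {
      segments.addFirst(memtable); // "flush": freeze the sorted batch as a mini-index
      memtable = new TreeMap<>();
    }
  }

  String get(String key) {
    String value = memtable.get(key);
    if (value != null) {
      return value;
    }
    // Consult the mini-indexes from newest to oldest. A Bloom filter or small
    // meta-index per segment would let us skip most of these probes.
    for (Map<String, String> segment : segments) {
      value = segment.get(key);
      if (value != null) {
        return value;
      }
    }
    return null;
  }
}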

In reality we will need to purge orphaned updates occasionally too, but that can be done with nice sequential reads and writes.

What we have created is termed a Log Structured Merge Tree. A storage approach used in a lot of big data tools such as HBase, Cassandra, Google’s BigTable and many others. It balances write and read performance with comparatively small memory overhead.

So we can get around the ‘random-write penalty’ by storing our indexes in memory or, alternatively, using a write-optimised index structure like LSM. There is a third approach though. Pure brute force.

Think back to our original example of the file. We could read it in its entirety. This gave us many options in terms of how we go about processing the data within it. The brute force approach is simply to hold data by column rather than by row and stream only the columns required for a query, in their entirety, through the CPU. This approach is termed Columnar or Column Oriented.

(It should be noted that there is an unfortunate nomenclature clash between true column stores and those that follow the Big Table pattern. Whilst they share some similarities, in practice they are quite different. It is wise to consider them as different things.)

Column Orientation is another simple idea. Instead of storing data as a set of rows, appended to a single file, we split each row by column. We then store each column in a separate file.

We keep the order of the files the same, so row N has the same position (offset) in each column file. This is important because we will need to read multiple columns to service a single query, all at the same time. This means ‘joining’ columns on the fly. If the columns are in the same order we can do this in a tight loop which is very cache- and cpu-efficient. Many implementations make heavy use of vectorisation to further optimise throughput for simple join and filter operations.

Writes can leverage the benefit of being append-only. The downside is that we now have many files to update, one for every column in every individual write to the database. The most common solution to this is to batch writes in a similar way to the one used in the LSM approach above. Many columnar databases also impose an overall order to the table as a whole to increase their read performance for one chosen key.
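
A minimal illustration of the columnar layout: each column is its own array (standing in for its own file), row N sits at the same offset in every column, and a query streams only the columns it needs in a tight loop. The column names are invented for the example.

/** Toy column-oriented table with one array per column. */
final class ColumnarTable {
  private final String[] country; // one "file" per column
  private final long[] revenue;

  ColumnarTable(String[] country, long[] revenue) {
    this.country = country;
    this.revenue = revenue;
  }

  /** Total revenue for one country: scans two columns, never touches the rest. */
  long revenueFor(String wanted) {
    long total = 0;
    for (int row = 0; row < country.length; row++) {
      if (wanted.equals(country[row])) {
        total += revenue[row]; // same row offset in both column files
      }
    }
    return total;
  }
}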

By splitting data by column we significantly reduce the amount of data that needs to be brought from disk, so long as our query operates on a subset of all columns.

In addition to this, data in a single column generally compresses well. We can take advantage of the data type of the column to do this, if we have knowledge of it. This means we can often use efficient, low cost encodings such as run-length, delta, bit-packed etc. For some encodings predicates can be used directly on the uncompressed stream too.

The result is a brute force approach that will work particularly well for operations that require large scans. Aggregate functions like average, max, min, group by etc are typical of this.

This is very different to using the ‘heap file & index’ approach we covered earlier. A good way to understand this is to ask yourself: what is the difference between a columnar approach like this vs a ‘heap & index’ where indexes are added to every field?

The answer to this lies in the ordering of the index files. BTrees etc will be ordered by the fields they index. Joining the data in two indexes involves a streaming operation on one side, but on the other side the index lookups have to read random positions in the second index. This is generally less efficient than joining two indexes (columns) that retain the same ordering. Again we’re leveraging sequential access.

So many of the best technologies which we may want to use as components in a data platform will leverage one of these core efficiencies to excel for a certain set of workloads.

Storing indexes in memory, over a heap file, is favoured by many NoSQL stores such as Riak, Couchbase or MongoDB as well as some relational databases. It’s a simple model that works well.

Tools designed to work with larger data sets tend to take the LSM approach. This gives them fast ingestion as well as good read performance using disk based structures. HBase, Cassandra, RocksDB, LevelDB and even Mongo now support this approach.

Column-per-file engines are used heavily in MPP databases like Redshift or Vertica as well as in the Hadoop stack using Parquet. These are engines for data crunching problems that require large traversals. Aggregation is the home ground for these tools.

Other products like Kafka apply a simple, hardware-efficient contract to messaging. Messaging, at its simplest, is just appending to a file, or reading from a predefined offset. You read messages from an offset. You go away. You come back. You read from the offset you previously finished at. All nice sequential IO.

This is different to most message-oriented middleware. Specifications like JMS and AMQP require the addition of indexes like the ones discussed above, to manage selectors and session information. This means they often end up performing more like a database than a file. Jim Gray made this point famously back in his 1995 publication Queues are Databases.

So all these approaches favour one tradeoff or other, often keeping things simple, and hardware sympathetic, as a means of scaling.

So we’ve covered some of the core approaches to storage engines. In truth we made some simplifications. The real world is a little more complex. But the concepts are useful nonetheless.

Scaling a data platform is more than just storage engines though. We need to consider parallelism.

When distributing data over many machines we have two core primitives to play with: partitioning and replication. Partitioning, sometimes called sharding, works well both for random access and brute force workloads.

If a hash-based partitioning model is used the data will be spread across a number of machines using a well-known hash function. This is similar to the way a hash table works, with each bucket being held on a different machine.

The result is that any value can be read by going directly to the machine that contains the data, via the hash function. This pattern is wonderfully scalable and is the only pattern that shows linear scalability as the number of client requests increases. Requests are isolated to a single machine. Each one will be served by just a single machine in the cluster.
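
The routing step is essentially just a well-known hash function shared by every client, as sketched below. Real stores typically use consistent hashing or virtual nodes so that adding a machine does not reshuffle every key; the modulo version here keeps the idea visible, and the node addresses are placeholders.

/** Toy hash partitioner: every client can compute which node owns a key. */
final class HashPartitioner {
  private final String[] nodes;

  HashPartitioner(String... nodes) {
    this.nodes = nodes;
  }

  String nodeFor(String key) {
    int bucket = Math.floorMod(key.hashCode(), nodes.length);
    return nodes[bucket];
  }

  public static void main(String[] args) {
    HashPartitioner ring = new HashPartitioner("node-a:9042", "node-b:9042", "node-c:9042");
    System.out.println(ring.nodeFor("customer:bob")); // always routes to the same node
  }
}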

We can also use partitioning to provide parallelism over batch computations, for example aggregate functions or more complex algorithms such as those we might use for clustering or machine learning. The key difference is that we exercise all machines at the same time, in a broadcast manner. This allows us to solve a large computational problem in a much shorter time, using a divide and conquer approach.

Batch systems work well for large problems, but provide little concurrency as they tend to exhaust the resources on the cluster when they execute.

So the two extremes are pretty simple: directed access at one end, broadcast, divide and conquer at the other. Where we need to be careful is in the middle ground that lies between the two. A good example of this is the use of secondary indexes in NoSQL stores that span many machines.

A secondary index is an index that isn’t on the primary key. This means the data will not be partitioned by the values in the index. Directed routing via a hash function is no longer an option. We have to broadcast requests to all machines. This limits concurrency. Every node must be involved in every query.

For this reason many key value stores have resisted the temptation to add secondary indexes, despite their obvious use. HBase and Voldemort are examples of this. But many others do expose them, MongoDB, Cassandra, Riak etc. This is good as secondary indexes are useful. But it’s important to understand the effect they will have on the overall concurrency of the system.

The route out of this concurrency bottleneck is replication. You’ll probably be familiar with replication either from using async slave databases or from replicated NoSQL stores like Mongo or Cassandra.

In practice replicas can be invisible (used only for recovery), read only (adding read concurrency) or read write (adding partition tolerance). Which of these you choose will trade off against the consistency of the system. This is simply the application of CAP theorem (although cap theorem also may not be as simple as you think).

This tradeoff with consistency* brings us to an important question. When does consistency matter?

Consistency is expensive. In the database world ACID is guaranteed by serialisability. This is essentially ensuring that all operations appear to occur in sequential order. It turns out to be a pretty expensive thing. In fact it’s prohibitive enough that many databases don’t offer it as an isolation level at all. Those that do never set it as the default.

Suffice to say that if you apply strong consistency to a system that does distributed writes you’ll likely end up in tortoise territory.

(* note the term consistency has two common usages. The C in ACID and the C in CAP. They are unfortunately not the same. I’m using the CAP definition: all nodes see the same data at the same time)

The solution to this consistency problem is simple. Avoid it. If you can’t avoid it isolate it to as few writers and as few machines as possible.

Avoiding consistency issues is often quite easy, particularly if your data is an immutable stream of facts. A set of web logs is a good example. They have no consistency concerns as they are just facts that never change.

There are other use cases which do necessitate consistency though. Transferring money between accounts is an oft used example. Non-commutative actions such as applying discount codes is another.

But often things that appear to need consistency, in a traditional sense, may not. For example if an action can be changed from a mutation to a new set of associated facts we can avoid mutable state. Consider marking a transaction as being potentially fraudulent. We could update it directly with the new field. Alternatively we could simply use a separate stream of facts that links back to the original transaction.

So in a data platform it’s useful to either remove the consistency requirement altogether, or at least isolate it. One way to isolate it is to use the single-writer principle; this gets you some of the way. Datomic is a good example of this. Another is to physically isolate the consistency requirement by splitting mutable and immutable worlds.

Approaches like Bloom/CALM extend this idea further by embracing the concept of disorder by default, imposing order only when necessary.

So those were some of the fundamental tradeoffs we need to consider. Now how do we pull these things together to build a data platform?

A typical application architecture might look something like the below. We have a set of processes which write data to a database and read it back again. This is fine for many simple workloads. Many successful applications have been built with this pattern. But we know it works less well as throughput grows. In the application space this is a problem we might tackle with message-passing, actors, load balancing etc.

The other problem is this approach treats the database as a black box. Databases are clever software. They provide a huge wealth of features. But they provide little mechanism for scaling out of an ACID world. This is a good thing in many ways. We default to safety. But it can become an annoyance when scaling is inhibited by general guarantees which may be overkill for the requirements we have.

The simplest route out of this is CQRS (Command Query Responsibility Segregation).

Another very simple idea. We separate read and write workloads. Writes go into something write-optimised. Something closer to a simple journal file. Reads come from something read-optimised. There are many ways to do this, be it tools like Goldengate for relational technologies or products that integrate replication internally such as Replica Sets in MongoDB.
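
In miniature, the separation looks something like the sketch below: commands append to a write-optimised journal and a query-friendly view is rebuilt from it asynchronously. The event format and the view are invented for the example; in practice the refresh would be driven by a replication or messaging layer rather than an explicit call.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Minimal CQRS sketch: append-only write path, rebuilt read model. */
final class CqrsExample {
  // Write side: something close to a simple journal file.
  private final List<String> journal = new ArrayList<>();

  // Read side: a view optimised for queries, rebuilt from the journal.
  private final Map<String, Integer> ordersPerCustomer = new HashMap<>();

  void handleCommand(String customerId) {
    journal.add("ORDER_PLACED:" + customerId); // the write path stays append-only
  }

  /** In a real system this runs asynchronously off a replication stream. */
  void refreshReadModel() {
    ordersPerCustomer.clear();
    for (String event : journal) {
      String customerId = event.substring("ORDER_PLACED:".length());
      ordersPerCustomer.merge(customerId, 1, Integer::sum);
    }
  }

  int ordersFor(String customerId) {
    return ordersPerCustomer.getOrDefault(customerId, 0);
  }
}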

Many databases do something like this under the hood. Druid is a nice example. Druid is an open source, distributed, time-series, columnar analytics engine. Columnar storage works best if we input data in large blocks, as the data must be spread across many files. To get good write performance Druid stores recent data in a write optimised store. This is gradually ported over to the read optimised store over time.

When Druid is queried the query routes to both the write optimised and read optimised components. The results are combined (‘reduced’) and returned to the user. Druid uses time, marked on each record, to determine ordering.

Composite approaches like this provide the benefits of CQRS behind a single abstraction.

Another similar approach is to use an Operational/Analytic Bridge. Read- and write-optimised views are separated using an event stream. The stream of state is retained indefinitely, so that the async views can be recomposed and augmented at a later date by replaying.

So the front section provides for synchronous reads and writes. This can be as simple as immediately reading data that was written or as complex as supporting ACID transactions.

The back end leverages asynchronicity, and the advantages of immutable state, to scale offline processing through replication, denormalisation or even completely different storage engines. The messaging-bridge, along with joining the two, allows applications to listen to the data flowing through the platform.

As a pattern this is well suited to mid-sized deployments where there is at least a partial, unavoidable requirement for a mutable view.

If we are designing for an immutable world, it’s easier to embrace larger data sets and more complex analytics. The batch pipeline, one almost ubiquitously implemented with the Hadoop stack, is typical of this.

The beauty of the Hadoop stack comes from its plethora of tools. Whether you want fast read-write access, cheap storage, batch processing, high throughput messaging or tools for extracting, processing and analysing data, the Hadoop ecosystem has it all.

The batch pipeline architecture pulls data from pretty much any source, push or pull, ingests it into HDFS, and then processes it to provide increasingly optimised versions of the original data. Data might be enriched, cleansed, denormalised, aggregated, moved to a read-optimised format such as Parquet or loaded into a serving layer or data mart. Data can be queried and processed throughout this process.

This architecture works well for immutable data, ingested and processed in large volume. Think hundreds of terabytes and up. The evolution of this architecture will be slow though. Straight-through timings are often measured in hours.

The problem with the batch pipeline is that we often don’t want to wait hours to get a result. A common solution is to add a streaming layer alongside it. This is sometimes referred to as the Lambda Architecture.

The Lambda Architecture retains a batch pipeline, like the one above, but it circumvents it with a fast streaming layer. It’s a bit like building a bypass around a busy town. The streaming layer typically uses a streaming processing tool such as Storm or Samza.

The key insight of the Lambda Architecture is that we’re often happy to have an approximate answer quickly, but we would like an accurate answer in the end.

So the streaming layer bypasses the batch layer, providing the best answers it can within a streaming window. These are written to a serving layer. Later the batch pipeline computes an accurate answer and overwrites the approximation.

This is a clever way to balance accuracy with responsiveness. Some implementations of this pattern suffer if the two branches end up being dual coded in stream and batch layers. But it is often possible to simply abstract this logic into common libraries that can be reused, particularly as much of this processing is often written in external libraries such as Python or R anyway. Alternatively systems like Spark provide both stream and batch functionality in one system (although the streams in Spark are really micro-batches).

So this pattern again suits high volume data platforms, say in the 100TB+ range, that want to combine streams with existing, rich, batch based analytic function.

There is another approach to this problem of slow data pipelines. It’s sometimes termed the Kappa architecture. I actually thought this name was ‘tongue in cheek’ but I’m now not so sure. Whichever it is, I’m going to use the term Stream Data Platform, which is also in use.

Stream Data Platforms flip the batch pattern on its head. Rather than storing data in HDFS, and refining it with incremental batch jobs, the data is stored in a scale-out messaging system, or log, such as Kafka. This becomes the system of record and the stream of data is processed in real time to create a set of tertiary views, indexes, serving layers or data marts.

This is broadly similar to the streaming layer of the Lambda architecture but with the batch layer removed. Obviously the requirement for this is that the messaging layer can store and vend very large volumes of data and there is a sufficiently powerful stream processor to handle the processing.

There is no free lunch, so for hard problems Stream Data Platforms will likely run no faster than an equivalent batch system, but switching the default approach from ‘store and process’ to ‘stream and process’ can provide greater opportunity for faster results.
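To make the ‘stream and process’ default concrete, here is a hedged sketch using Kafka’s Java consumer to fold a topic into an in-memory view. The topic name, group id and counting logic are illustrative only; a production system would more likely use a stream processor such as Samza and a durable store for the view.

import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class OrdersView {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "orders-view");  // hypothetical consumer group
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    // The derived "view": order counts per customer, regenerated from the log.
    Map<String, Long> ordersPerCustomer = new HashMap<>();

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("orders"));  // hypothetical topic
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          ordersPerCustomer.merge(record.key(), 1L, Long::sum);
        }
      }
    }
  }
}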


Finally, the Stream Data Platform approach can be applied to the problem of ‘application integration’. This is a thorny and difficult problem that has seen focus from big vendors such as Informatica, Tibco and Oracle for many years. For the most part results have been beneficial, but not transformative. Application integration remains a topic looking for a real workable solution.

Stream Data Platforms provide an interesting potential solution to this problem. They take many of the benefits of an O/A bridge – the variety of asynchronous storage formats and the ability to recreate views – but leave the consistency requirement isolated in the (often existing) source systems:

[Figure: a stream data platform used for application integration]

With the system of record being a log it’s easy to enforce immutability. Products like Kafka can retain enough volume internally to be used as a historic record. This means recovery can be a process of replaying and regenerating state, rather than constantly checkpointing.
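With Kafka, for instance, that replay can be expressed directly against the consumer API. The hedged fragment below reuses the consumer from the earlier sketch and replaces its subscribe() call: it assigns the partitions of the hypothetical orders topic explicitly and rewinds to the earliest retained offsets, so the normal processing loop rebuilds the view from the log itself rather than from a checkpoint.

// Fragment: swap this in for the subscribe() call in the earlier sketch.
// Requires org.apache.kafka.common.TopicPartition and java.util.stream.Collectors.
List<TopicPartition> partitions = consumer.partitionsFor("orders").stream()
    .map(info -> new TopicPartition(info.topic(), info.partition()))
    .collect(Collectors.toList());

consumer.assign(partitions);            // take the partitions directly, no consumer-group rebalance
consumer.seekToBeginning(partitions);   // rewind to the start of the retained log and replay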

Similarly styled approaches have been taken before in a number of large institutions with tools such as GoldenGate, porting data to enterprise data warehouses or, more recently, data lakes. They were often thwarted by a lack of throughput in the replication layer and the complexity of managing changing schemas. It seems unlikely the first problem will persist. As for the latter, though, the jury is still out.

~

So we started with locality. With sequential addressing for both reads and writes. This dominates the tradeoffs inside the components we use. We looked at scaling these components out, leveraging primitives for both sharding and replication. Finally we rebranded consistency as a problem we should isolate in the platforms we build.

But data platforms themselves are really about balancing the sweet-spots of these individual components within a single, holistic form. Incrementally restructuring. Migrating the write-optimised to the read-optimised. Moving from the constraints of consistency to the open plains of streamed, asynchronous, immutable state.

This must be done with a few things in mind. Schemas are one. Time, the peril of the distributed, asynchronous world, is another. But these problems are manageable if carefully addressed. Certainly the future is likely to include more of these things, particularly as tooling, innovated in the big data space, percolates into platforms that address broader problems, both old and new.

(via HighScalability.com)

How OpenCL Could Open the Gates for FPGAs

In this special guest feature from Scientific Computing World, Robert Roe explains how OpenCL may make FPGAs an attractive option.

Over the past few years, high-performance computing (HPC) has become used to heterogeneous hardware, principally mixing GPUs and CPUs, but now, with both major FPGA manufacturers in conformance with the OpenCL standard, the door is effectively open for the wider use of FPGAs in high-performance computing. In January 2015, FPGAs took a step closer to the mainstream of HPC with the announcement that Xilinx’s development environment for systems and software engineers, SDAccel, had been certified as conforming to the OpenCL standard for parallel programming of heterogeneous systems.

The changing landscape of HPC, with the move towards data-centric computing, could favour FPGAs with very high I/O throughput. However, it remains to be seen if FPGAs will be used as an accelerator or if supercomputers might be built using FPGA as the main processor technology.

One of the attractions of FPGAs is that they consume very little power but, as with GPUs initially, the barrier to adoption has been the difficulty of programming them. Manufacturers and vendors are now releasing compilers that will optimise code written in C and C++ to make use of the flexible nature of FPGA architecture.

Easier to program

Mike Strickland, director of the computer and storage business unit at Altera said: “The problem was that we did not have the ease of use, we did not have a software-friendly interface back in 2008. The huge enabler here has been OpenCL.”

Larry Getman, VP of strategic marketing and planning at Xilinx said: ‘When FPGAs first started they could do very basic things such as Boolean algebra and it was really used for glue logic. Over the years, FPGAs have really advanced and evolved with more hardened structures which are much more specialised.’

Getman continued: ‘Over the years FPGAs have gone from being glue logic to harder things like radio head systems, that do a lot of DSP processing; very high-performance vision applications; wireless radio; medical equipment; and radar systems. So they are used in high-performance computing, but for applications that use very specialised algorithms.’

Getman concluded: ‘The reason people use FPGAs for these applications is simple, they offer a much higher level of performance per Watt than trying to run the same application in pure software code.’

FPGAs are programmable semiconductor devices that are based around a matrix of configurable logic blocks (CLBs) connected through programmable interconnects. This is where the FPGA gets the term ‘field programmable’ as an FPGA can be programmed and optimised for a specific application. Efficient programming can take advantage of the inherent parallelism of the FPGA architecture delivering a higher level of performance than accelerators that have a less flexible architecture.

Millions of threads running at the same time

Devadas Varma, senior director of software Research and Development at Xilinx said: ‘A CPU, if it is single core CPU, executes one instruction at a time and if you have four cores, eight cores, that are multithreaded then you can do eight or sixteen sets of instructions, for example. If you compare this to an FPGA, which is a blank set of millions of components that you decide to interconnect, theoretically speaking you could have thousands or even millions of threads running at the same time.’

Reuven Weintraub, founder and chief technology officer at Gidel, highlighted the differences between FPGAs and the processors used in CPUs today. He said: ‘They are the same and they are different. They are the same from the perspective that both of them are programmable. The difference is coming from the fact that in the FPGA all the instructions would run in parallel. Actually the FPGA is not a processor; it is compiled to be a dedicated set of hardware components according to the requirements of the algorithm – that is what gives it the efficiency, power savings and so on.’

Traditionally this power efficiency, scalability, and flexible architecture came at the price of more complex programming: code needed to address the hardware and the flow of data across the various components, in addition to providing the basic instruction set to be computed in the logic blocks. However, major FPGA manufacturers Altera and Xilinx have both been working on their own OpenCL based solutions which have the potential to make FPGA acceleration a real possibility for more general HPC workloads.

Development toolkits

Xilinx has recently released SDAccel, a development environment that includes its own compiler and tools for code development, profiling, and debugging, and provides a GPU-like work environment. Getman said: ‘Our goal is to make an FPGA as easy to program as a GPU. SDAccel, which is OpenCL based, does allow people to program in OpenCL and C or C++ and they can now target the FPGA at a very high level.’

In addition, SDAccel provides functionality to swap multiple kernels in and out of the FPGA without disrupting the interface between the server CPU and the FPGA. This could be a key enabler of FPGAs in real-world data centres where turning off some of your resources while you re-optimise them for the next application is not an economically viable strategy at present.

Altera has been working closely with the Khronos group, which oversees a number of open computing standards including OpenCL, OpenGL, and WebGL. Altera released a development toolkit, Altera’s SDK for OpenCL, in May 2013. Strickland said: ‘In May 2013 we achieved a very important conformance test with the standards body – the Khronos group – that manages OpenCL. We had to pass 8,000 tests and that really strengthened the credibility of what we are doing with the FPGA.’

Strickland continued: ‘In the past, there were a lot of FPGA compiler tools that took care of the logic but not the data management. They could take lines of C and automatically generate lines of RTL but they did not take care of how that data would come from the CPU, the optimisation of external memory bandwidth off the FPGA, and that is a large amount of the work.’

Traditionally, optimising algorithms to fully utilise the parallel architecture of an FPGA involved significant experience with HDLs (hardware description languages), because they allowed programmers to write code that would address the FPGA at register-transfer level (RTL).

RTL enables programmers to describe the flow of data between hardware registers, and the logical operations performed on that data. This is typically what creates the difference in performance between more general processors and FPGAs, which can be optimised much more efficiently for a specific algorithm.

The difficulty is that that kind of coding requires expertise and can be very time consuming. Hand-coded RTL may go through several iterations as programmers test the most efficient ways to parallelise the instruction set to take advantage of the programmable hardware on the FPGA.

Strickland said: “With OpenCL or the OpenCL compiler, you still write something that is like C code that targets the FPGA. The big difference I would say is the instruction set. The big innovation has been the back end of our compiler, which can now take that C code and efficiently use the FPGA.”

Strickland noted that Altera’s compiler ‘does more than 200 optimisations when you write some C code. It is doing things like seeing the order in which you access memory so that it can group memory addresses together, improving the efficiency of that memory interface.’

Converting code from different languages into an RTL description has been possible for some time, but these developments in OpenCL make it much easier for programmers without extensive knowledge of HDLs, such as VHDL and Verilog, to make use of FPGAs.

However OpenCL is not the final piece of the puzzle for FPGA programming. Strickland said: ‘Over time you may want to have other high-level interfaces. There is a standard called SPIR (Standard Portable Intermediate Representation). The idea is that this allows you to kind of split up your compiler between the front end and the back end, enabling people to use different high-level language interfaces on the front end.’

Strickland continued: ‘In universities now there is research into domain-specific languages, where a certain class of algorithms may benefit from having an even higher-level interface than C. The idea behind exposing this intermediate compiler interface is that you can now start working with the ecosystem to have front ends with higher-level interfaces.’

Over the past few years, there have been two ideas behind the best way to program FPGAs: high-level synthesis (HLS) or OpenCL. As OpenCL has matured, Xilinx decided to adopt the standard but to keep the work it had done developing HLS technology and integrate that into the development environment conforming to the OpenCL standard.

Getman said: “The main problem is that C is very much designed to go cycle to cycle, step by step. Unfortunately, hardware doesn’t. Hardware has a lot of things running at the same time.” This is what made HLS attractive: a compiler that can take OpenCL, C or C++ and architecturally optimise it for the FPGA hardware.

Xilinx acquired AutoESL and its HLS tool AutoPilot in 2011 and began integrating it into its own development tools for FPGAs. Getman said: ‘That was really the big switching point. For many years, people had been promising really great results with HLS but in reality the results were a lot bigger and a lot slower than what could have been done by hand.’

Getman continued: ‘We have integrated this technology into our tools and added a lot to it. This is really one of the big differentiators from our competition, even though we both have OpenCL support. This technology allows our users the opportunity to create their own libraries in real-time using C, C++ or OpenCL, rather than have to wait for the vendor to create specific libraries or specific algorithms for them.’

Varma said: “The silver bullet in HLS is the ability to take a sequential description that has been written in C and then find this parallelism, the concurrencies, without the user having to think. That was a necessary technology before we could do anything. It has been adopted by thousands of users already as a standalone technology, but what we do is embed that technology inside OpenCL compilers so that now it can be utilised in full software mode and it is fully compatible with OpenCL.”

Getman said: “We consciously made a switch over the last few years to expand our customer base by both continuing technology development for our traditional users as well as expand our tool flow to cater to software coders.”

A key facet of this technology is that Xilinx is letting programmers take the work they have done in C and port it over to OpenCL using the technology from HLS that is now integrated into its compilers. Varma said: ‘One thing that changes when you go from software to hardware programming is that C programmers, OpenCL programmers, are used to dealing with a lot of libraries. They do not have to write matrix multiplications or filters or those kinds of things, because they are always available as library elements. Now hardware languages often have libraries, but they are very specific implementations that you cannot just change for your use.’

Varma concluded: “By writing in C, our HLS technology can re-compile that very efficiently and immediately. This gives you a tremendous capability.”

Coprocessor or something bigger?

FPGA manufacturers like Altera and Xilinx have been focusing their attention on using FPGAs in HPC as coprocessors or accelerators that would be used in much the same way as GPUs.

Getman said: “The biggest use model is really processor plus FPGA. The reason for that is there are still things that you want to run on a processor. You really want a processor to do what it is good at. Typically an FPGA will be used through something like a PCIE slot and it will be used as an acceleration engine for the things that are really difficult for the processor.”

This view was shared by Devadas Varma who highlighted some of the functionality in an earlier release of OpenCL that increased the potential for CPU/GPU/FPGA synergy.

Varma said: ‘The tool we have developed supports OpenCL 1.2 and importantly it can co-exist with CPUs and GPUs. In fact in our upcoming release we will support partitioning workloads into GPUs, we already support this feature regarding CPUs. That is definitely where we are heading.’

However this was not a view shared by Reuven Weintraub, at Gidel, who felt that to regard an FPGA simply as a coprocessor was to miss much of the point and many of the advantages that FPGAs could offer to computing. Weintraub said: “For me a coprocessor is like the 8087 was, you make certain lines of code in the processor and then you say “there’s a line of code for you” and it returns and this goes back and forth. The big advantage of running with the FPGA is that the FPGA can have a lot of pipelining inside of it, solve a lot of things and have a lot of memory.”

He explained that an FPGA contains a ‘huge array of registers that are immediately available’ by taking advantage of the on-board memory and high-throughput that FPGAs can handle, meaning that ‘you do not necessarily have to use the cache because the data is being moved in and out in the correct order.’

Weintraub concluded: “Therefore it is better to give a task to the FPGA rather than giving it just a few opcodes and then going back and forth. It is more task oriented. Computing is a balance between the processing, memory access, networking and storage, but everything has to be balanced. If you want to utilize a good FPGA then you need to give it a task that makes use of its internal memory so that it can move things from one job to another.”

Gidel has considerable experience in this field, having provided the FPGAs for the Novo-G supercomputer, housed at the University of Florida – the largest reconfigurable supercomputer available for research.

The university is a lead partner in the ‘Center for High-Performance Reconfigurable Computing’ (CHREC), a US national research centre funded by the National Science Foundation.

In development at the UF site since 2009, Novo-G features 192 40nm FPGAs (Altera Stratix-IV E530) and 192 65nm FPGAs (Stratix-III E260).

These 384 FPGAs are housed in 96 quad-FPGA boards (Gidel ProcStar-IV and ProcStar-III) and supported by quad-core Nehalem Xeon processors, GTX-480 GPUs, 20Gb/s non-blocking InfiniBand, GigE, and approximately 3TB of total RAM, most of it directly attached to the FPGAs. An upgrade is underway to add 32 top-end, 28nm FPGAs (Stratix-V GSD8) to the system.

According to the article ‘Novo-G: At the Forefront of Scalable Reconfigurable Supercomputing’ written by Alan George, Herman Lam, and Greg Stitt, three researchers from the university, Novo-G achieved speeds rivaling the largest conventional supercomputers in existence – yet at a fraction of their size, energy, and cost.

But although processing speed and energy efficiency were important, they concluded that the principal impact of a reconfigurable supercomputer like Novo-G was the freedom that its innovative design can give to scientists to conduct more types of analysis, and examine larger datasets.

The potential is there.

(via InsideHPC.com)

HTTP 2.0 is coming!

Why we need another version of HTTP protocol?

HTTP has been in use by the World-Wide Web global information initiative since 1990. However, it is December 2014 and we no longer have the simple pages of cross-linked HTML documents we used to. Instead, we have Web applications, some of them very heavy and requiring a lot of resources. And unfortunately, the version of the HTTP protocol currently in use – 1.1 – has issues.

HTTP is actually very simple – the browser sends a request to the server, the server provides the response, and that is it. Very simple, but if you check the chart below you’ll see that a typical page involves not one request and one response but many – around 80-100 requests and about 1.8MB of data:

[Chart: average number of requests and transfer size per page]

Data provided by HTTP Archive.

Now, imagine we have a server in Los Angeles and our client is in Berlin, Germany. All those 80-100 requests have to travel from Berlin to L.A. and back. That is not fast – for example, the round-trip time between London and New York is about 56 ms, and from Berlin to Los Angeles it is even more. And as we know, first page load is latency bound; latency is the constraining factor for today’s applications.

In order to speed up downloading the needed resources, browsers currently open multiple connections to the server (typically six per domain). However, opening a connection is expensive – there is domain name resolution, the socket connect, more round trips if TLS has to be established, and so on. From the browser’s point of view, this also means more memory consumed, connection management, and heuristics for when to open a new connection and when to reuse an existing one.

Web engineers have also tried to make sites load as fast as possible. They invented many different workarounds (aka “optimizations”) like image sprites, domain sharding, resource inlining, concatenating files, combo services and so on. Inventing more and more tricks works up to a point, but what if we were able to fix the issues at the protocol level and avoid these workarounds?

HTTP/2 in a nutshell

HTTP/2 is a binary protocol, in which the browser and server exchange frames. Each frame belongs to a stream, identified by a stream identifier. The key point is that these streams are multiplexed; they have priorities, the priorities are specified by the client (browser), and they can be changed at runtime. A stream can also depend on another stream.

In contrast to HTTP/1.1, in HTTP/2 the headers are compressed. A special algorithm, called HPACK, was designed for that purpose.

Server push is a feature of HTTP/2 which deserves special attention. Web developers have actually been implementing the same idea for years – the inlining of resources mentioned above is an example of that. Since server push works at the protocol level, instead of embedding CSS and JavaScript files or images directly in the page, the server can explicitly push these resources to the browser in relation to a previously made request.

What does an HTTP/2 connection look like?

[Diagram: an HTTP/2 connection carrying three multiplexed streams]

Image by Ilya Grigorik

In this example we have three streams:

  1. A client-initiated request for page.html
  2. A stream carrying script.js – the server initiated this stream, since it already knows the content of page.html and that the browser will request script.js as soon as it parses the HTML.
  3. A stream carrying style.css – also initiated by the server, since this CSS is considered critical and the browser will require it as soon as it parses the HTML file (a server-side sketch follows below).
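The server side of streams 2 and 3 does not require anything exotic. Servlet 4.0, for instance (which arrived well after this article), exposes push through a PushBuilder; the hedged sketch below, with illustrative paths and markup, pushes the script and stylesheet while answering the request for page.html. A container that has not negotiated HTTP/2 simply returns null from newPushBuilder() and the page degrades to ordinary requests.

import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.PushBuilder;

@WebServlet("/page.html")
public class PageServlet extends HttpServlet {
  @Override
  protected void doGet(HttpServletRequest req, HttpServletResponse resp)
      throws ServletException, IOException {
    PushBuilder push = req.newPushBuilder();  // null when push is unavailable (e.g. HTTP/1.1)
    if (push != null) {
      push.path("script.js").push();   // server-initiated stream for the script
      push.path("style.css").push();   // server-initiated stream for the critical CSS
    }
    resp.setContentType("text/html");
    resp.getWriter().println("<!doctype html><html><head>"
        + "<link rel=\"stylesheet\" href=\"style.css\">"
        + "<script src=\"script.js\"></script>"
        + "</head><body>page.html</body></html>");
  }
}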

HTTP/2 is a huge step forward. It is currently at Draft 16 and the final specification will be ready very soon.

Optimizing the Web stack for HTTP/2 era

Does the above mean that we should discard all the optimizations we have been making for years to keep our Web applications as fast as possible? Of course not! It just means we have to forget about some of them and start applying others. For example, we should still send as little data as possible from the server to the client, deal with caching, and store files offline. In general, we can split the needed optimizations into two parts:

Optimize the content being served to the browser:

  • Minifying JavaScript, CSS and HTML files
  • Removing redundant data from images
  • Optimizing critical path CSS
  • Removing CSS that is not needed on the page, using tools like UnCSS, before serving the page
  • Properly specifying ETags for files and setting far-future expires headers (see the sketch after this list)
  • Using HTML5 offline storage for already downloaded files, to minimize traffic on the next page load
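As an illustration of the caching point above, a hedged sketch of a servlet filter (assuming a Servlet 4.0 container and a hypothetical /static/* layout of fingerprinted assets) sets a far-future Cache-Control header, the modern equivalent of a far-future Expires header; most containers add ETags for static resources on their own.

import java.io.IOException;
import javax.servlet.Filter;
import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebFilter;
import javax.servlet.http.HttpServletResponse;

/** Hedged sketch: long-lived caching for fingerprinted static assets. */
@WebFilter("/static/*")
public class CacheHeadersFilter implements Filter {
  @Override
  public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain)
      throws IOException, ServletException {
    // One year; safe only because a content change produces a new file name.
    ((HttpServletResponse) resp).setHeader("Cache-Control", "public, max-age=31536000");
    chain.doFilter(req, resp);
  }
}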

Optimize the server and TCP stack:

  • Check your server and make sure the value of TCP’s Initial Congestion Window (initial cwnd) is 10 segments (IW10). If you use GNU/Linux, just upgrade to kernel 3.2+ to get this feature along with another important update – Proportional Rate Reduction for TCP
  • Disable Slow-Start Restart after idle
  • Check that window scaling is enabled
  • Consider using TCP Fast Open (TFO)

(for more information check the wonderful book “High Performance Browser Networking” by Ilya Grigorik)

We may consider removing the following “optimizations”:

  • Joining files – nowadays many companies are striving for continuous deployment, which makes this challenging: a single changed line of code invalidates the whole bundle. It also forces the browser to wait until the whole file arrives before it can start processing it
  • Domain sharding – loading resources from different domains in order to avoid the browser’s limit of connections per domain (usually 6) is the first “optimization” to be removed. It causes retransmissions and unnecessary latency
  • Resource inlining – it prevents caching and inflates the document in which the resources are embedded. Instead, consider leaving CSS, JavaScript and images as external resources
  • Image sprites – the cache invalidation problem is present here too. Apart from that, image sprites force the browser to consume more CPU and memory while decoding the entire sprite
  • Using cookie-free domains

The new modules API from ES6 and HTTP/2

For years we have been splitting our JavaScript code into modules, stored in different files. Since JavaScript did not provide a module system prior to version 6, the community invented two main formats – AMD and CommonJS. Of course, custom formats, like the one used by the YUI Loader, existed too. In ECMAScript 6, however, we have a brand new way of creating modules. The API for loading them looks like this:

Declarative syntax:

import {foo, bar} from 'file.js';

Programmatic loader API:

System.import('my_module')
  .then(my_module => {
    // ...
  })
  .catch(error => {
    // ...
  });

Imagine this module has 10 different dependencies, each of them stored in a separate file.
If we don’t change anything at build time, the browser will request the file containing the main module and then make many additional requests in order to download all the dependencies.
Since we have HTTP/2 now, the browser won’t open multiple connections. However, in order to process the main module, the browser still has to wait for all the dependencies to be downloaded over the network. This means: download one file, parse it, discover that it requires another module, download that file too, and so on, until all the dependencies are resolved.

One possible fix for the above issue is to change this at build time: concatenate all modules into one file and then rewrite the originally specified import statements to look for modules in that joined file. However, this has drawbacks, and they are the same as when we were joining files for the HTTP/1.1 era.

Another fix which may be considered is to leverage HTTP/2 push promises. In the example above, this means you may try to push the dependencies when the main module is being requested. If the browser already has them, it may abort (reset) the stream by sending an RST_STREAM frame.

Patrick Meenan, however, pointed me to a very interesting issue – in practice the browser may not be able to abort the stream quickly enough. By the time the pushed resources hit the client and are validated against the cache, the entire resource will already be in buffers (on the network and in the server) and the whole file will be downloaded anyway. It will work if we can be sure that the resources aren’t in the browser cache; otherwise we will end up sending them anyway. This is an interesting point for further research.

HTTP/2 implementations

You may start playing with HTTP/2 today. There are many server implementations – grab one and start experimenting.
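On the client side there is plenty to experiment with too. For readers in Java, for instance, the JDK’s built-in HttpClient (added well after this article, in Java 11) can negotiate HTTP/2, as in the hedged sketch below; the URL is a placeholder to be replaced with an HTTP/2-capable endpoint, and the client falls back to HTTP/1.1 when the server does not speak h2.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class Http2Probe {
  public static void main(String[] args) throws Exception {
    HttpClient client = HttpClient.newBuilder()
        .version(HttpClient.Version.HTTP_2)   // prefer h2 via ALPN; fall back to HTTP/1.1
        .build();

    // Replace with any HTTP/2-capable HTTPS endpoint you want to probe.
    HttpRequest request = HttpRequest.newBuilder(URI.create("https://example.org/")).build();
    HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());

    // Which protocol version the connection actually used, plus the status code.
    System.out.println(response.version() + " " + response.statusCode());
  }
}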

The main browser vendors support HTTP/2 already:

  • Internet Explorer supports HTTP/2 from IE 11 on Windows 10 Technical Preview,
  • Firefox has enabled HTTP/2 by default in version 34 and
  • the current version of Chrome supports HTTP/2, but it is not enabled by default. It may be enabled by adding the command-line flag --enable-spdy4 when Chrome is launched, or via chrome://flags.

Currently only HTTP/2 over TLS is implemented in all browsers.

Other interesting protocols to keep an eye on

QUIC is another protocol, started by Google as a natural extension of the research on protocols like SPDY and HTTP/2. I won’t give many details here; it is enough to mention that it has all the features of SPDY and HTTP/2, but is built on top of UDP. The goal is to avoid the head-of-line blocking seen in SPDY or HTTP/2 and to establish a connection much faster than TCP+TLS is capable of.

For more information about HTTP/2 and QUIC, please watch my JSConfEU talk.

(Via Calendar.perfplanet.com)