Elements Of Scale: Composing And Scaling Data Platforms

This is a guest repost of Ben Stopford’s epic post on Elements of Scale: Composing and Scaling Data Platforms. It is a masterful tour through the evolutionary forces that shape how systems adapt to key challenges.

As software engineers we are inevitably affected by the tools we surround ourselves with. Languages, frameworks, even processes all act to shape the software we build.

Likewise databases, which have trodden a very specific path, inevitably affect the way we treat mutability and share state in our applications.

Over the last decade we’ve explored what the world might look like had we taken a different path. Small open source projects try out different ideas. These grow. They are composed with others. The platforms that result utilise suites of tools, with each component often leveraging some fundamental hardware or systemic efficiency. The result is platforms that solve problems too unwieldy or too specific to work within any single tool.

So today’s data platforms range greatly in complexity, from simple caching layers or polyglot persistence right through to wholly integrated data pipelines. There are many paths. They go to many different places. In some of these places at least, nice things are found.

So the aim for this talk is to explain how and why some of these popular approaches work. We’ll do this by first considering the building blocks from which they are composed. These are the intuitions we’ll need to pull together the bigger stuff later on.

In a somewhat abstract sense, when we’re dealing with data, we’re really just arranging locality. Locality to the CPU. Locality to the other data we need. Accessing data sequentially is an important component of this. Computers are just good at sequential operations. Sequential operations can be predicted.

If you’re taking data from disk sequentially it’ll be pre-fetched into the disk buffer, the page cache and the different levels of CPU caching. This has a significant effect on performance. But it does little to help the addressing of data at random, be it in main memory, on disk or over the network. In fact pre-fetching actually hinders random workloads as the various caches and frontside bus fill with data which is unlikely to be used.

Whilst disk is somewhat renowned for its slow performance, main memory is often assumed to simply be fast. This is not as ubiquitously true as people often think. There are one to two orders of magnitude between random and sequential main memory workloads. Use a language that manages memory for you and things generally get a whole lot worse.

Streaming data sequentially from disk can actually outperform randomly addressed main memory. So disk may not always be quite the tortoise we think it is, at least not if we can arrange sequential access. SSDs, particularly those that utilise PCIe, further complicate the picture as they demonstrate different tradeoffs, but the caching benefits of the two access patterns remain, regardless.

So let’s imagine, as a simple thought experiment, that we want to create a very simple database. We’ll start with the basics: a file.

We want to keep writes and reads sequential, as it works well with the hardware. We can append writes to the end of the file efficiently. We can read by scanning the file in its entirety. Any processing we wish to do can happen as the data streams through the CPU. We might filter, aggregate or even do something more complex. The world is our oyster!

So what about data that changes, updates etc?

We have two options. We could update the value in place. We’d need to use fixed width fields for this, but that’s ok for our little thought experiment. But update in place would mean random IO. We know that’s not good for performance.

Alternatively we could just append updates to the end of the file and deal with the superseded values when we read it back.

So we have our first tradeoff. Append to a ‘journal’ or ‘log’, and reap the benefits of sequential access. Alternatively if we use update in place we’ll be back to 300 or so writes per second, assuming we actually flush through to the underlying media.
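The append-and-supersede option can be sketched in a few lines. This is only an illustration under some assumptions: the class name, the JSON record format and the in-memory `BytesIO` stream (standing in for a real file) are all ours, not part of any particular database.

```python
import io
import json


class AppendOnlyLog:
    """A toy append-only log: every write goes to the end of the stream."""

    def __init__(self, stream=None):
        # Any seekable binary stream works; BytesIO stands in for a real file.
        self.stream = stream if stream is not None else io.BytesIO()

    def put(self, key, value):
        # Updates are never applied in place; a newer record is simply appended.
        record = json.dumps({"key": key, "value": value}) + "\n"
        self.stream.seek(0, io.SEEK_END)
        self.stream.write(record.encode("utf-8"))

    def scan(self):
        # Read the whole log sequentially; later records supersede earlier ones.
        self.stream.seek(0)
        latest = {}
        for line in self.stream:
            record = json.loads(line)
            latest[record["key"]] = record["value"]
        return latest


log = AppendOnlyLog()
log.put("bob", {"city": "London"})
log.put("alice", {"city": "Paris"})
log.put("bob", {"city": "Leeds"})  # supersedes the first "bob" record
current = log.scan()
```

Note that the old "bob" record is still in the log. The cost of the update has been moved from the write to the read.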

Now in practice of course reading the file, in its entirety, can be pretty slow. We’ll only need to get into GBs of data and the fastest disks will take seconds. This is what a database does when it ends up table scanning.

Also we often want something more specific, say customers named “bob”, so scanning the whole file would be overkill. We need an index.

Now there are lots of different types of indexes we could use. The simplest would be an ordered array of fixed-width values, in this case customer names, held with the corresponding offsets in the heap file. The ordered array could be searched with binary search. We could also of course use some form of tree, bitmap index, hash index, term index etc. Here we’re picturing a tree.
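As a rough sketch of the simplest option, the ordered array, here is a sorted list of (name, offset) pairs searched with binary search. The sample records and the `lookup` helper are invented for illustration, and a Python list stands in for the heap file.

```python
import bisect

# Heap file stand-in: records in arrival order, addressed by position.
heap = ["carol,Leeds", "bob,London", "alice,Paris", "bob,York"]

# The index: (name, offset) pairs kept sorted by name.
index = sorted((row.split(",")[0], offset) for offset, row in enumerate(heap))


def lookup(name):
    """Return every heap record whose indexed name equals `name`."""
    # A 1-tuple sorts before any (name, offset) pair with the same name,
    # so this finds the first index entry for `name`.
    start = bisect.bisect_left(index, (name,))
    rows = []
    for key, offset in index[start:]:
        if key != name:
            break
        rows.append(heap[offset])
    return rows


bobs = lookup("bob")
```

The lookup itself touches only a handful of index entries, but keeping `index` sorted as new records arrive is exactly the overarching structure that forces random writes.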

The thing with indexes like this is that they impose an overarching structure. The values are deliberately ordered so we can access them quickly when we want to do a read. The problem with the overarching structure is that it necessitates random writes as data flows in. So our wonderful, write optimised, append only file must be augmented by writes that scatter-gun the filesystem. This is going to slow us down.

Anyone who has put lots of indexes on a database table will be familiar with this problem. If we are using a regular rotating hard drive, we might run 1,000s of times slower if we maintain disk integrity of an index in this way.

Luckily there are a few ways around this problem. Here we are going to discuss three. These represent three extremes, and they are in truth simplifications of the real world, but the concepts are useful when we consider larger compositions.

Our first option is simply to place the index in main memory. This will compartmentalise the problem of random writes to RAM. The heap file stays on disk.

This is a simple and effective solution to our random writes problem. It is also one used by many real databases. MongoDB, Cassandra, Riak and many others use this type of optimisation. Often memory mapped files are used.

However, this strategy breaks down if we have far more data than we have main memory. This is particularly noticeable where there are lots of small objects. Our index would get very large. Thus our storage becomes bounded by the amount of main memory we have available. For many tasks this is fine, but if we have very large quantities of data this can be a burden.

A popular solution is to move away from having a single ‘overarching’ index. Instead we use a collection of smaller ones.

This is a simple idea. We batch up writes in main memory, as they come in. Once we have sufficient, say a few MBs, we sort them and write them to disk as an individual mini-index. What we end up with is a chronology of small, immutable index files.

So what was the point of doing that? Our set of immutable files can be streamed sequentially. This brings us back to a world of fast writes, without us needing to keep the whole index in memory. Nice!

Of course there is a downside to this approach too. When we read, we have to consult the many small indexes individually. So all we have really done is shift the problem of random IO from writes onto reads. However this turns out to be a pretty good tradeoff in many cases. It’s easier to optimise random reads than it is to optimise random writes.

Keeping a small meta-index in memory or using a Bloom Filter provides a low-memory way of evaluating whether individual index files need to be consulted during a read operation. This gives us almost the same read performance as we’d get with a single overarching index whilst retaining fast, sequential writes.
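To make the Bloom Filter idea concrete, here is a minimal sketch. The sizing (1024 bits, 3 hashes) and the trick of salting SHA-256 to get several hash functions are arbitrary choices for illustration, not what any particular database does.

```python
import hashlib


class BloomFilter:
    """A toy Bloom filter, one of which could be kept per index file."""

    def __init__(self, size_bits=1024, num_hashes=3):
        self.size_bits = size_bits
        self.num_hashes = num_hashes
        self.bits = 0  # a Python int used as a bit array

    def _positions(self, key):
        # Derive several bit positions by salting a single hash function.
        for salt in range(self.num_hashes):
            digest = hashlib.sha256(f"{salt}:{key}".encode("utf-8")).digest()
            yield int.from_bytes(digest[:8], "big") % self.size_bits

    def add(self, key):
        for pos in self._positions(key):
            self.bits |= 1 << pos

    def might_contain(self, key):
        # False means definitely absent; True only means "possibly present".
        return all((self.bits >> pos) & 1 for pos in self._positions(key))


segment_filter = BloomFilter()
segment_filter.add("bob")
```

On a read we would check `might_contain` for each index file and skip the files that answer False. A True answer can be a false positive, so the file still has to be consulted to be sure.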

In reality we will need to purge orphaned updates occasionally too, but that can be done with nice sequential reads and writes.

What we have created is termed a Log Structured Merge Tree, a storage approach used in a lot of big data tools such as HBase, Cassandra, Google’s BigTable and many others. It balances write and read performance with comparatively small memory overhead.
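The whole scheme fits in a short sketch. This is a deliberately simplified, in-memory model: `TinyLSM` and its method names are made up, sorted Python lists stand in for immutable index files, and real implementations add write-ahead logs, tiered compaction and much more.

```python
import bisect


class TinyLSM:
    def __init__(self, memtable_limit=2):
        self.memtable_limit = memtable_limit
        self.memtable = {}   # recent writes, batched in memory
        self.segments = []   # immutable sorted runs, oldest first

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.memtable_limit:
            self.flush()

    def flush(self):
        # Sort the batch and "write" it out as one immutable mini-index.
        if self.memtable:
            self.segments.append(sorted(self.memtable.items()))
            self.memtable = {}

    def get(self, key):
        if key in self.memtable:
            return self.memtable[key]
        # Consult segments newest first so the latest write wins.
        for segment in reversed(self.segments):
            i = bisect.bisect_left(segment, (key,))
            if i < len(segment) and segment[i][0] == key:
                return segment[i][1]
        return None

    def compact(self):
        # Merge all segments, keeping only the newest value for each key.
        merged = {}
        for segment in self.segments:
            merged.update(segment)
        self.segments = [sorted(merged.items())] if merged else []


db = TinyLSM(memtable_limit=2)
db.put("bob", 1)
db.put("alice", 2)   # second write triggers a flush
db.put("bob", 3)     # newer value for "bob"
db.put("carol", 4)   # triggers a second flush
segments_before = len(db.segments)
db.compact()         # purges the superseded ("bob", 1) entry
```

Writes only ever touch the in-memory batch or append a new segment. Reads pay for that by checking several segments, which is where the meta-index or Bloom Filter earns its keep.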

So we can get around the ‘random-write penalty’ by storing our indexes in memory or, alternatively, using a write-optimised index structure like LSM. There is a third approach though. Pure brute force.

Think back to our original example of the file. We could read it in its entirety. This gave us many options in terms of how we go about processing the data within it. The brute force approach is simply to hold data by column rather than by row and stream only the columns required for a query, in their entirety, through the CPU. This approach is termed Columnar or Column Oriented.

(It should be noted that there is an unfortunate nomenclature clash between true column stores and those that follow the Big Table pattern. Whilst they share some similarities, in practice they are quite different. It is wise to consider them as different things.)

Column Orientation is another simple idea. Instead of storing data as a set of rows, appended to a single file, we split each row by column. We then store each column in a separate file.

We keep the order of the files the same, so row N has the same position (offset) in each column file. This is important because we will need to read multiple columns to service a single query, all at the same time. This means ‘joining’ columns on the fly. If the columns are in the same order we can do this in a tight loop which is very cache- and CPU-efficient. Many implementations make heavy use of vectorisation to further optimise throughput for simple join and filter operations.
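A small sketch of the positional ‘join’, assuming three invented columns held as Python lists rather than files:

```python
# Row-oriented input, as it might arrive from writers.
rows = [
    ("bob", "London", 120),
    ("alice", "Paris", 80),
    ("carol", "London", 200),
]

# Column-oriented layout: one sequence per column, all in the same row order.
names, cities, amounts = (list(col) for col in zip(*rows))


def total_for_city(city):
    """Sum `amounts` for one city, touching only the two columns needed."""
    # zip walks both columns in lockstep, joining them purely by position.
    return sum(amount for c, amount in zip(cities, amounts) if c == city)


london_total = total_for_city("London")
```

The `names` column is never read by this query. On disk that would be a whole file we did not have to touch.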

Writes can leverage the benefit of being append-only. The downside is that we now have many files to update, one for every column in every individual write to the database. The most common solution to this is to batch writes in a similar way to the one used in the LSM approach above. Many columnar databases also impose an overall order to the table as a whole to increase their read performance for one chosen key.

By splitting data by column we significantly reduce the amount of data that needs to be brought from disk, so long as our query operates on a subset of all columns.

In addition to this, data in a single column generally compresses well. We can take advantage of the data type of the column to do this, if we have knowledge of it. This means we can often use efficient, low cost encodings such as run-length, delta, bit-packed etc. For some encodings predicates can be used directly on the compressed stream too.
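Run-length encoding is the easiest of these to sketch, and it shows how a predicate can be evaluated against the runs without expanding them. The column contents and function names here are illustrative only.

```python
from itertools import groupby


def rle_encode(values):
    """Run-length encode a column as (value, run_length) pairs."""
    return [(value, sum(1 for _ in run)) for value, run in groupby(values)]


def count_equal(encoded, target):
    """Count rows equal to `target` without expanding the runs."""
    return sum(length for value, length in encoded if value == target)


column = ["UK", "UK", "UK", "FR", "FR", "UK"]
encoded = rle_encode(column)
uk_rows = count_equal(encoded, "UK")
```

The equality test runs once per run rather than once per row, so the more repetitive (or sorted) the column, the less work the scan does.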

The result is a brute force approach that will work particularly well for operations that require large scans. Aggregate functions like average, max, min, group by etc are typical of this.

This is very different to using the ‘heap file & index’ approach we covered earlier. A good way to understand this is to ask yourself: what is the difference between a columnar approach like this vs a ‘heap & index’ where indexes are added to every field?

The answer to this lies in the ordering of the index files. BTrees etc will be ordered by the fields they index. Joining the data in two indexes involves a streaming operation on one side, but on the other side the index lookups have to read random positions in the second index. This is generally less efficient than joining two indexes (columns) that retain the same ordering. Again we’re leveraging sequential access.

So many of the best technologies which we may want to use as components in a data platform will leverage one of these core efficiencies to excel for a certain set of workloads.

Storing indexes in memory, over a heap file, is favoured by many NoSQL stores such as Riak, Couchbase or MongoDB as well as some relational databases. It’s a simple model that works well.

Tools designed to work with larger data sets tend to take the LSM approach. This gives them fast ingestion as well as good read performance using disk based structures. HBase, Cassandra, RocksDB, LevelDB and even Mongo now support this approach.

Column-per-file engines are used heavily in MPP databases like Redshift or Vertica as well as in the Hadoop stack using Parquet. These are engines for data crunching problems that require large traversals. Aggregation is the home ground for these tools.

Other products like Kafka apply the use of a simple, hardware efficient contract to messaging. Messaging, at its simplest, is just appending to a file, or reading from a predefined offset. You read messages from an offset. You go away. You come back. You read from the offset you previously finished at. All nice sequential IO.
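That contract is small enough to sketch directly. This is not Kafka’s API; `MessageLog` and its methods are hypothetical names for an in-memory stand-in.

```python
class MessageLog:
    """A toy message log: producers append, consumers read from an offset."""

    def __init__(self):
        self.messages = []  # append-only

    def append(self, message):
        self.messages.append(message)
        return len(self.messages) - 1  # offset of the new message

    def read_from(self, offset, max_messages=100):
        # Return a batch plus the offset the consumer should resume from.
        batch = self.messages[offset:offset + max_messages]
        return batch, offset + len(batch)


log = MessageLog()
for message in ("a", "b", "c"):
    log.append(message)

first_batch, next_offset = log.read_from(0)   # read, then go away
log.append("d")                               # more messages arrive meanwhile
second_batch, next_offset = log.read_from(next_offset)  # come back, resume
```

The only state a consumer needs is a single integer, and the log never has to be reorganised to serve it.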

This is different to most message oriented middleware. Specifications like JMS and AMQP require the addition of indexes like the ones discussed above, to manage selectors and session information. This means they often end up performing more like a database than a file. Jim Gray made this point famously back in his 1995 publication Queues Are Databases.

So all these approaches favour one tradeoff or other, often keeping things simple, and hardware sympathetic, as a means of scaling.

So we’ve covered some of the core approaches to storage engines. In truth we made some simplifications. The real world is a little more complex. But the concepts are useful nonetheless.

Scaling a data platform is more than just storage engines though. We need to consider parallelism.

When distributing data over many machines we have two core primitives to play with: partitioning and replication. Partitioning, sometimes called sharding, works well both for random access and brute force workloads.

If a hash-based partitioning model is used the data will be spread across a number of machines using a well-known hash function. This is similar to the way a hash table works, with each bucket being held on a different machine.

The result is that any value can be read by going directly to the machine that contains the data, via the hash function. This pattern is wonderfully scalable and is the only pattern that shows linear scalability as the number of client requests increases. Requests are isolated to a single machine. Each one will be served by just a single machine in the cluster.
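A minimal sketch of directed routing, with Python dicts standing in for machines. The class name and the choice of SHA-256 are assumptions made for the example; the point is only that the hash must be stable and known to every client.

```python
import hashlib


class PartitionedStore:
    """A toy hash-partitioned key-value store; each dict plays one machine."""

    def __init__(self, num_partitions):
        self.partitions = [{} for _ in range(num_partitions)]

    def partition_for(self, key):
        # A stable hash, so every client routes a given key the same way.
        digest = hashlib.sha256(key.encode("utf-8")).digest()
        return int.from_bytes(digest[:4], "big") % len(self.partitions)

    def put(self, key, value):
        self.partitions[self.partition_for(key)][key] = value

    def get(self, key):
        # Exactly one partition is consulted for any key.
        return self.partitions[self.partition_for(key)].get(key)


store = PartitionedStore(num_partitions=4)
store.put("bob", {"city": "London"})
bob = store.get("bob")
holders = [p for p in store.partitions if "bob" in p]
```

A plain modulo like this reshuffles most keys when the partition count changes, which is why real systems tend to use consistent hashing or a fixed number of virtual partitions.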

We can also use partitioning to provide parallelism over batch computations, for example aggregate functions or more complex algorithms such as those we might use for clustering or machine learning. The key difference is that we exercise all machines at the same time, in a broadcast manner. This allows us to solve a large computational problem in a much shorter time, using a divide and conquer approach.

Batch systems work well for large problems, but provide little concurrency as they tend to exhaust the resources on the cluster when they execute.

So the two extremes are pretty simple: directed access at one end, broadcast, divide and conquer at the other. Where we need to be careful is in the middle ground that lies between the two. A good example of this is the use of secondary indexes in NoSQL stores that span many machines.

A secondary index is an index that isn’t on the primary key. This means the data will not be partitioned by the values in the index. Directed routing via a hash function is no longer an option. We have to broadcast requests to all machines. This limits concurrency. Every node must be involved in every query.
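The difference in fan-out is easy to see in a sketch. Here the data is partitioned by a made-up customer id, and each query function also reports how many partitions it had to touch. The names and the use of CRC32 as the partitioning hash are assumptions for illustration.

```python
import zlib

NUM_PARTITIONS = 3
partitions = [[] for _ in range(NUM_PARTITIONS)]  # each list plays one machine


def partition_for(customer_id):
    return zlib.crc32(customer_id.encode("utf-8")) % NUM_PARTITIONS


def insert(customer_id, city):
    partitions[partition_for(customer_id)].append((customer_id, city))


def find_by_id(customer_id):
    # Primary key lookup: routed to a single partition.
    rows = [r for r in partitions[partition_for(customer_id)] if r[0] == customer_id]
    return rows, 1


def find_by_city(city):
    # Secondary attribute: the hash tells us nothing, so ask every partition.
    rows = [r for part in partitions for r in part if r[1] == city]
    return rows, len(partitions)


for cid, city in [("c1", "London"), ("c2", "Paris"), ("c3", "London"), ("c4", "Leeds")]:
    insert(cid, city)

by_id, touched_by_id = find_by_id("c1")
by_city, touched_by_city = find_by_city("London")
```

Every secondary lookup occupies every machine, so the number of such queries the cluster can serve at once does not grow as machines are added.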

For this reason many key value stores have resisted the temptation to add secondary indexes, despite their obvious use. HBase and Voldemort are examples of this. But many others do expose them, including MongoDB, Cassandra and Riak. This is good as secondary indexes are useful. But it’s important to understand the effect they will have on the overall concurrency of the system.

The route out of this concurrency bottleneck is replication. You’ll probably be familiar with replication either from using async slave databases or from replicated NoSQL stores like Mongo or Cassandra.

In practice replicas can be invisible (used only for recovery), read only (adding read concurrency) or read write (adding partition tolerance). Which of these you choose will trade off against the consistency of the system. This is simply the application of CAP theorem (although CAP theorem also may not be as simple as you think).

This tradeoff with consistency* brings us to an important question. When does consistency matter?

Consistency is expensive. In the database world ACID is guaranteed by serialisability. This is essentially ensuring that all operations appear to occur in sequential order. It turns out to be a pretty expensive thing. In fact it’s prohibitive enough that many databases don’t offer it as an isolation level at all. Those that do never set it as the default.

Suffice to say that if you apply strong consistency to a system that does distributed writes you’ll likely end up in tortoise territory.

(* note the term consistency has two common usages. The C in ACID and the C in CAP. They are unfortunately not the same. I’m using the CAP definition: all nodes see the same data at the same time)

The solution to this consistency problem is simple. Avoid it. If you can’t avoid it, isolate it to as few writers and as few machines as possible.

Avoiding consistency issues is often quite easy, particularly if your data is an immutable stream of facts. A set of web logs is a good example. They have no consistency concerns as they are just facts that never change.

There are other use cases which do necessitate consistency though. Transferring money between accounts is an oft used example. Non-commutative actions such as applying discount codes are another.

But often things that appear to need consistency, in a traditional sense, may not. For example if an action can be changed from a mutation to a new set of associated facts we can avoid mutable state. Consider marking a transaction as being potentially fraudulent. We could update it directly with the new field. Alternatively we could simply use a separate stream of facts that links back to the original transaction.
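The fraud example might look like this in miniature. The record shapes and function names are invented; the point is that the original transaction is never modified.

```python
# Immutable facts: transactions are written once and never updated.
transactions = [
    {"id": "t1", "amount": 250},
    {"id": "t2", "amount": 9000},
]

# A separate, append-only stream of facts that links back to transactions.
fraud_flags = []


def flag_as_suspicious(transaction_id, reason):
    # A new fact is appended rather than mutating the transaction itself.
    fraud_flags.append({"transaction_id": transaction_id, "reason": reason})


def suspicious_transactions():
    # The "is it flagged?" view is derived by joining the two streams on read.
    flagged = {flag["transaction_id"] for flag in fraud_flags}
    return [t for t in transactions if t["id"] in flagged]


flag_as_suspicious("t2", "unusually large amount")
suspects = suspicious_transactions()
```

Because both streams only ever grow, they can be replicated and processed without coordinating on who holds the latest version of a record.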

So in a data platform it’s useful to either remove the consistency requirement altogether, or at least isolate it. One way to isolate is to use the single writer principle, which gets you some of the way. Datomic is a good example of this. Another is to physically isolate the consistency requirement by splitting mutable and immutable worlds.

Approaches like Bloom/CALM extend this idea further by embracing the concept of disorder by default, imposing order only when necessary.

So those were some of the fundamental tradeoffs we need to consider. Now how do we pull these things together to build a data platform?

A typical application architecture might look something like this: a set of processes which write data to a database and read it back again. This is fine for many simple workloads. Many successful applications have been built with this pattern. But we know it works less well as throughput grows. In the application space this is a problem we might tackle with message-passing, actors, load balancing etc.

The other problem is this approach treats the database as a black box. Databases are clever software. They provide a huge wealth of features. But they provide little mechanism for scaling out of an ACID world. This is a good thing in many ways. We default to safety. But it can become an annoyance when scaling is inhibited by general guarantees which may be overkill for the requirements we have.

The simplest route out of this is CQRS (Command Query Responsibility Segregation).

Another very simple idea. We separate read and write workloads. Writes go into something write-optimised. Something closer to a simple journal file. Reads come from something read-optimised. There are many ways to do this, be it tools like Goldengate for relational technologies or products that integrate replication internally such as Replica Sets in MongoDB.
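Stripped to its essentials, the separation might be sketched like this. `OrderService` is a hypothetical example, and both sides live in one process here; in practice the write side and the read side would usually be different stores kept in sync by replication.

```python
from collections import defaultdict


class OrderService:
    def __init__(self):
        self.journal = []                            # write side: append-only
        self.totals_by_customer = defaultdict(int)   # read side: derived view
        self.applied = 0                             # journal entries already in the view

    def place_order(self, customer, amount):
        # Commands only ever append to the journal.
        self.journal.append((customer, amount))

    def refresh_view(self):
        # Stands in for asynchronous replication into the read-optimised store.
        for customer, amount in self.journal[self.applied:]:
            self.totals_by_customer[customer] += amount
        self.applied = len(self.journal)

    def total_for(self, customer):
        # Queries never touch the journal.
        return self.totals_by_customer.get(customer, 0)


service = OrderService()
service.place_order("bob", 100)
service.place_order("bob", 50)
stale_total = service.total_for("bob")   # the view has not caught up yet
service.refresh_view()
fresh_total = service.total_for("bob")
```

The gap between `stale_total` and `fresh_total` is the price of the pattern: reads are fast and cheap, but they may lag behind the writes.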

Many databases do something like this under the hood. Druid is a nice example. Druid is an open source, distributed, time-series, columnar analytics engine. Columnar storage works best if we input data in large blocks, as the data must be spread across many files. To get good write performance Druid stores recent data in a write optimised store. This is gradually ported over to the read optimised store over time.

When Druid is queried the query routes to both the write optimised and read optimised components. The results are combined (‘reduced’) and returned to the user. Druid uses time, marked on each record, to determine ordering.

Composite approaches like this provide the benefits of CQRS behind a single abstraction.

Another similar approach is to use an Operational/Analytic Bridge. Read- and write-optimised views are separated using an event stream. The stream of state is retained indefinitely, so that the async views can be recomposed and augmented at a later date by replaying.
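Replaying a retained stream to build views can be sketched as follows. The event shapes and the two view builders are invented for the example, and a Python list stands in for the event stream.

```python
# The retained event stream: the full history, never trimmed.
events = [
    {"type": "deposit", "account": "a1", "amount": 100},
    {"type": "withdrawal", "account": "a1", "amount": 30},
    {"type": "deposit", "account": "a2", "amount": 50},
]


def build_balances(stream):
    """A read-optimised view: current balance per account."""
    balances = {}
    for event in stream:
        sign = 1 if event["type"] == "deposit" else -1
        balances[event["account"]] = balances.get(event["account"], 0) + sign * event["amount"]
    return balances


def build_deposit_counts(stream):
    """A view thought of later: it is built by replaying from the start."""
    counts = {}
    for event in stream:
        if event["type"] == "deposit":
            counts[event["account"]] = counts.get(event["account"], 0) + 1
    return counts


balances = build_balances(events)
deposit_counts = build_deposit_counts(events)
```

Because the stream is kept, a new view does not need a migration of existing state. It simply replays history.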

So the front section provides for synchronous reads and writes. This can be as simple as immediately reading data that was written or as complex as supporting ACID transactions.

The back end leverages asynchronicity, and the advantages of immutable state, to scale offline processing through replication, denormalisation or even completely different storage engines. The messaging-bridge, along with joining the two, allows applications to listen to the data flowing through the platform.

As a pattern this is well suited to mid-sized deployments where there is at least a partial, unavoidable requirement for a mutable view.

If we are designing for an immutable world, it’s easier to embrace larger data sets and more complex analytics. The batch pipeline, one almost ubiquitously implemented with the Hadoop stack, is typical of this.

The beauty of the Hadoop stack comes from its plethora of tools. Whether you want fast read-write access, cheap storage, batch processing, high throughput messaging or tools for extracting, processing and analysing data, the Hadoop ecosystem has it all.

The batch pipeline architecture pulls data from pretty much any source, push or pull, ingests it into HDFS, then processes it to provide increasingly optimised versions of the original data. Data might be enriched, cleansed, denormalised, aggregated, moved to a read optimised format such as Parquet or loaded into a serving layer or data mart. Data can be queried and processed throughout this process.

This architecture works well for immutable data, ingested and processed in large volume. Think 100s of TBs plus. The evolution of this architecture will be slow though. Straight-through timings are often measured in hours.

The problem with the Batch Pipeline is that we often don’t want to wait hours to get a result. A common solution is to add a streaming layer alongside it. This is sometimes referred to as the Lambda Architecture.

The Lambda Architecture retains a batch pipeline, like the one above, but it circumvents it with a fast streaming layer. It’s a bit like building a bypass around a busy town. The streaming layer typically uses a stream processing tool such as Storm or Samza.

The key insight of the Lambda Architecture is that we’re often happy to have an approximate answer quickly, but we would like an accurate answer in the end.

So the streaming layer bypasses the batch layer, providing the best answers it can within a streaming window. These are written to a serving layer. Later the batch pipeline computes an accurate result and overwrites the approximation.
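The approximate-then-accurate flow can be sketched with a page view count. Everything here is a stand-in: a dict plays the serving layer, two lists play the stream window and the master dataset, and we assume for illustration that one event arrived too late for the stream to see.

```python
from collections import Counter

serving_layer = {}


def publish(view_name, value, source):
    # Whoever writes last wins, so the batch result replaces the stream result.
    serving_layer[view_name] = {"value": value, "source": source}


def run_streaming_layer(window_events):
    # Fast but approximate: only sees what arrived within its window.
    publish("page_views", dict(Counter(window_events)), "stream")


def run_batch_layer(master_dataset):
    # Slow but accurate: recomputes from the complete data set.
    publish("page_views", dict(Counter(master_dataset)), "batch")


master_dataset = ["home", "home", "pricing", "home", "pricing"]
seen_by_stream = master_dataset[:4]   # the last event arrived late

run_streaming_layer(seen_by_stream)
quick_answer = dict(serving_layer["page_views"])
run_batch_layer(master_dataset)
final_answer = dict(serving_layer["page_views"])
```

Both layers share the counting logic here, which hints at how the dual coding problem can be softened by pulling that logic into a common library.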

This is a clever way to balance accuracy with responsiveness. Some implementations of this pattern suffer if the two branches end up being dual coded in stream and batch layers. But it is often possible to simply abstract this logic into common libraries that can be reused, particularly as much of this processing is often written in external languages such as Python or R anyway. Alternatively systems like Spark provide both stream and batch functionality in one system (although the streams in Spark are really micro-batches).

So this pattern again suits high volume data platforms, say in the 100TB+ range, that want to combine streams with existing, rich, batch based analytic function.

There is another approach to this problem of slow data pipelines. It’s sometimes termed the Kappa Architecture. I actually thought this name was ‘tongue in cheek’ but I’m now not so sure. Whichever it is, I’m going to use the term Stream Data Platform, which is a term in use also.

Stream Data Platforms flip the batch pattern on its head. Rather than storing data in HDFS, and refining it with incremental batch jobs, the data is stored in a scale out messaging system, or log, such as Kafka. This becomes the system of record and the stream of data is processed in real time to create a set of tertiary views, indexes, serving layers or data marts.

This is broadly similar to the streaming layer of the Lambda architecture but with the batch layer removed. Obviously the requirement for this is that the messaging layer can store and vend very large volumes of data and there is a sufficiently powerful stream processor to handle the processing.

There is no free lunch so, for hard problems, Stream Data Platforms will likely run no faster than an equivalent batch system, but switching the default approach from ‘store and process’ to ‘stream and process’ can provide greater opportunity for faster results.

Finally, the Stream Data Platform approach can be applied to the problem of ‘application integration’. This is a thorny and difficult problem that has seen focus from big vendors such as Informatica, Tibco and Oracle for many years. For the most part results have been beneficial, but not transformative. Application integration remains a topic looking for a real workable solution.

Stream Data Platforms provide an interesting potential solution to this problem. They take many of the benefits of an O/A bridge (the variety of asynchronous storage formats and the ability to recreate views) but leave the consistency requirement isolated in, often existing, sources.

With the system of record being a log it’s easy to enforce immutability. Products like Kafka can retain enough volume internally to be used as a historic record. This means recovery can be a process of replaying and regenerating state, rather than constantly checkpointing.

Similarly styled approaches have been taken before in a number of large institutions with tools such as Goldengate, porting data to enterprise data warehouses or more recently data lakes. They were often thwarted by a lack of throughput in the replication layer and the complexity of managing changing schemas. It seems unlikely the first problem will continue. As for the latter problem though, the jury is still out.

~

So we started with locality. With sequential addressing for both reads and writes. This dominates the tradeoffs inside the components we use. We looked at scaling these components out, leveraging primitives for both sharding and replication. Finally we rebranded consistency as a problem we should isolate in the platforms we build.

But data platforms themselves are really about balancing the sweet-spots of these individual components within a single, holistic form. Incrementally restructuring. Migrating the write-optimised to the read-optimised. Moving from the constraints of consistency to the open plains of streamed, asynchronous, immutable state.

This must be done with a few things in mind. Schemas are one. Time, the peril of the distributed, asynchronous world, is another. But these problems are manageable if carefully addressed. Certainly the future is likely to include more of these things, particularly as tooling, innovated in the big data space, percolates into platforms that address broader problems, both old and new.

(via HighScalability.com)

How And Why Swiftype Moved From EC2 To Real Hardware

This is a guest post by Oleksiy Kovyrin, Head of Technical Operations at Swiftype. Swiftype currently powers search on over 100,000 websites and serves more than 1 billion queries every month.

When Matt and Quin founded Swiftype in 2012, they chose to build the company’s infrastructure using Amazon Web Services. The cloud seemed like the best fit because it was easy to add new servers without managing hardware and there were no upfront costs.

Unfortunately, while some of the services (like Route53 and S3) ended up being really useful and incredibly stable for us, the decision to use EC2 created several major problems that plagued the team during our first year.

Swiftype’s customers demand exceptional performance and always-on availability and our ability to provide that is heavily dependent on how stable and reliable our basic infrastructure is. With Amazon we experienced networking issues, hanging VM instances, unpredictable performance degradation (probably due to noisy neighbors sharing our hardware, but there was no way to know) and numerous other problems. No matter what problems we experienced, Amazon always had the same solution: pay Amazon more money by purchasing redundant or higher-end services.

The more time we spent working around the problems with EC2, the less time we could spend developing new features for our customers. We knew it was possible to make our infrastructure work in the cloud, but the effort, time and resources it would take to do so were much greater than those needed to migrate away.

After a year of fighting the cloud, we made a decision to leave EC2 for real hardware. Fortunately, this no longer means buying your own servers and racking them up in a colo. Managed hosting providers facilitate a good balance of physical hardware, virtualized instances, and rapid provisioning. Given our previous experience with hosting providers, we made the decision to choose SoftLayer. Their excellent service and infrastructure quality, provisioning speed, and customer support made them the best choice for us.

After more than a month of hard work preparing the inter-data center migration, we were able to execute the transition with zero downtime and no negative impact on our customers. The migration to real hardware resulted in enormous improvements in service stability from day one, provided a huge (~2x) performance boost to all key infrastructure components, and reduced our monthly hosting bill by ~50%.

This article will explain how we planned for and implemented the migration process, detail the performance improvements we saw after the transition, and offer insight for younger companies about when it might make sense to do the same.

Preparing For The Switch

Before the migration, we had around 40 instances on Amazon EC2. We would experience a serious production issue (instance outage, networking issue, etc) at least 2-3 times a week, sometimes daily. Once we decided to move to real hardware, we knew we had our work cut out for us because we needed to switch data centers without bringing down the service. The preparation process involved two major steps, each of which is explained in its own section below:

  1. Connecting EC2 and SoftLayer. First, we built a skeleton of our new infrastructure (the smallest subset of servers to be able to run all key production services with development-level load) in SoftLayer’s data center. Once the new data center was set up, we built a system of VPN tunnels between our old and our new data centers to ensure transparent network connectivity between components in both data centers.

  2. Architectural changes to our applications. Next, we needed to make changes to our applications to make them work both in the cloud and on our new infrastructure. Once the application could live in both data centers simultaneously, we built a data-replication pipeline to make sure both the cloud infrastructure and the SoftLayer deployment (databases, search indexes, etc) were always in-sync.

Step 1: Connecting EC2 And SoftLayer

One of the first things we had to do to prepare for our migration was figure out how to connect our EC2 and our SoftLayer networks together. Unfortunately the “proper” way of connecting a set of EC2 servers to another private network – using the Virtual Private Cloud (VPC) feature of EC2 – was not an option for us since we could not convert our existing set of instances into a VPC without downtime. After some consideration and careful planning, we realized that the only servers that really needed to be able to connect to each other across the data center boundary were our MongoDB nodes. Everything else we could make data center-local (Redis clusters, search servers, application clusters, etc).

image00

Since the number of instances we needed to interconnect was relatively small, we implemented a very simple solution that proved to be stable and effective for our needs:

  • Each data center had a dedicated OpenVPN server deployed in it that NAT’ed all client traffic to its private network address.

  • Each node that needed to be able to connect to another data center would set up a VPN channel there and set up local routing to properly forward all connections directed at the other DC into that tunnel.
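The local routing rule from the second bullet boils down to a prefix match on the destination address. The snippet below is only an illustrative sketch: the subnet 10.20.0.0/16 and the interface names tun0 and eth0 are made-up placeholders, not values from the actual setup.

```python
import ipaddress

# Hypothetical private range of the *other* data center (placeholder value).
REMOTE_DC_NETWORK = ipaddress.ip_network("10.20.0.0/16")


def pick_interface(destination: str) -> str:
    """Return the interface a packet to `destination` should leave through."""
    if ipaddress.ip_address(destination) in REMOTE_DC_NETWORK:
        return "tun0"  # VPN tunnel towards the other data center
    return "eth0"      # normal local network path
```

In a real deployment this decision lives in the kernel routing table rather than in application code; the function only shows the rule that the route expresses.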

Here are some features that made this configuration very convenient for us:

  • Since we did not control network infrastructure on either side, we could not really force all servers on either end to funnel their traffic through a central router connected to the other DC. In our solution, each VPN server decided (with the help of some automation) which traffic to route through the tunnel to ensure complete inter-DC connectivity for all of its clients.

  • Even if a VPN tunnel collapsed (surprisingly, this only happened a few times during the weeks of the project), it would only mean one server lost its outgoing connectivity to the other DC (one node dropped out of MongoDB cluster, some worker server would lose connectivity to the central Resque box, etc). None of those one-off connectivity losses would affect our infrastructure since all important infrastructure components had redundant servers on both sides.

Step 2: Architectural Changes To Our Applications

There were many small changes we had to make in our infrastructure in the weeks of preparation for the migration, but having a deep understanding of each and every component helped us make appropriate decisions that reduced the chance of a disaster during the transitional period. I would argue that infrastructure of almost any complexity could be migrated given enough time and engineering resources to carefully consider each and every network connection established between applications and backend services.

image01

Here are the main steps we had to take to ensure smooth and transparent migration:

  • All stateless services (caches, application clusters, web layer) were independently deployed on each side.

  • For each stateful backend service (database, search cluster, async queues, etc.) we had to consider whether we wanted to (or could afford to) replicate the data to the other side or whether we had to incur inter-data center latency for all connections. Relying on the VPN was always considered the last-resort option, and eventually we were able to reduce the amount of traffic between data centers to a few small streams of replication (mostly MongoDB) and connections to primary/main copies of services that could not be replicated.

  • If a service could be replicated, we would do that and then make application servers always use or prefer the local copy of the service instead of going to the other side.

  • For services that we could not replicate with their internal replication capabilities (like our search backends), we changed our application to implement replication between data centers itself: we always wrote every asynchronous job into the queues for both data centers, and asynchronous workers on each side pulled the data from their respective local queue.
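The dual-write approach from the last bullet can be sketched as follows. This is a minimal in-memory illustration, not the actual implementation: the queue and index objects, the data center names and the function names are all assumptions made for the example.

```python
from collections import deque

DATA_CENTERS = ("ec2", "softlayer")  # placeholder names

# One job queue and one search index per data center (in-memory stand-ins).
queues = {dc: deque() for dc in DATA_CENTERS}
search_indexes = {dc: {} for dc in DATA_CENTERS}


def enqueue_index_job(doc_id, body):
    """Write the same asynchronous job into the queue of every data center."""
    for dc in DATA_CENTERS:
        queues[dc].append((doc_id, body))


def run_worker(dc):
    """A worker drains only its local queue and updates only its local index."""
    while queues[dc]:
        doc_id, body = queues[dc].popleft()
        search_indexes[dc][doc_id] = body
```

Because each side applies the same stream of jobs independently, the two indexes converge without any direct replication traffic between the search clusters.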

Step 3: Flipping The Switch

When both sides were ready to serve 100% of our traffic, we prepared for the final switchover by reducing our DNS TTL down to a few seconds to ensure fast traffic change.

Finally, we switched traffic to the new data center. Requests switched to the new infrastructure with zero impact on our customers. Once traffic to EC2 had drained, we disabled the old data center and forwarded all remaining connections from the old infrastructure to the new one. DNS updates take time, so some residual traffic was visible on our old servers for at least a week after the cut-off time.

A Clear Improvement: Results After Moving From EC2 To Real Hardware

Stability improved. We went from 2-3 serious outages a week (most of these were not customer-visible, since we did our best to make the system resilient to failures, but many outages would wake someone up or force someone to abandon family time) down to 1-2 outages a month. We were able to handle these more thoroughly by spending engineering resources on increasing the system’s resilience to failures and reducing the chance of them affecting our customer-visible availability.

Performance improved. Thanks to the modern hardware available from SoftLayer we have seen a consistent performance increase for all of our backend services (especially IO-bound ones like databases and search clusters, but for CPU-bound app servers as well) and, what is more important, the performance was much more predictable: no sudden dips or spikes unrelated to our own software’s activity. This allowed us to start working on real capacity planning instead of throwing more slow instances at all performance problems.

Costs decreased. Last, but certainly not least for a young startup, the monthly cost of our infrastructure dropped by at least 50%, which allowed us to over-provision some of the services to improve performance and stability even further, greatly benefiting our customers.

Provisioning flexibility improved, but provisioning time increased. We are now able to specify servers that exactly match their workload (lots of disk doesn’t mean we need a powerful CPU). However, we can no longer start new servers in minutes with an API call. SoftLayer can generally add a new server to our fleet within 1-2 hours. This is a big trade-off for some companies, but it is one that works well for Swiftype.

Conclusion

Since switching to real hardware, we’ve grown considerably – our data and query volume is up 20x – but our API performance is better than ever. Knowing exactly how our servers will perform lets us plan for growth in a way we couldn’t before.

In our experience, the cloud may be a good idea when you need to rapidly spin up new hardware, but it only works well when you’re making a huge (Netflix-level) effort to survive in it. If your goal is to build a business from day one and you do not have spare engineering resources to spend on paying the “cloud tax”, using real hardware may be a much better idea.
(via HighScalability.com)

Web Server Load-Balancing with HAProxy on Ubuntu 14.04

What is HAProxy?

HAProxy (High Availability Proxy) is an open-source load balancer that can load balance any TCP service. It is a free, very fast and reliable solution that offers load balancing, high availability, and proxying for TCP and HTTP-based applications. It is particularly well suited for very high traffic web sites and powers many of the world’s most visited ones.

Since its inception, it has become the de facto standard open-source load balancer. Although it does not advertise itself, it is widely used. Below is a basic diagram of what the setup looks like:
haproxy

Installing HAProxy

I am using Ubuntu 14.04, where HAProxy can be installed with:

apt-get install haproxy

You can check the version by:

haproxy -v

We need to enable HAProxy to be started by the init script. To do so, set the ENABLED option to 1 in /etc/default/haproxy:

ENABLED=1

To verify if this change is done properly, execute the init script of HAProxy without any parameters. You should see the following:

$ service haproxy <press_tab_key>
reload   restart  start    status   stop

HAProxy is now installed. Let us now create a setup with two Apache web server instances and one HAProxy instance. Below is the setup information:

We will be using three systems, spawned virtually through VirtualBox:

Instance 1 – Load Balancer

Hostname: haproxy
OS: Ubuntu
Private IP: 192.168.205.15

Instance 2 – Web Server 1

Hostname: webserver01
OS: Ubuntu with LAMP
Private IP: 192.168.205.16

Instance 3 – Web Server 2

Hostname: webserver02
OS: Ubuntu with LAMP
Private IP: 192.168.205.17

Here is a diagram of what the setup looks like:
haproxy_example_setup
Let us now configure HAProxy.

Configuring HAProxy

Back up the original file by renaming it:

mv /etc/haproxy/haproxy.cfg{,.original}

We’ll create our own haproxy.cfg file. Using your favorite text editor, create the /etc/haproxy/haproxy.cfg file as:

global
        log /dev/log   local0
        log 127.0.0.1   local1 notice
        maxconn 4096
        user haproxy
        group haproxy
        daemon

defaults
        log     global
        mode    http
        option  httplog
        option  dontlognull
        retries 3
        option redispatch
        maxconn 2000
        contimeout     5000
        clitimeout     50000
        srvtimeout     50000

listen webfarm 0.0.0.0:80
    mode http
    stats enable
    stats uri /haproxy?stats
    balance roundrobin
    option httpclose
    option forwardfor
    server webserver01 192.168.205.16:80 check
    server webserver02 192.168.205.17:80 check

Explanation:

global
        log /dev/log   local0
        log 127.0.0.1   local1 notice
        maxconn 4096
        user haproxy
        group haproxy
        daemon

The log directive mentions a syslog server to which log messages will be sent.
The maxconn directive specifies the number of concurrent connections on the front-end. The default value is 2000 and should be tuned according to your system’s configuration.
The user and group directives change the HAProxy process to the specified user/group. These shouldn’t be changed.

defaults
        log     global
        mode    http
        option  httplog
        option  dontlognull
        retries 3
        option redispatch
        maxconn 2000
        contimeout     5000
        clitimeout     50000
        srvtimeout     50000

The above section has the default values. The option redispatch enables session redistribution in case of connection failures, so session stickiness is overridden if a web server instance goes down.
The retries directive sets the number of retries to perform on a web server instance after a connection failure.
The values to be modified are the various timeout directives. The contimeout option specifies the maximum time to wait for a connection attempt to a web server instance to succeed.
The clitimeout and srvtimeout apply when the client or server is expected to acknowledge or send data during the TCP process. HAProxy recommends setting the client and server timeouts to the same value.

listen webfarm 0.0.0.0:80
    mode http
    stats enable
    stats uri /haproxy?stats
    balance roundrobin
    option httpclose
    option forwardfor
    server webserver01 192.168.205.16:80 check
    server webserver02 192.168.205.17:80 check

The above block contains the configuration for both the frontend and the backend. We are configuring HAProxy to listen on port 80 for webfarm, which is just a name for identifying an application.
The stats directives enable the connection statistics page. This page can be viewed at the URL given in stats uri, so in this case it is http://192.168.205.15/haproxy?stats. A demo of this page can be viewed here.
The balance directive specifies the load balancing algorithm to use. Algorithm options available are:

  • Round Robin (roundrobin),
  • Static Round Robin (static-rr),
  • Least Connections (leastconn),
  • Source (source),
  • URI (uri) and
  • URL parameter (url_param).

Information about each algorithm can be obtained from the official documentation.
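To give a feel for how two of these algorithms differ, here is a small Python sketch. It is a simplified illustration and not HAProxy's actual implementation (which, for example, also takes server weights into account); the function names are made up for this example.

```python
import itertools

servers = ["webserver01", "webserver02"]

# roundrobin: hand out the servers in turn, one after another.
_rotation = itertools.cycle(servers)


def pick_roundrobin():
    """Return the next server in the rotation."""
    return next(_rotation)


def pick_leastconn(active_connections):
    """Return the server with the fewest active connections.

    active_connections maps a server name to its current connection count.
    """
    return min(servers, key=lambda name: active_connections[name])
```

Round robin suits short, similar requests such as the static pages in this setup, while least connections tends to be the better fit when sessions are long-lived and uneven.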

The server directive declares a backend server, the syntax is:

server <server_name> <server_address>[:port] [param*]

The name we mention here will appear in logs and alerts. There are some more parameters supported by this directive, and we’ll be using the check parameter in this article. The check option enables health checks on the web server instance; otherwise, the web server instance is always considered available.
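The effect of check can be pictured as a small state machine per server. The sketch below is a simplified model written for this article, not HAProxy code; it assumes the documented defaults of the rise and fall server parameters (2 and 3), meaning a server is marked down after three consecutive failed checks and up again after two consecutive successful ones.

```python
class ServerHealth:
    """Tracks whether a backend server is considered up or down."""

    def __init__(self, rise=2, fall=3):
        self.rise = rise
        self.fall = fall
        self.up = True
        self._streak = 0  # consecutive results contradicting the current state

    def record(self, check_ok: bool) -> bool:
        """Feed in one health check result and return the resulting state."""
        if check_ok == self.up:
            self._streak = 0
        else:
            self._streak += 1
            threshold = self.fall if self.up else self.rise
            if self._streak >= threshold:
                self.up = not self.up
                self._streak = 0
        return self.up
```

A single failed check therefore does not remove a server from the rotation, which avoids flapping on a brief hiccup.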

Once you’re done configuring, start the HAProxy service:

sudo service haproxy start

Testing Load-Balancing and Fail-over

We will append the server name to the default index.html file on each web server, located by default at /var/www/index.html.

On Instance 2 – Web Server 1 (webserver01 with IP 192.168.205.16), append the line below:

sudo sh -c "echo \<h1\>Hostname: webserver01 \(192.168.205.16\)\<\/h1\> >> /var/www/index.html"

On Instance 3 – Web Server 2 (webserver02 with IP 192.168.205.17), append the line below:

sudo sh -c "echo \<h1\>Hostname: webserver02 \(192.168.205.17\)\<\/h1\> >> /var/www/index.html"

Now open a web browser on your local machine and browse to the HAProxy IP, i.e. http://192.168.205.15

Each time you refresh the tab, you’ll see the load being distributed across the web servers. Below are screenshots from my browser:

The first time I visit http://192.168.205.15, I get:

webserver01

And the second time, i.e. when I refresh the page, I get:

webserver02

You can also check the haproxy stats by visiting http://192.168.205.15/haproxy?stats
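If you prefer to check the distribution from a script instead of refreshing the browser, something along these lines could be used. It is a rough sketch that assumes the "Hostname: ..." line appended to index.html above; the function names are made up for this example.

```python
import re
from collections import Counter
from urllib.request import urlopen


def fetch(url):
    """Download a page and return its body as text."""
    with urlopen(url, timeout=5) as response:
        return response.read().decode("utf-8", errors="replace")


def count_backends(url, requests=10, fetch_page=fetch):
    """Request `url` several times and count which web server answered."""
    hits = Counter()
    for _ in range(requests):
        match = re.search(r"Hostname: (\S+)", fetch_page(url))
        hits[match.group(1) if match else "unknown"] += 1
    return hits
```

Calling count_backends("http://192.168.205.15") against the setup above should show the requests split between webserver01 and webserver02 when the roundrobin algorithm is active.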

There’s more that you can do to this setup. Some ideas include:

  • take one or both web servers offline to test what happens when you access HAProxy
  • configure HAProxy to serve a custom maintenance page
  • configure the web interface so you can visually monitor HAProxy statistics
  • change the scheduler to something other than round-robin
  • configure prioritization/weights for particular servers

(via Howtoforge.com)

Monkey – Architecture of a Linux based Web Server

Introduction

Monkey is an open source project started in 2001 with the goal of learning C; the long story is here. Over the years the code has been improved in many respects, from naming conventions to heavy architectural changes. All of these changes were for the better, and nowadays, thanks to the community of core developers and contributors around the project, Monkey is one of the top-performing web servers around and, I would claim, the best option for Embedded Linux.

Understanding the basics of a human readable protocol: HTTP

The Hypertext Transfer Protocol is basically a language with a simple grammar for communication between two components: an HTTP client and an HTTP server. Typically, the communication starts with the client sending a request to the server, and the server then replies with a result for that request. The result is either a status response plus some content, or simply an error.

Each HTTP request sent by the client is composed of a request method, a URI, a protocol version and, optionally, a number of headers. Given that, a server must take care of the following:

  • Listen for new connections
  • Accept connections
  • Once the connection is accepted, start reading the HTTP request sent by the client
  • Parse the HTTP request, understand what the client wants
  • Depending on the request type, the server can: serve some content, close the connection because of some exception, proxy the request to somebody else, etc.
  • Close the connection or keep it open waiting for more requests. This depends on the protocol version and the client’s HTTP headers.
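The parsing and keep-alive steps from the list above can be illustrated with a short Python sketch. It is a deliberately minimal example and not how Monkey (which is written in C) implements its parser; it ignores request bodies, malformed input and repeated headers.

```python
def parse_request(raw: bytes):
    """Split a raw HTTP/1.x request head into method, URI, version and headers."""
    head = raw.split(b"\r\n\r\n", 1)[0].decode("iso-8859-1")
    request_line, *header_lines = head.split("\r\n")
    method, uri, version = request_line.split(" ", 2)
    headers = {}
    for line in header_lines:
        name, _, value = line.partition(":")
        headers[name.strip().lower()] = value.strip()
    return method, uri, version, headers


def keep_alive(version, headers):
    """Decide whether the connection stays open after the response."""
    connection = headers.get("connection", "").lower()
    if version == "HTTP/1.1":
        return connection != "close"    # persistent unless the client opts out
    return connection == "keep-alive"   # HTTP/1.0 closes unless asked otherwise
```

The last function mirrors the final bullet: HTTP/1.1 connections are persistent by default, while HTTP/1.0 clients have to request it through the Connection header.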

Depending on the server’s target, it can be implemented in many ways with different architectural strategies, so this post only aims to describe what has worked best for us in terms of high performance and low resource usage.

Architecture design facts

  • Monkey is a web server designed with a strong focus on Linux. It does not aim to be portable across other operating systems. Focusing on the most widely used mainstream operating system allows us to put our energy and effort in one place in the best way and, of course, to make the most of the Linux Kernel to achieve high performance.
  • Event driven: also known as asynchronous. An event-driven web server uses non-blocking system calls to perform its work, reducing the computing time spent in the user-space context. For example, when sending file content to a client, we do not block the whole process or thread while sending the data. Instead we instruct the kernel through a system call to send N bytes from the file and to notify us when we are able to send more bytes; in the meantime we process other connections and send other pending data.
  • Embedded friendly: our embedded context is Embedded Linux, and we care a lot about resource consumption, which means that under heavy load Monkey does not use more than 2.5MB of memory. The Monkey binary is around 80KB; once loaded in memory it takes about 350KB, and depending on the load more resources may be needed.
  • Small core, flexible API: Monkey implements a basic core to handle the HTTP protocol and exposes a flexible API through the plugin interface, where it is possible to hook plugins for the transport layer, security, request types and event handlers.

Contexts

In Monkey, we have defined two contexts of work: process context and thread context. The process context represents the main process waiting for incoming connections and the scheduler balancing new connections across the worker threads. The thread context belongs to each thread handling the active connections:

architecture_general

The number of workers is defined in the configuration, and it scales well on both single-core and multi-core CPUs. There is no need to set thread affinity through CPU masks; the Linux Kernel scheduler is smart enough to assign CPU time to each worker, and by default all workers are assigned to all CPUs.

From a system administrator’s point of view, it is possible to assign each worker to a different set of CPUs, but this approach is not recommended unless we fully understand what the Linux scheduler does in terms of interrupts, context switches and CPU time for kernel and user-space applications. Do it only if you can do it better than the running scheduler.

Scheduler

Before entering the server loop, the scheduler launches and initializes each worker, taking care to set up the initial data structures and the interfaces for the interaction between the components mentioned. This stage involves the creation of an epoll(7) queue per worker. It is worth mentioning that each epoll(7) queue created through epoll_create(2) is managed through a specific file descriptor.

Once the workers are up and running, the scheduler’s next job is to manage the incoming connections. For each new connection accepted, it determines which worker is the least loaded and assigns the connection to it. The chosen worker is the one that has the fewest connections in its epoll(7) interface, so the scheduler goes through the worker counters and chooses one. At this point the scheduler has two file descriptors: the connection file descriptor returned by accept(2) and the file descriptor that represents the epoll(7) queue of the chosen worker. So it basically registers the new file descriptor in the proper epoll(7) queue.
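The balancing decision described above can be sketched in a few lines of Python. This is only an illustration of the idea, not Monkey's C code: the Worker class and the assign_connection function are names invented for this example, and a plain list stands in for the worker's epoll(7) queue.

```python
class Worker:
    """A worker thread, reduced to the set of connections it is watching."""

    def __init__(self, name):
        self.name = name
        self.connections = []  # stands in for the worker's epoll(7) queue


def assign_connection(workers, conn_fd):
    """Give a newly accepted connection to the least loaded worker."""
    target = min(workers, key=lambda w: len(w.connections))
    # In the real server this step would be an epoll_ctl(2) call with
    # EPOLL_CTL_ADD on the chosen worker's epoll file descriptor.
    target.connections.append(conn_fd)
    return target
```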

Workers

Each worker, or thread, runs in an infinite loop around the epoll(7) interface, which is basically a Linux-specific polling mechanism to register, enqueue and notify about events on the file descriptors registered by the scheduler (sockets in this case).

The worker stays in a loop waiting for events in the epoll_wait(2) system call. Every time the scheduler registers a new file descriptor, an event is reported in the worker’s epoll(7) interface, and the same happens for subsequent events such as “there is data available for read” (EPOLLIN), “now you can write to the socket” (EPOLLOUT), “connection closed” (EPOLLHUP), etc.

For each event triggered, the worker keeps the status of the connection to determine whether it is a new connection, the HTTP request is being received, the HTTP request is complete, the request is being parsed or a response is being sent out. Besides handling events, at a fixed interval of seconds set in the configuration, it checks for connections that have timed out due to an incomplete request or another anomaly.
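One iteration of such a worker loop can be sketched with Python's selectors module, which uses epoll on Linux when it is available. This is an illustrative sketch only, not Monkey's implementation; the function name is made up and only the "data available" and "connection closed" cases are handled.

```python
import selectors
import socket


def worker_loop_once(selector, timeout=1.0):
    """Wait for events once and return the bytes read from each ready socket."""
    received = []
    for key, events in selector.select(timeout):
        if events & selectors.EVENT_READ:
            data = key.fileobj.recv(4096)
            if data:
                received.append(data)
            else:
                # An empty read means the peer closed the connection.
                selector.unregister(key.fileobj)
                key.fileobj.close()
    return received
```

A real worker would call this in an endless loop and keep per-connection state between iterations, as described above.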

Plugins Architecture

Monkey defines four categories of API where plugins can hook: Context, Events, Stages and Networking.

Context
Defines callbacks that can be invoked when the server is starting up; it covers the process and thread contexts described earlier.

Events
For every type of event reported in a worker loop, a plugin can implement a hook to perform specific actions:

api_events-300x106

Stages
Every new connection enters a stage status, so for each step of the HTTP cycle it passes through different phases, and each plugin can hook into a specific one:

api_stages-300x108

Networking
Monkey is not aware of networking, hence it intentionally depends on a plugin that provides the transport layer. This approach makes it easy to change from plain sockets communication to an encrypted one such as SSL. The networking plugin only needs to provide the required API functions for the communication:

level_networking-300x176

Scaling up

Every time a connection has performed a successful request, it is stored in a global list of the worker scope (implemented through a pthread_key). For each event reported, the worker needs to look up the internal data associated with it, so the file descriptor or socket number acts as a primary key for the search. The data structure chosen for Monkey v1.2 is a red-black tree. It has proven to behave fairly and scale well when handling thousands of active connections per worker, maintaining a good balance between performance and cost.

The cost of each file descriptor lookup is critical for server performance: an O(n) solution will work fine for a few connections, but under high concurrency an O(log(n)) solution will end up providing the highest performance.
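The difference between the two lookup costs can be made concrete with a small Python sketch. Python's standard library has no red-black tree, so a binary search over a sorted list is used here as a stand-in with the same O(log(n)) lookup cost; the function names are invented for this example.

```python
import bisect


def linear_lookup(fds, target):
    """O(n): scan a list and return the number of comparisons made."""
    for steps, fd in enumerate(fds, start=1):
        if fd == target:
            return steps
    return len(fds)


def tree_like_lookup(sorted_fds, target):
    """O(log n): binary search over sorted keys, standing in for a balanced tree."""
    index = bisect.bisect_left(sorted_fds, target)
    return index < len(sorted_fds) and sorted_fds[index] == target
```

With 10,000 active connections, the linear scan may need up to 10,000 comparisons per event, while a balanced structure needs roughly 14 (log2 of 10,000 is about 13.3).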

Memory Management

One of the keys to reducing overhead in a server is to reduce as much as possible the memory allocation requests made to the system within the main loop. The current Monkey implementation performs only one memory allocation per new connection; if more is needed because the incoming request posts too much data, it allocates more memory as required. Other web servers implement caching mechanisms to reduce memory allocations even further. As our focus is Embedded Linux, we aim for speed at low resource usage, and implementing a caching mechanism would increase our costs. So we dropped that common approach in order not to abuse system memory; it is simply a decision based on the target.

Linux Kernel system calls

The Linux Kernel exposes a useful set of non-portable system calls to achieve high performance when creating networking applications. The first one is epoll(7); as described earlier, this interface allows watching a set of file descriptors for certain defined events. Similar solutions like select(2) or poll(2) do not perform as well as epoll(7) does.

When sending a static file, the old-fashioned way is to open the file, get the file descriptor and perform multiple read(2)/write(2) calls to write out the file content. This operation requires the kernel to copy data back and forth between kernel and user space, which obviously generates overhead. As a solution, the Linux Kernel implements a zero-copy strategy through the sendfile(2) system call. This system call does not copy data to user space; instead it sends the data directly to another file descriptor, achieving good performance and reducing the latency of the old-fashioned approach.
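For readers who want to try the zero-copy path without writing C, Python exposes it through socket.sendfile(), which uses os.sendfile() (and thus sendfile(2) on Linux) where available and falls back to plain send() otherwise. The helper below is a minimal sketch with a made-up function name, not Monkey code.

```python
import socket


def send_static_file(sock: socket.socket, path: str) -> int:
    """Send a whole file over a connected blocking socket; returns bytes sent."""
    with open(path, "rb") as f:
        # No read()/write() loop in user space: the file object is handed
        # straight to the socket layer.
        return sock.sendfile(f)
```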

In our architecture, the Logger plugin needs to transfer data through a pipe(2) (a unidirectional data channel that can be used for interprocess communication). A common mechanism is to use read(2) and write(2) on each end, but there is a newer system call for this kind of situation that works in a similar way to sendfile(2), called splice(2). This system call moves data from one point to another without the copy overhead. The main difference between sendfile(2) and splice(2) is that splice(2) requires one end to be a pipe(2).

In my previous post, I mentioned how to use the new Linux Kernel feature called TCP_FASTOPEN. While it is very simple to implement, it requires the cooperation of both sides: the client and the server. If you have full control of your networking application (client and server), consider using TCP_FASTOPEN; it will increase performance by reducing the TCP handshake round trips.

Monkey Plugins

Based on the architecture and API described, the following plugins are distributed as part of the core:

Liana: basic sockets connectivity layer

PolarSSL: provides a transport layer based on SSL

Cheetah: plugin that provides a command line interface to query the internals of a running server through a unix socket

Mandril: security layer that aims to restrict the access by URI strings or sub networks.

Dirlisting: directory listing

Logger: log writer

CGI: old fashioned CGI interface

FastCGI: provide fast-cgi support

 

Bonus track: Full HTTP Stack for web services implementation
Besides being a common web server for static or dynamic content, Monkey is a full stack for the development of web applications. In order to provide an easy API for web application or web service development, we have created Duda I/O, an event-driven C framework for rapid development based on the Monkey stack.

Duda implements a core API of pseudo-objects and provides extra features through a package system, all in a friendly C API. The most relevant features supported at the moment are WebSocket, JSON, SQLite3, Redis, Base64 and SHA1.

Due to its high-performance nature and the open source ecosystem around it, it is being used in production in everything from Embedded Linux products to Big Data solutions. The Duda license allows you to create closed-source services or applications and link them to the Duda I/O stack at zero cost.

For more details please refer to Duda I/O main site.

The Monkey organization believes in Open Source and is fully committed to creating the best networking technology for different needs. If you are interested in participating as a contributor or in testing our stack, feel free to reach us on our mailing lists or on the IRC channel #monkey at irc.freenode.net.

(via Edsiper.linuxchile.cl)

AppLovin: Marketing To Mobile Consumers Worldwide By Processing 30 Billion Requests A Day

16442530119_470a4487ee_m

This is a guest post from AppLovin‘s VP of engineering, Basil Shikin, on the infrastructure of its mobile marketing platform. Major brands like Uber, Disney, Yelp and Hotels.com use AppLovin’s mobile marketing platform. It processes 30 billion requests a day and 60 terabytes of data a day.

AppLovin’s marketing platform provides marketing automation and analytics for brands who want to reach their consumers on mobile. The platform enables brands to use real-time data signals to make effective marketing decisions across one billion mobile consumers worldwide.

Core Stats

  • 30 Billion ad requests per day

  • 300,000 ad requests per second, peaking at 500,000 ad requests per second

  • 5ms average response latency

  • 3 Million events per second

  • 60TB of data processed daily

  • ~1000 servers

  • 9 data centers

  • ~40 reporting dimensions

  • 500,000 metrics data points per minute

  • 1 PB Spark cluster

  • 15GB/s peak disk writes across all servers

  • 9GB/s peak disk reads across all servers

  • Founded in 2012, AppLovin is headquartered in Palo Alto, with offices in San Francisco, New York, London and Berlin.
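As a quick sanity check, the daily totals above can be converted into per-second averages. The arithmetic below uses only the figures from the list and assumes decimal units (1 TB = 10^12 bytes).

```python
SECONDS_PER_DAY = 24 * 60 * 60  # 86,400

requests_per_day = 30e9  # "30 Billion ad requests per day"
avg_requests_per_second = requests_per_day / SECONDS_PER_DAY

bytes_per_day = 60e12  # "60TB of data processed daily", decimal TB assumed
avg_bytes_per_second = bytes_per_day / SECONDS_PER_DAY
```

This gives an average of roughly 347,000 requests per second, which is in the same range as the quoted 300,000 figure and below the 500,000 peak, and an average of roughly 694 MB of data processed per second.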

 

Technology Stack

 

Third Party Services

Data Storage

  • Aerospike for user profile storage

  • Vertica for aggregated statistics and real-time reporting

  • Aggregating 350,000 rows per second and writing to Vertica at 34,000 rows per second

  • Peak 12,000 user profiles per second written to Aerospike

  • MySQL for ad data

  • Spark for offline processing and deep data analysis

  • Redis for basic caching

  • Thrift for all data storage and transfers

  • Each data point replicated in 4 data centers

  • Each service is replicated at least in 2 data centers (at most in 8)

  • Amazon Web Services used for long term data storage and backups

Core App And Services

  • Custom C/C++ Nginx module for high performance ad serving

  • Java for data processing and auxiliary services

  • PHP / Javascript for UI

  • Jenkins for continuous integration and deployment

  • Zookeeper for distributed locking

  • HAProxy and IPVS for high availability

  • Coverity for Java/C++ static code analysis

  • Checkstyle and PMD for PHP static code analysis

  • Syslog for DC-centralized log server

  • Hibernate for transaction-based services

Servers And Provisioning

  • Ubuntu

  • Cobbler for bare metal provisioning

  • Chef for configuring servers

  • Berkshelf for Chef dependencies

  • Docker with Test Kitchen for running infrastructure tests

 

Monitoring Stack

 

Server Monitoring

  • Icinga for all servers

  • ~100 custom Nagios plugins for deep server monitoring

  • 550 various probes per server

  • Graphite as data format

  • Grafana for displaying all graphs

  • PagerDuty for issue escalation

  • Smokeping for network mesh monitoring

Application Monitoring

  • VividCortex for MySQL monitoring

  • JSON /health endpoint on each service

  • Cross-DC database consistency monitoring

  • 9 4K 65” TVs for showing all graphs across the office

  • Statistical deviation monitoring

  • Fraudulent users monitoring

  • Third-party systems monitoring

  • Deployments are recorded in all graphs
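A JSON /health endpoint of the kind listed above might look roughly like the following sketch. The payload layout, the dependency names and the choice of status code 503 for an unhealthy service are assumptions made for this illustration; the article does not describe the actual format.

```python
import json
from http.server import BaseHTTPRequestHandler


def build_health(checks):
    """checks maps a dependency name to True/False; returns (status_code, body)."""
    healthy = all(checks.values())
    body = json.dumps({"status": "ok" if healthy else "degraded", "checks": checks})
    return (200 if healthy else 503), body


class HealthHandler(BaseHTTPRequestHandler):
    checks = {"database": True, "queue": True}  # placeholder dependency states

    def do_GET(self):
        if self.path != "/health":
            self.send_error(404)
            return
        status, body = build_health(self.checks)
        payload = body.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)
```

The handler can be served with http.server.HTTPServer; a monitoring system then only needs to poll the URL and alert on a non-200 response.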

Intelligent Monitoring

  • Intelligent alerting system with a feedback loop: a system that can introspect anything can learn anything

  • Third-party stats about AppLovin are also monitored

  • Alerting is a cross-team exercise: developers, ops, business, data scientists are involved

 

Architecture Overview

 

General Considerations

  • Store everything in RAM

  • If it does not fit, save it to SSD

  • L2 cache level optimizations matter

  • Use the right tool for the right job

  • Architecture allows swapping any component

  • Upgrade only if an alternative is 10x better

  • Write your own components if there is nothing suitable out there

  • Replicate important data at least 3x

  • Make sure every message can be re-played without data corruption

  • Automate everything

  • Zero-copy message passing

Message Processing

  • Custom message processing system that guarantees message delivery

  • 3x replication for each message

  • Sending a message = writing to disk

  • Any service may fail, but no data are lost

  • Message dispatching system connects all components together, provides isolation and extensibility of the system

  • Cross-DC failure tolerance
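One common way to make messages safe to replay is to give each one a unique ID and have consumers ignore IDs they have already applied. The sketch below illustrates that idea only; the article does not say how AppLovin's custom system achieves it, and the message fields used here are hypothetical.

```python
class Counters:
    """Applies each message at most once, so replays cannot double count."""

    def __init__(self):
        self.totals = {}
        self._seen_ids = set()

    def apply(self, message):
        """Apply one message; returns False if it was a duplicate."""
        if message["id"] in self._seen_ids:
            return False  # duplicate delivery or replay: ignore it
        self._seen_ids.add(message["id"])
        key = message["key"]
        self.totals[key] = self.totals.get(key, 0) + message["amount"]
        return True
```

With consumers written this way, a failed service can simply re-read its messages from disk after a restart without corrupting the aggregated numbers.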

Ad Serving

  • Nginx is really fast: can serve an ad in less than a millisecond

  • Keep all ad serving data in memory: read only

  • jemalloc gave a 30% speed improvement

  • Use Aerospike for user profiles: less than 1ms to fetch a profile

  • Pre-compute all ad serving data on one box and dispatch across all servers

  • Torrents are used to propagate serving data across all servers. Using Torrents resulted in 83% network load drop on the originating server compared to HTTP-based distribution.

  • mmap is used to share ad serving data across nginx processes

  • XXHash is the fastest hash function with a low collision rate. 75x faster than SHA-1 for computing checksums

  • 5% of real traffic goes to staging environment

  • Ability to run 3 A/B tests at once (20%/20%/10% of traffic for three separate tests, 50% for control)

  • A/B test results are available in regular reporting
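
The traffic split mentioned above (20%/20%/10% for three tests, 50% for control) can be illustrated by hashing a stable user identifier into 100 slots. This is only a minimal sketch under stated assumptions, not AppLovin's implementation: the function name, the bucket labels, and the choice of SHA-256 are all hypothetical.

```python
import hashlib

# Hypothetical bucket layout: (label, percent of traffic). Must sum to 100.
BUCKETS = [("test_a", 20), ("test_b", 20), ("test_c", 10), ("control", 50)]


def assign_bucket(user_id: str) -> str:
    """Deterministically map a user ID to one of the traffic buckets."""
    digest = hashlib.sha256(user_id.encode("utf-8")).digest()
    slot = int.from_bytes(digest[:8], "big") % 100  # slot in 0..99
    upper = 0
    for label, percent in BUCKETS:
        upper += percent
        if slot < upper:
            return label
    raise AssertionError("bucket percentages must sum to 100")
```

Because the assignment depends only on the user ID, the same user always lands in the same bucket, which keeps test results comparable across requests.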

 

Data Warehouse

  • All data are replicated

  • Running most reports takes under 2 seconds

  • Aggregation is key to allow fast reports on large amounts of data

  • Non-aggregated data for the last 48 hours is usually enough to resolve most issues

  • 7 days of raw logs is usually enough for debugging

  • Some reports must be pre-computed

  • Always think multiple data centers: every data point goes to multiple locations

  • Backup in S3 for catastrophic failures

  • All raw data are stored in Spark cluster

 

Team

 

Structure

  • 70 full-time employees

  • 15 developers (platform, ad serving, frontend, mobile)

  • 4 data scientists

  • 5 dev. ops.

  • Engineers in Palo Alto, CA

  • Business in San Francisco, CA

  • Offices in New York, London and Berlin

Interaction

  • HipChat to discuss most issues

  • Asana for project-based communication

  • All code is reviewed

  • Frequent group code reviews

  • Quarterly company outings

  • Regular town hall meetings with CEO

  • All engineers (junior to CTO) write code

  • Interviews are tough: offers are really rare

Development Cycle

  • Developers, the business side or the data science team come up with an idea

  • The idea is reviewed and scheduled for execution at a Monday meeting

  • Feature is implemented in a branch; development environment is used for basic testing

  • A pull request is created

  • Code is reviewed and iterated upon

  • For big features group code reviews are common

  • The feature gets merged to master

  • The feature gets deployed to staging with the next build

  • The feature gets tested on 5% real traffic

  • Statistics are examined

  • If the feature is successful it graduates to production

  • Feature is closely monitored for a couple of days

Avoiding Issues

  • The system is designed to handle failure of any component

  • No failure of a single component can harm ad serving or data consistency

  • Omniscient monitoring

  • Engineers watch and analyze key business reports

  • High quality of code is essential

  • Some features take multiple code reviews and iterations before graduating

  • Alarms are triggered when:

    • Stats for staging are different from production

    • FATAL errors on critical services

    • Error rate exceeds threshold

    • Any irregular activity is detected

  • Data are never dropped

  • Most log lines can be easily parsed

  • Rolling back of any change is easy by design

  • After every failure: fix, make sure same thing does not happen in the future, and add monitoring

 

Lessons Learned

 

Product Development

  • Being able to swap any component easily is key to growth

  • Failures drive innovative solutions

  • Staging environment is essential: always be ready to lose 5%

  • A/B testing is essential

  • Monitor everything

  • Build an intelligent alerting system

  • Engineers should be aware of business goals

  • Business people should be aware of limitations of engineering

  • Make builds and continuous integration fast. Jenkins runs on 2 bare metal servers with 32 CPUs, 128GB of RAM and SSD drives

Infrastructure

  • Monitoring all data points is critical

  • Automation is important

  • Every component should support HA by design

  • Kernel optimizations can yield up to a 25% performance improvement

  • Process and IRQ balancing lead to another 20% performance improvement

  • Power saving features impact performance

  • Use SSDs as much as possible

  • When optimizing, profile everything. Flame graphs are great!

(via HighScalability.com)

Google’s Cloud Pub/Sub Real-Time Messaging Service Is Now In Public Beta


Google is launching the first public beta of Cloud Pub/Sub today, its backend messaging service that makes it easier for developers to pass messages between machines and to gather data from smart devices. It’s basically a scalable messaging middleware service in the cloud that allows developers to quickly pass information between applications, no matter where they’re hosted. Snapchat is already using it for its Discover feature and Google itself is using it in applications like its Cloud Monitoring service.

Pub/Sub was in alpha for quite a while. Google first (quietly) introduced it at its I/O developer conference last year, but it never made a big deal about the service. Until now, the service was in private alpha, but starting today, all developers can use the service.


Using the Pub/Sub API, developers can create up to 10,000 topics (that’s the entity the application sends its messages to) and send up to 10,000 messages per second. Google says notifications should go out in under a second “even when tested at over 1 million messages per second.”

The typical use cases for this service, Google says, include balancing workloads in network clusters, implementing asynchronous workflows, logging to multiple systems, and data streaming from various devices.

During the beta period, the service is available for free. Once it comes out of beta, developers will have to pay $0.40 per million for the first 100 million API calls each month. Users who need to send more messages will pay $0.25 per million for the next 2.4 billion operations (that’s about 1,000 messages per second) and $0.05 per million for messages above that.
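
To make the tiered pricing concrete, here is a small sketch that computes a monthly bill from the three rates quoted above. The function and constant names are hypothetical, and the tiers are taken from this article's description of the announced pricing rather than from any official price list.

```python
# Tiers as reported above: (tier size in millions of operations, USD per million).
# A size of None means "everything beyond the previous tiers".
TIERS = [
    (100, 0.40),    # first 100 million operations
    (2400, 0.25),   # next 2.4 billion operations
    (None, 0.05),   # all operations above that
]


def monthly_cost(operations: int) -> float:
    """Return the monthly cost in USD for the given number of operations."""
    remaining = operations / 1_000_000  # work in millions of operations
    cost = 0.0
    for size, price in TIERS:
        used = remaining if size is None else min(remaining, size)
        cost += used * price
        remaining -= used
        if remaining <= 0:
            break
    return cost
```

Under these assumptions, a service that fills the first two tiers exactly (2.5 billion operations) pays for 100 million at the first rate and 2.4 billion at the second.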

Now that Pub/Sub has hit beta — and Google even announced the pricing for the final release — chances are we will see a full launch around Google I/O this summer.


(via Techcrunch.com)

The Architecture Of Algolia’s Distributed Search Network

Algolia started in 2012 as an offline search engine SDK for mobile. At this time we had no idea that within two years we would have built a worldwide distributed search network.

Today Algolia serves more than 2 billion user generated queries per month from 12 regions worldwide. Our average server response time is 6.7ms and 90% of queries are answered in less than 15ms. Our unavailability rate on search is below 10^-6, which represents less than 3 seconds per month.

The challenges we faced with the offline mobile SDK were technical limitations imposed by the nature of mobile. These challenges forced us to think differently when developing our algorithms because classic server-side approaches would not work.

Our product has evolved greatly since then. We would like to share our experiences with building and scaling our REST API built on top of those algorithms.

We will explain how we are using a distributed consensus for high-availability and synchronization of data in different regions around the world and how we are doing the routing of queries to the closest locations via an anycast DNS.

The data size misconception

Before designing the architecture, we first had to identify the major use cases we needed to support. This was especially true when considering our scaling needs. We had to know if our customers would need to index Gigabytes, Terabytes, or Petabytes of data. The architecture would be different depending on how many of those use cases we needed to handle.

When people think about search, most think about very big use cases like Google’s web page indexing or Facebook’s indexing of trillions of posts. If you stop and think about the search boxes you see every day, the majority of them do not search massively big datasets. Netflix searches approximately 10,000 titles and Amazon’s database in the US contains around 200,000,000 products. The data from both of these cases can be stored on a single machine! We are not saying that having a single machine is a good setup, but keeping in mind all that data can fit on one machine is really important since cross-machine synchronization is a big source of complexity and performance loss.

The road to high-availability

When building a SaaS API, high availability is a big concern as removing all single points of failure (SPOF) is extremely challenging. We spent weeks brainstorming the ideal search architecture for our service while keeping in mind our product would be geared towards user facing search.

Master-Slave Vs. Master-Master

By temporarily restricting the problem to each index being stored on a single machine, we simplified our high availability setup to several machines hosted in different data centers. With this setup, the first solution we thought of was to have a master-slave setup with one master machine receiving all indexing operations and then replicating them to one or more slave machines. With this approach, we could easily load balance search queries across all the machines.

The problem with this master-slave approach is that our high availability only works for search queries. All indexing operations need to go to the master. This architecture is too risky for a service company. All it takes is for the master to be down, which will happen, and clients will start having indexing errors.

We must implement a master-master architecture! The key element to enabling a master-master setup is to have a way of agreeing on a single result among a group of machines. We need to have shared knowledge between all machines which stays consistent under all circumstances, even when there is a network split between machines.

Introducing The Distributed Coherency

For a search engine, one of the best ways to introduce this shared knowledge is to treat the write operations as a unique stream of operations that must be applied in a certain order. When we have several operations coming at the exact same time, we need to assign them a sequence ID. This ID can then be used to ensure the sequence is applied exactly the same way on all replicas.

In order to assign a sequence ID (a number incremented by one after each job), we need to have a shared global state on the next sequence ID between machines. The open source ZooKeeper software is the de facto solution for distributed knowledge in a cluster and we initially started to use ZooKeeper with the following sequence:

  1. When a machine receives a job, it copies the job to all replicas using a temporary name.

  2. That machine then takes the distributed lock.

  3. It reads the last sequence ID in ZooKeeper and sends an order to copy the temporary file as sequence ID + 1 on all machines. This is equivalent to a two-phase commit.

  4. If we have a majority of positive answers from the machines (quorum), we save sequence ID + 1 in ZooKeeper.

  5. The distributed lock is then released.

  6. Finally, the client sending the job is informed of the result. This would be a success if a majority of machines committed.

Unfortunately this sequence is not right: if the machine that acquires the lock crashes or restarts between steps 3 and 4, we can end up in a state where the job is committed on some machines but the new sequence ID is never saved. A more complex sequence is needed.
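
The six steps can be sketched in a few lines of Python. This is an illustration only: `FakeCoordinator` is a hypothetical in-memory stand-in for ZooKeeper (one lock, one counter), `Replica` and `submit_job` are invented names, and no real ZooKeeper client API is used. The sketch deliberately keeps the weakness of the sequence: a replica that commits in step 3 keeps the job even when step 4 never records the new sequence ID.

```python
import threading


class FakeCoordinator:
    """Hypothetical in-memory stand-in for ZooKeeper: a lock and a counter."""

    def __init__(self):
        self.lock = threading.Lock()
        self.last_sequence_id = 0


class Replica:
    def __init__(self, healthy=True):
        self.healthy = healthy
        self.temp = {}  # temporary name -> job
        self.log = {}   # sequence ID -> committed job

    def store_temp(self, name, job):
        if self.healthy:
            self.temp[name] = job
        return self.healthy

    def commit(self, name, sequence_id):
        if not self.healthy or name not in self.temp:
            return False
        self.log[sequence_id] = self.temp.pop(name)
        return True


def submit_job(coordinator, replicas, name, job):
    # Step 1: copy the job to all replicas under a temporary name.
    for replica in replicas:
        replica.store_temp(name, job)
    # Step 2: take the distributed lock (step 5 releases it on exit).
    with coordinator.lock:
        # Step 3: read the last sequence ID and ask every replica to
        # commit the temporary job as sequence ID + 1.
        next_id = coordinator.last_sequence_id + 1
        acks = sum(replica.commit(name, next_id) for replica in replicas)
        # Step 4: save the new sequence ID only if a majority acknowledged.
        success = acks > len(replicas) // 2
        if success:
            coordinator.last_sequence_id = next_id
    # Step 6: report the result to the client.
    return success
```

Running `submit_job` against one healthy and two unhealthy replicas returns `False` and leaves the counter untouched, yet the healthy replica has already stored the job under the next sequence ID, which is the kind of divergence the paragraph above describes.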

The packaging of ZooKeeper as an external service accessed via a TCP connection makes it really difficult to get this right and requires a large timeout (the default timeout is set to 4 seconds, representing two ticks of two seconds each).

As a consequence, every failure event, either from hardware or software, would freeze our entire system for the duration of this timeout. It might seem acceptable, but in our case we wanted to test a failure very often in production (like the Monkey testing approach of Netflix).

The Raft Consensus Algorithm

Around the time we were running into these problems, the RAFT consensus algorithm was published. It was clear right away that this algorithm fit our use case perfectly. The state machine of RAFT is our index and the log is the list of index jobs to be executed. I already knew about the PAXOS protocol but did not have a strong enough understanding of it and all the variants to be confident enough to implement it myself. RAFT, on the other hand, was much clearer. It was a perfect match for what we needed and even without stable open source implementations at that time, I was confident enough in my understanding to implement it as the basis of our architecture.

The hardest part of implementing consensus algorithms is making sure there are no bugs in the system. To handle that, I opted for a monkey testing approach by randomly killing processes using a sleep before restarting. To test it even further, I simulated network drops and degradations via the firewall. This type of testing helped us find many bugs. Once we were operating for several days without any problems, I was very confident the implementation was done correctly.

Replicate At Application Or Filesystem Level?

We have chosen to distribute the write operations to all machines and execute them locally rather than replicating the final results on filesystem. We made this choice for two reasons:

  • It is faster. Indexing is done in parallel on all machines, which is faster than replicating the resulting binary files, which can be big.

  • It is compatible with multiple regions. If we replicate the files after indexing, we need to have a process that will rewrite the whole index. This means we could have huge amounts of data to transfer. Transferring that much data is very inefficient if it has to travel between different geographic regions around the world (e.g. New York to Singapore).

Each machine will receive all write operation jobs in the correct order and process them as soon as possible independently of other machines. This means all machines are assured to be at the same state but not necessarily at the same time. This is because the changes may not be committed on all machines at exactly the same moment.
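
A small sketch can show why applying the same ordered stream everywhere leads to the same state. The class below is hypothetical and greatly simplified (the "index" is a plain dictionary and jobs are tuples); it only assumes that every job carries the sequence ID agreed on by consensus.

```python
class IndexReplica:
    """Hypothetical replica that applies jobs strictly in sequence-ID order."""

    def __init__(self):
        self.index = {}            # object ID -> record
        self.next_sequence_id = 1  # next job this replica is allowed to apply
        self.pending = {}          # jobs that arrived early, by sequence ID

    def receive(self, sequence_id, job):
        if sequence_id < self.next_sequence_id:
            return  # already applied; ignore duplicates
        self.pending[sequence_id] = job
        # Apply every job that is now contiguous with what was applied before.
        while self.next_sequence_id in self.pending:
            op, key, value = self.pending.pop(self.next_sequence_id)
            if op == "put":
                self.index[key] = value
            elif op == "delete":
                self.index.pop(key, None)
            self.next_sequence_id += 1
```

Two replicas that receive the same jobs at different times, or even in a different arrival order, end with identical `index` contents once both have seen every sequence ID.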

The Compromise On Consistency

In distributed computing, the CAP Theorem states that it is impossible for a distributed computing system to simultaneously provide all three of the following:

  • Consistency: all nodes see the same data at the same time.

  • Availability: a guarantee that every request receives a response about whether it succeeded or failed.

  • Partition tolerance: the system continues to operate despite arbitrary message loss or failure of part of the system.

According to this theorem, we compromised on Consistency. We don’t guarantee that all nodes see exactly the same data at the same time but they will all receive the updates. In other words, we can have small cases where the machines are not synchronized. In reality, this is not a problem because when a customer performs a write operation we apply that job on all hosts. There is less than one second between the time of application on the first and last machine so it is normally not visible for end users. The only inconsistency possible is whether the last update received is already applied or not, which is compatible with the use cases of our clients.

General Architecture

Definition Of A Cluster

Having a distributed consensus between machines is mandatory in order to have a high availability infrastructure but there is unfortunately a big drawback. This consensus requires several round trips between the machines, so the number of consensus rounds possible per second is directly related to the latency between the different machines. They need to be close to each other to reach a high number of consensus rounds per second. Supporting several regions without sacrificing the number of possible write operations means that we need several clusters. Each cluster contains three machines that act as perfect replicas.

Having one cluster per region is the minimum needed for consensus, but is still far from perfect:

  • We cannot make all customers fit on one machine.

  • The more customers we have, the fewer write operations per second each customer will be able to perform. This is because the maximum number of consensus rounds per second is fixed.

In order to work around this problem, we decided to apply the same concept at the region level: each region will have several clusters of three machines. One cluster can host from one to several customers depending on the size of the data they have. This concept is close to what virtualization does on a physical machine: we are able to put several customers on a cluster, except that a customer can grow and change its usage dynamically. In order to do this, we need to develop and automate the following processes:

  • Migrate one customer to another cluster if the cluster has too much data or too many write operations.

  • Add a new machine to the cluster if the volume of queries is too big.

  • Change the number of shards or split one customer across several clusters if their volume of data is too big.

If we have these processes in place, a customer won’t be assigned to a cluster permanently. Assignment will change depending on their own usage as well as the cluster’s usage. This means we need a way to assign a customer to a cluster.

Assigning A Customer To A Cluster

The standard way to manage this assignment is to have one unique DNS entry per customer. This is similar to how Amazon Cloudfront works. Each customer is assigned a unique DNS entry of the form customerID.cloudfront.net that can then target a different set of machines depending on the customer.

We chose to go with the same approach. Each customer is assigned a unique application ID which is linked to a DNS record of the form APPID.algolia.io. This DNS record targets a specific cluster with all machines in the cluster being part of the DNS record so there is load balancing done via DNS. We also use health check mechanisms to detect machine failures and remove them from the DNS resolution.

The health check mechanism is still not sufficient to provide a good SLA even with a very low TTL on the DNS records (TTL is the time the client is allowed to keep the DNS answer cached). The problem is that a host may go down but a user still has the host in cache. The user will continue to send queries to it until the cache expires. It gets even worse because TTL is not an exact science. There are cases where systems do not respect the TTL. We have seen DNS records with a TTL of one minute transformed into a TTL of 30 minutes by some DNS servers.

In order to further improve high availability and avoid a machine failure impacting users, we generate another set of DNS records for each customer of the form APPID-1.algolia.io, APPID-2.algolia.io, and APPID-3.algolia.io. The idea behind these DNS records is to allow our API clients to retry other records when a TCP connect timeout is reached (usually set to one second). Our standard implementation is to shuffle the list of DNS records and try them in sequential order.
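
The retry strategy described above could look roughly like the following sketch. It is not Algolia's client code: the function name, the port, and the injectable `connect` parameter are assumptions made for illustration, and the default domain follows the later switch to algolia.net.

```python
import random
import socket


def connect_with_retry(app_id, domain="algolia.net", port=443, timeout=1.0,
                       connect=socket.create_connection):
    """Try APPID-1, APPID-2 and APPID-3 in random order until one connects.

    `connect` is injectable so the logic can be exercised without a network.
    """
    hosts = [f"{app_id}-{i}.{domain}" for i in (1, 2, 3)]
    random.shuffle(hosts)  # spread load across the machines of the cluster
    last_error = None
    for host in hosts:
        try:
            return connect((host, port), timeout)
        except OSError as exc:  # covers connect timeouts and DNS failures
            last_error = exc
    raise ConnectionError(f"all hosts failed for {app_id}") from last_error
```

Shuffling first means that clients do not all hit the same machine, and the short connect timeout bounds how long a dead host can delay a query.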

Combined with carefully controlled retry and timeout logic in our API clients, this proved to be a better and cheaper solution than using a specialized load balancer.

Later, we discovered the trendy .IO TLD was not a good choice for performance. There are fewer DNS servers in the anycast network of .IO compared to .NET and the ones there were saturated. This resulted in a lot of timeouts that slowed down the name resolution. We have since solved these performance problems by switching to algolia.net domains while keeping backwards compatibility by continuing to support algolia.io.

What about Scalability of a cluster?

Our choice of using several clusters allows us to add more customers without too much risk of impacting existing customers because of the isolation between clusters. But we still had concerns about the scalability of one cluster that needed to be addressed.

The first limiting factor in the scalability of a cluster is the number of write operations per second due to the consensus. In order to mitigate this factor, we introduced a batch method in our API that encapsulates a set of write operations in one operation from the consensus point of view. The problem is that some customers still perform write operations without batching which can have a negative impact on indexing speed for other customers of the cluster.

In order to reduce this performance impact, we have made two changes to our architecture:

  • We added a batching strategy when there is contention on the consensus by automatically aggregating all write operations of each customer inside a unique operation from the consensus point of view. In practice, this means that we are reordering the sequence of jobs but without an impact on the semantics of the operations. For example, if there are 1,000 jobs pending for consensus and 990 are from one customer, we will merge 990 write operations into one even if there are jobs of other customers interlaced with them.

  • We added a consensus scheduler that controls the number of write operations per second entering the consensus for each application ID. This avoids one customer being able to use all the bandwidth of the consensus.
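
The batching strategy from the first point can be sketched as a simple grouping step. The function name and the job representation (pairs of customer ID and operation) are hypothetical; the sketch only shows how interlaced jobs collapse into one consensus operation per customer while each customer's own operations keep their relative order.

```python
def merge_pending_jobs(pending):
    """Group pending (customer_id, operation) jobs into one batch per customer.

    Batches are ordered by each customer's first pending job, and the
    operations inside a batch keep their original relative order.
    """
    batches = {}  # dicts preserve insertion order in Python 3.7+
    for customer_id, operation in pending:
        batches.setdefault(customer_id, []).append(operation)
    return list(batches.items())
```

With 1,000 pending jobs of which 990 belong to one customer, the consensus sees one batch for that customer plus one batch for each of the other customers, rather than 1,000 separate operations.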

Before we implemented these improvements, we tried a rate limit strategy by returning a 429 HTTP status code. It was apparent very quickly that this was too painful for our customers to have to watch for this response and implement a retry strategy. Today, our biggest customer performs more than one billion write operations per day on a single cluster of three machines which is an average of 11,500 operations per second with bursts of more than 150,000.

The second problem was to find the best hardware setup and avoid any potential bottlenecks such as CPU or I/O that could compromise the scalability of a cluster. Since the beginning we made the choice to use our own bare metal servers in order to fully control the performance of our service and avoid wasting any resources. Selecting the correct hardware proved to be a challenging task.

At the end of 2012, we started with a small setup consisting of: Intel Xeon E3 1245v2, 2x Intel SSD 320 series 120GB in raid 0, and 32GB of RAM. This hardware was reasonable in terms of price, more powerful than cloud platforms, and allowed us to start the service in Europe and US-East.

This setup allowed us to tune the kernel for I/O scheduling and virtual memory which was critical for us to take advantage of all available physical resources. Even so, we soon discovered our limits were the amount of RAM and I/O. We were using around 10GB of RAM for indexing which left only 20GB of RAM for caching of files used for performing search queries. Our goal had always been to have customer indices in memory in order to have a service optimized for millisecond response times. The current hardware setup was designed for 20GB of index data which was too small.

After this first setup, we tried different hardware machines with single and dual socket CPUs, 128GB and 256GB of RAM, and different models/sizes of SSD.

We finally found an optimal setup with a machine containing an Intel Xeon E5 1650v2, 128GB of RAM, and 2x400GB Intel S3700 SSD. The model of the SSD was very important for durability. We burned a lot of SSDs before finding the correct model that can operate in production for years.

In the end, the final architecture we built allowed us to scale well in all areas with only one condition: we needed to have free resources available at any moment. It might seem crazy in 2015 to deal with the pain of having to manage bare metal servers, but the gain we have in terms of quality of service and price for our customers is well worth it. We are able to offer a fully packaged search engine with replication to three different locations, in memory indices, and with excellent performance in more locations than AWS!

Is it complex to operate?

Limit The Number Of Processes

Each machine contains only three processes. The first is an nginx server with all our query interpretation code embedded inside as a module. To answer a query, we memory map the index files and directly execute the query inside the nginx worker without communicating with another process or machine. The only exception is when the customer data does not fit on one machine, which is rare.

The second process is a redis key/value store that we use to check rates and limits as well as storing real time logs and counters for each application ID. These counters are used to build our real time dashboard which can be viewed when you connect to your account. This is useful for visualizing your last API calls and for debugging.

The last process is the builder. This is the process responsible for handling all write operations. When the nginx process receives a write operation, it forwards the operation to the builder to perform the consensus. It is also responsible for building the indices and contains a lot of monitoring code that checks for errors in our service such as crashes, slow indexing, indexing errors, etc. Depending on the severity of the problem, some are reported by SMS via Twilio’s API while others are reported directly to PagerDuty. Each time a new problem is detected in production and not reported we make sure to add a new probe to watch for this type of error in the future.

Ease Of Deployment

The simplicity of this stack makes deployments easy. Before we deploy any code we apply a bunch of unit tests and non regression tests. Once all those tests are passing, we gradually deploy to clusters.

Our deployments should never impact production nor be visible to end users. At the same time, we also want to generate a host failure in consensus in order to check everything is working as expected. In order to achieve both goals, we deploy each machine of a cluster independently and apply the following procedures:

  1. Fetch new nginx and builder binaries.

  2. Gracefully restart the nginx web server and relaunch nginx using the new binary without losing any user queries.

  3. Kill the builder and launch it using the new binary. This triggers a failure in RAFT on the deployment of each machine, which allows us to make sure our failover is working as expected.

The simplicity of operating our system was an important goal in our architecture. We did not want nor believe deployment should be constrained by the architecture.

Achieving A Good Worldwide Coverage

Services are becoming more and more global. Serving search queries from only one worldwide region is far from optimal. For example, having search hosted in US-East will have a big difference in usability depending on where users are searching from. Latency will go from a few milliseconds for users in US-East to several hundred milliseconds for users in Asia, without counting the bandwidth limitations of saturated overseas fibers.

We have seen some companies use a CDN on top of a search engine to address these issues. This ends up causing more problems than value for us because invalidating cache is a nightmare and it only improves the speed for a small percentage of queries that are frequently made. It was clear to us that in order to solve this problem we would need to replicate indices to different regions and have them loaded in memory in order to answer user queries efficiently.

What we need is an inter-region replication on top of our existing cluster replication. The replica can be stored on one machine since the replica will only be used for search queries. All write operations will still go to the original cluster of the customer.

Each customer can select the set of data centers they want to have as a replicate, so a replicate machine in a specific region can receive data from several clusters and a cluster can send data to several replicates.

The implementation of this architecture is modeled on our consensus based stream of operations. Each cluster transforms its own stream of write operations after consensus into a version for each replicate making sure to replace jobs that are not relevant for this replicate with no-op jobs. This stream of operations is then sent to all replicates as a batch of operations to avoid as much latency as possible. Sending jobs one by one would result in too many round trips with the replicates.
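
The per-replicate transformation can be sketched as follows. The names and the job layout (sequence ID, index name, operation) are hypothetical. One plausible benefit of the no-op jobs, not stated explicitly in the article, is that every replicate sees an unbroken run of sequence IDs and can therefore detect a missing job.

```python
NOOP = ("noop",)


def stream_for_replicate(jobs, replicated_indices):
    """Return the cluster's job stream as seen by one replicate.

    jobs: list of (sequence_id, index_name, operation) in consensus order.
    replicated_indices: set of index names hosted by this replicate.
    Jobs for other indices are replaced by no-ops, keeping sequence IDs intact.
    """
    return [
        (seq, index, op) if index in replicated_indices else (seq, None, NOOP)
        for seq, index, op in jobs
    ]
```

The resulting list can then be shipped to the replicate as a single batch, which matches the goal of avoiding one round trip per job.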

On the cluster, write operations are kept on the machines until they are acknowledged by all replicates.

The last part of DSN is to redirect the end user directly to the closest location. In order to do that we added another DNS record in the form of APPID-dsn.algolia.net that takes care of the resolution to the closest data center. We first used the Route53 DNS service of Amazon but rapidly hit its limits.

  • The latency-based routing is limited to the AWS regions and we have locations not covered by AWS like India, Hong Kong, Canada and Russia.

  • The geo-based routing is horrible. You need to indicate for each country what the DNS resolution will be. This is a classic approach a lot of hosted DNS providers are taking but in our case it would be a nightmare to support and would not provide enough relevancy. For example, we have several data centers in the US.

After a lot of benchmarking and discussion, we decided upon using NSOne for several reasons:

  • Their Anycast network is very good and better balanced than AWS for us. For example, they have a POP in India and Africa.

  • Their filter logic is really good. For each customer we can specify the list of machines that are associated with them (including replicates) and use a geo filter to sort them by distance. We are then able to keep the best one.

  • They support EDNS client subnets. This is important for us in order to be more relevant. We use the IP of the final user instead of the IP of their DNS server for resolution.
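
The "sort by distance and keep the best one" filter can be illustrated with a great-circle distance computation. This is a generic sketch, not NSOne's filter chain or API: the function names are hypothetical, and it assumes that the user's approximate location has already been derived from the client subnet.

```python
import math

EARTH_RADIUS_KM = 6371.0


def haversine_km(a, b):
    """Great-circle distance in km between two (latitude, longitude) points."""
    lat1, lon1, lat2, lon2 = map(math.radians, (a[0], a[1], b[0], b[1]))
    h = (math.sin((lat2 - lat1) / 2) ** 2
         + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(h))


def closest_machine(user_location, machines):
    """Pick the hostname nearest to the user.

    machines: dict mapping hostname -> (latitude, longitude).
    """
    return min(machines,
               key=lambda host: haversine_km(user_location, machines[host]))
```

For example, given machines near Washington, Paris and Mumbai (illustrative coordinates only), a user located in London would be resolved to the Paris machine.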

In terms of performance, we have been able to reach global worldwide synchronization at the second level. You can try it out on Product Hunt’s search (hosted in US-East, US-West, India, Australia, and Europe) or on Hacker News’ search (hosted in US-East, US-West, India, and Europe).

Conclusion

We spent a lot of time building our distributed and scalable architecture and have faced a lot of different problems. I hope this article gives you a better understanding about how we resolved those problems and provides a useful guide on how to design your own services.

I’m seeing more and more services that are currently facing problems similar to us, having a worldwide audience with multi-region infrastructure but with some worldwide consistent information like login or content. Having a multi-region infrastructure today is mandatory to achieve an excellent user experience. This approach can be used for example to distribute read-only replicates of a database that will be consistent worldwide!
(via HighScalability.com)