Announcing Kylin: Extreme OLAP Engine for Big Data

We at eBay are very excited to announce that we have released our distributed analytics engine, Kylin (http://kylin.io), to the open-source community. Designed to accelerate analytics on Hadoop and allow the use of SQL-compatible tools, Kylin provides a SQL interface and multi-dimensional analysis (OLAP) on Hadoop to support extremely large datasets.

Kylin is currently used in production by various business units at eBay. In addition to open-sourcing Kylin, we are proposing Kylin as an Apache Incubator project.

Background

The challenge faced at eBay is that our data volume has become bigger while our user base has become more diverse. Our users – for example, in analytics and business units – consistently ask for minimal latency but want to continue using their favorite tools, such as Tableau and Excel.

So, we worked closely with our internal analytics community and outlined requirements for a successful product at eBay:

  1. Sub-second query latency on billions of rows
  2. ANSI-standard SQL availability for those using SQL-compatible tools
  3. Full OLAP capability to offer advanced functionality
  4. Support for high cardinality and very large dimensions
  5. High concurrency for thousands of users
  6. Distributed and scale-out architecture for analysis in the TB to PB size range

We quickly realized that nothing available externally met our exact requirements – especially in the open-source Hadoop community. To meet our emerging business needs, we decided to build a platform from scratch. With an excellent team and several pilot customers, we have been able to bring the Kylin platform into production as well as open-source it.

Feature highlights

Kylin is a platform offering the following features for big data analytics:

  • Extremely fast OLAP engine at scale: Kylin is designed to reduce query latency on Hadoop for 10+ billion rows of data.
  • ANSI SQL on Hadoop: Kylin supports most ANSI SQL query functions in its ANSI SQL on Hadoop interface.
  • Interactive query capability: Users can interact with Hadoop data via Kylin at sub-second latency – better than Hive queries for the same dataset.
  • MOLAP cube query serving on billions of rows: Users can define a data model and pre-build cubes in Kylin from more than 10 billion raw data records.
  • Seamless integration with BI Tools: Kylin currently offers integration with business intelligence tools such as Tableau and third-party applications.
  • Open-source ODBC driver: Kylin’s ODBC driver is built from scratch and works very well with Tableau. We have open-sourced the driver to the community as well.
  • Other highlights:
    • Job management and monitoring
    • Compression and encoding to reduce storage
    • Incremental refresh of cubes
    • Leveraging of the HBase coprocessor for query latency
    • Approximate query capability for distinct counts (HyperLogLog)
    • Easy-to-use Web interface to manage, build, monitor, and query cubes
    • Security capability to set ACL at the cube/project level
    • Support for LDAP integration

The fundamental idea

The idea of Kylin is not brand new. Many technologies over the past 30 years have used the same theory to accelerate analytics. These technologies include methods to store pre-calculated results to serve analysis queries, generate each level’s cuboids with all possible combinations of dimensions, and calculate all metrics at different levels.

For reference, here is the cuboid topology:

[Figure: cuboid topology]

When data becomes bigger, the pre-calculation processing becomes impractical on a single machine – even with powerful hardware. However, with the benefit of Hadoop’s distributed computing power, calculation jobs can leverage hundreds or even thousands of nodes. This allows Kylin to perform these calculations in parallel and merge the final result, thereby significantly reducing the processing time.

From relational to key-value

As an example, suppose there are several records stored in Hive tables that represent a relational structure. When the data volume grows very large – 10+ or even 100+ billions of rows – a question like “how many units were sold in the technology category in 2010 on the US site?” will produce a query with a large table scan and a long delay to get the answer. Since the values are fixed every time the query is run, it makes sense to calculate and store those values for further usage. This technique is called Relational to Key-Value (K-V) processing. The process will generate all of the dimension combinations and measured values shown in the example below, at the right side of the diagram. The middle columns of the diagram, from left to right, show how data is calculated by leveraging MapReduce for the large-volume data processing.

[Figure: relational to key-value]

Kylin is based on this theory and leverages the Hadoop ecosystem to do the job for huge volumes of data:

  1. Read data from Hive (which is stored on HDFS)
  2. Run MapReduce jobs to pre-calculate
  3. Store cube data in HBase
  4. Leverage Zookeeper for job coordination
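
The pre-calculation step above (turning relational rows into key-value cuboid entries) can be illustrated with a minimal in-memory sketch. This is not Kylin's implementation: the sample rows, the dimension names, and the `build_cube` and `lookup` helpers are all assumptions made for illustration, and a plain dictionary stands in for MapReduce and HBase.

```python
from collections import defaultdict
from itertools import combinations

# Hypothetical sample rows standing in for a Hive fact table.
ROWS = [
    {"year": 2010, "site": "US", "category": "Technology", "units": 3},
    {"year": 2010, "site": "US", "category": "Technology", "units": 5},
    {"year": 2010, "site": "UK", "category": "Fashion", "units": 2},
    {"year": 2011, "site": "US", "category": "Technology", "units": 7},
]
DIMENSIONS = ("year", "site", "category")


def build_cube(rows, dimensions, measure):
    """Pre-aggregate `measure` for every combination of dimensions (every cuboid)."""
    cube = defaultdict(int)
    for size in range(len(dimensions) + 1):
        for dims in combinations(dimensions, size):
            for row in rows:
                # The key records which dimensions are fixed and to which values.
                key = tuple((d, row[d]) for d in dims)
                cube[key] += row[measure]
    return dict(cube)


def lookup(cube, **filters):
    """Answer an aggregate question with one key lookup instead of a table scan."""
    key = tuple((d, filters[d]) for d in DIMENSIONS if d in filters)
    return cube.get(key, 0)


cube = build_cube(ROWS, DIMENSIONS, "units")
units_sold = lookup(cube, year=2010, site="US", category="Technology")
```

With three dimensions the sketch materializes 2^3 = 8 cuboids. The question "how many units were sold in the technology category in 2010 on the US site?" then becomes a single dictionary lookup, at the cost of storing every combination up front.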

Architecture

The following diagram shows the high-level architecture of Kylin.

[Figure: Kylin high-level architecture]

This diagram illustrates how relational data becomes key-value data through the Cube Build Engine offline process. The yellow lines also illustrate the online analysis data flow. The data requests can originate from SQL submitted using a SQL-based tool, or from third-party applications via Kylin’s RESTful services. The RESTful services call the Query Engine, which determines whether the target dataset exists. If so, the engine directly accesses the target data and returns the result with sub-second latency. Otherwise, the engine is designed to route the query to a SQL-on-Hadoop engine available on the cluster, such as Hive.
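
The routing decision described above can be sketched in a few lines. This is a simplified illustration, not Kylin's Query Engine: the set of pre-built cuboids and the `query_cube`, `query_sql_on_hadoop`, and `route_query` functions are hypothetical stand-ins.

```python
# Hypothetical registry of dimension combinations that have been pre-built.
PREBUILT_CUBOIDS = {
    frozenset({"year", "site"}),
    frozenset({"year", "site", "category"}),
}


def query_cube(dimensions):
    # Stand-in for the fast path: a key-value lookup against stored cube data.
    return f"cube lookup on {sorted(dimensions)}"


def query_sql_on_hadoop(dimensions):
    # Stand-in for the slow path: a scan through an engine such as Hive.
    return f"fallback scan on {sorted(dimensions)}"


def route_query(dimensions):
    """Serve from the cube when a matching cuboid exists, otherwise fall back."""
    wanted = frozenset(dimensions)
    if wanted in PREBUILT_CUBOIDS:
        return query_cube(wanted)
    return query_sql_on_hadoop(wanted)


fast = route_query(["site", "year"])
slow = route_query(["seller"])
```

Here `fast` is answered from the pre-built data while `slow` takes the fallback path, because no cuboid was built for the `seller` dimension.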

Following are descriptions of all of the components the Kylin platform includes.

  • Metadata Manager: Kylin is a metadata-driven application. The Metadata Manager is the key component that manages all metadata stored in Kylin, including the most important cube metadata. All other components rely on the Metadata Manager.
  • Job Engine: This engine is designed to handle all of the offline jobs including shell script, Java API, and MapReduce jobs. The Job Engine manages and coordinates all of the jobs in Kylin to make sure each job executes and handles failures.
  • Storage Engine: This engine manages the underlying storage – specifically the cuboids, which are stored as key-value pairs. The Storage Engine uses HBase – the best solution from the Hadoop ecosystem for leveraging an existing K-V system. Kylin can also be extended to support other K-V systems, such as Redis.
  • REST Server: The REST Server is an entry point for applications to develop against Kylin. Applications can submit queries, get results, trigger cube build jobs, get metadata, get user privileges, and so on.
  • ODBC Driver: To support third-party tools and applications – such as Tableau – we have built and open-sourced an ODBC Driver. The goal is to make it easy for users to onboard.
  • Query Engine: Once the cube is ready, the Query Engine can receive and parse user queries. It then interacts with other components to return the results to the user.

In Kylin, we are leveraging an open-source dynamic data management framework called Apache Calcite to parse SQL and plug in our code. The Calcite architecture is illustrated below. (Calcite was previously called Optiq, which was written by Julian Hyde and is now an Apache Incubator project.)

[Figure: Apache Calcite architecture]

Kylin usage at eBay

At the time of open-sourcing Kylin, we already had several eBay business units using it in production. Our largest use case is the analysis of 12+ billion source records generating 14+ TB cubes; 90% of its queries return in less than 5 seconds. Our use cases now target analysts and business users, who can access analytics and get results through Tableau dashboards very easily, with no need for Hive queries, shell commands, and so on.

What’s next

  • Support TopN on high-cardinality dimension: The current MOLAP technology is not perfect when it comes to querying on a high-cardinality dimension – such as TopN on millions of distinct values in one column. Similar to search engines (as many researchers have pointed out), the inverted index is the reasonable mechanism to use to pre-build such results.
  • Support Hybrid OLAP (HOLAP): MOLAP is great to serve queries on historical data, but as more and more data needs to be processed in real time, there is a growing requirement to combine real-time/near-real-time and historical results for business decisions. Many in-memory technologies already work on Relational OLAP (ROLAP) to offer such capability. Kylin’s next generation will be a Hybrid OLAP (HOLAP) to combine MOLAP and ROLAP together and offer a single entry point for front-end queries.

Open source

Kylin has already been open-sourced to the community. To develop and grow an even stronger ecosystem around Kylin, we are currently working on proposing Kylin as an Apache Incubator project. With distinguished sponsors from the Hadoop developer community supporting Kylin, such as Owen O’Malley (Hortonworks co-founder and Apache member) and Julian Hyde (original author of Apache Calcite, also with Hortonworks), we believe that the greater open-source community can take Kylin to the next level.

We welcome everyone to contribute to Kylin. Please visit the Kylin web site for more details: http://kylin.io.

To begin with, we are looking for open-source contributions not only in the core code base, but also in the following areas:

  1. Shell Client
  2. RPC Server
  3. Job Scheduler
  4. Tools

For more details and to discuss these topics further, please follow us on Twitter @KylinOLAP and join our Google group: https://groups.google.com/forum/#!forum/kylin-olap

Summary

Kylin has been deployed in production at eBay and is processing extremely large datasets. The platform has demonstrated great performance benefits and has proved to be a better way for analysts to leverage data on Hadoop with a more convenient approach using their favorite tool. We are pleased to open-source Kylin. We welcome feedback and suggestions, and we look forward to the involvement of the open-source community.

(Via ebaytechblog.com)

How Clay.Io Built Their 10x Architecture Using AWS, Docker, HAProxy, And Lots More

This is the first post in my new series 10x, where I share my experiences and how we do things at Clay.io to develop at scale with a small team. If you find these things interesting, we’re hiring – zoli@clay.io.

The Cloud

CloudFlare

CloudFlare handles all of our DNS and acts as a distributed caching proxy with some additional DDoS protection features. It also handles SSL.

Amazon EC2 + VPC + NAT Server

Almost all of our servers live on Amazon EC2; most are either medium or large instances. We also use Amazon VPC to host some of our servers inside a private network, inaccessible from the outside world. To get into this private network we have a NAT server, which also serves as the VPN endpoint we use when working with our internal network. (guide, OpenVPN)

Amazon S3

We use Amazon S3 as our CDN backend, which hosts all of our static content. We use a separate, cookie-less domain for this (cdn.wtf) for security and performance reasons.

HAProxy

HAProxy is an extremely performant reverse proxy which we use to route traffic to our different services. This work is non-trivial due to the nature of Clay.io and its platform support concerns (and legacy code support), which I will go into in detail in a later article.

We currently have a single HAProxy server on an m3.medium instance, but will upgrade as traffic increases. In addition, we may add Amazon ELB in front to scale horizontally if necessary.

App Server – Docker

Docker is a tool to manage Linux containers, which are similar to virtual machines except with less overhead (and without some isolation and security guarantees). The key to Docker is that code shipped inside of a container should run the same no matter what the host machine looks like.

We currently run most of our computational services on an app server via Docker. This server can easily be replicated to meet elastic demand, and services can be moved on and off easily. Eventually we hope to manage these app servers with a tool like Kubernetes. (See bottom of post)

Staging App Server – Docker

Our staging environment server is identical to our application server, and runs the exact same docker binaries that we run in production. This environment has been critical to preventing unnecessary breakage and downtime of our production systems.

Data

MySQL

MySQL is a production-hardened relational SQL database. The vast majority of our data currently resides inside a Master-Slave MySQL cluster. We have one master, and two slave servers which serve most of our queries for our users. Eventually we may have to move tables or shard the single master server, but hopefully not for a while.

Logstash

Logstash is a log aggregation tool, with Kibana integration for analysis. It basically handles all of our application logs, and gives us a place to check for errors when something goes wrong. It saves us from having to SSH into a machine to check logs.

MongoDB

MongoDB is a NoSQL document storage database. We currently use MongoDB for some of our developer endpoints, and for our A/B testing framework Flak Cannon.

Memcached

Memcached is a key-value store, used mostly for caching. In many ways it is similar to Redis. We currently use Memcached in our legacy webapp for caching MySQL query results. Eventually we would like to replace this with Redis.

DevOps

Ansible

Ansible has been our tool of choice for managing our servers. It’s simple enough for most developers to learn quickly and be comfortable working with, and has been critical for automating many of the processes normally managed by an operations team.

Other Services

GitHub

GitHub – Great source code management, enough said.

Uptime Robot

Uptime Robot is a free monitoring service which we use to monitor our healthchecks and endpoints. If anything goes down, it will email and text message us within 5 minutes.

Drone.Io

Drone.io is a continuous integration service, which we use to continuously run our test suites for our various projects. It is similar to TravisCI, and has recently released an open source self-hostable version.

Docker Registry

We currently use the official Docker registry to manage our docker containers. It’s similar to GitHub, except for Docker containers.

New Relic

New Relic is a server and application monitoring service, which we mostly use to monitor our servers and to let us know when a machine is running out of disk or memory.

Google Analytics

Google Analytics is our main website analytics tracking tool. For tracking our site specific features, we use the custom events features.

Google Apps

Google Apps provides email for our domain clay.io, and gives our organization a shared Google Drive setup.

LastPass

LastPass is a password management service that allows us to easily share company credentials for all of the above services amongst the team.

The Future

While we are happy with our setup today, we hope to improve it in the coming months. This initial version of our infrastructure is missing some features that weren’t necessary enough to justify spending time on, but we will eventually need to come back to them as we scale.

Kubernetes is looking to be an amazing project and tool for managing Docker containers at scale. We will be following its development closely and hopefully put it into production as the project matures.

Amazon Glacier is another technology we have been looking at for doing database backups, and hope to implement that in the near future.

RethinkDB, while quite immature, is also quite an interesting project. We will definitely be following its progress and may eventually move some of our data into it as we move away from MySQL.

(Source: HighScalability.com)

How Disqus Went Realtime With 165K Messages Per Second And Less Than .2 Seconds Latency

How do you add realtime functionality to a web scale application? That’s what Adam Hitchcock, a Software Engineer at Disqus, talks about in an excellent talk: Making DISQUS Realtime (slides).

Disqus had to take their commenting system and add realtime capabilities to it. That is not easy to do when, at the time of the talk (2013), they had just hit a billion unique visitors a month.

What Disqus developed is a realtime commenting system called “realertime” that was tested to handle 1.5 million concurrently connected users, 45,000 new connections per second, 165,000 messages/second, with less than .2 seconds latency end-to-end.

The nature of a commenting system is that it is IO bound and has a high fanout; that is, a comment comes in and must be sent out to a lot of readers. It’s a problem very similar to what Twitter must solve.

Disqus’ solution was quite interesting, as was the path to their solution. They tried different architectures but settled on a solution built on Python, Django, Nginx Push Stream Module, and Thoonk, all unified by a flexible pipeline architecture. In the process they were able to substantially reduce their server count and easily handle high traffic loads.

At one point in the talk Adam asks whether a pipelined architecture is a good one. For Disqus, messages filtering through a series of transforms is a perfect match. And it’s a very old idea. Unix System V has long had a STREAMS capability for creating flexible pipeline architectures. It’s an incredibly flexible and powerful way of organizing code.

So let’s see how Disqus evolved their realtime commenting architecture and created something both old and new in the process…

Stats

  • Current:

    • 3 million websites use Disqus as their commenting system

    • Half a billion people engaged in conversations every month

    • 20 million comments every month

  • As of ~March 2013:

    • A billion unique visitors a month.

    • 18 Engineers

Platform

  • Python (Disqus is a service and is written in Python and other languages)

  • Django

  • Thoonk Redis Queue – a queue library on top of redis.

  • Nginx Push Stream Module – A pure stream http push technology for your Nginx setup. Comet made easy and really scalable.

  • Gevent – coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

  • Long Polling using EventSource (in the browser)

  • Sentry – a realtime, platform-agnostic error logging and aggregation platform.

  • Scales – tracks server state and statistics, allowing you to see what your server is doing.

  • Runs on raw metal, not EC2.

Architecture

  • Motivation for realtime:

    • Engagement. Realtime distribution of comments encourages users to stay on a page longer. More people comment after realtime than they did before.

    • Sell/trade data. Create a fire-hose product out of the global comment stream.

  • Old realtime system:

    • The Disqus app, written in Django, would post to memcache on many keys: forum:id, thread:id, user:id, post:id. Maybe someone in the future might find it interesting. Since pub/sub is cheap to do, this allows for later innovation.

    • Front-end client would poll the memcache key every couple of seconds.

    • Client would display any new comments.

    • Problem: did not scale at all. Only 10% of the network could use the product at one time.

  • First solution approach:

    • New Posts -> Disqus -> redis pub/sub -> Flask (a web framework) Front End cluster <- HAProxy <- clients.

    • Clients would connect to HAProxy. HAProxy was used to handle millions of connections.

    • Problem: rapidly ran out of CPU on the Flask machines because they were doing redundant work. If two subscribers were listening to the same thread, the message would be formatted twice.

  • Second approach:

    • A backend server was created to do the dedupe formatting work.

    • So the new flow: New Posts -> Disqus -> redis queue -> “python glue” Gevent formatting server (2 servers for redundancy) -> redis pub/sub (6 servers) -> Flask FE (front end) Cluster (14 big servers) <- HAProxy (5 servers) <- clients

    • This worked well. Except as it scaled out it was using more and more servers, especially the Flask cluster. The redis pub/sub cluster was also growing quickly.

  • Third and winning approach:

    • Uses a pipelined architecture where messages pass from queue to queue while being acted upon by filters.

    • Switched to nginx + push stream module. This replaced redis pub/sub, the Flask servers, and the HAProxy cluster.

    • New flow looks like: New Posts -> Disqus -> redis queue -> “python glue” Gevent formatting server (2 servers) -> http post -> nginx pub endpoint -> nginx + push stream module (5 servers) <- clients

    • Only the pub/sub of redis was being used and the nginx push stream module supported the same functionality.

    • 5 push stream servers were required because of network memory limitations in the kernel. It’s a socket allocation problem, that is, having lots of sockets open. Otherwise it could run on 3 servers, including redundancy.

    • The Disqus part of the flow is a Django web app that uses post_save and post_delete hooks to put stuff onto a thoonk queue. These hooks are very useful for generating notifications for realtime data.

  • Thoonk is a queue library on top of redis.

    • They already had thoonk so used it instead of spinning up a HA cluster of RabbitMQ machines. Ended up really liking it.

    • Thoonk is implemented as a state machine so it’s easy to see what jobs are claimed or not claimed, etc. Makes cleanup after a failure easy.

    • Since the queue is stored in redis using zsets, range queries can be performed on the queue. Useful to implement end-to-end acks because you can ask which messages have been processed so far, for example, and take appropriate action.

  • The python glue program.

    • Listens to the thoonk queue.

    • Performs all of the formatting and computation for clients. Includes cleaning and formatting data.

    • Originally did formatting in the flask cluster, but that took too much CPU.

    • Found that gzipping individual messages was not a win because there wasn’t enough redundancy in a message to generate sufficient savings from compression.

    • Gevent runs really fast for an IO bound system like this.

    • A watchdog makes sure a greenlet is always running, which is to say work is always being performed. A greenlet is a micro-thread with no implicit scheduling: a coroutine.

    • A monitor watches for lots of failures and then raises an alert when observed.

  • Pipelined architectures.

    • The python glue program is structured as a data pipeline. There are stages the data must go through: parsing, computation, and publishing to another place. These are run in a greenlet.

    • Mixins were used to implement stage functionality: JSONParserMixin,  AnnomizeDataMixin, SuperSecureEncryptDataMixin, HTTPPublisher, FilePublisher.

    • The idea is to compose pipelines. A message would come off of thoonk and run through a pipeline: JSONAnnonHTTPPipeline, JSONSecureHTTPPipeline, JSONAnnonFilePipeline.

    • Pipelines can share most of their functionality, yet still be specialized. This is great when bringing up a new feature: you can make a new pipeline stage, make a new pipeline, and have the old pipeline run side by side with the new pipeline. Old and new features happily coexist.

    • Tests are also composable within a pipeline. To run tests just insert a filter/module/mixin in the pipeline and the tests will get run.

    • Easy to reason about. Each mixin is easy to understand. It does one thing. New engineers on a project have a much easier time grokking a system designed this way.

  • Nginx Push Stream

    • Handles pub/sub aspect and web serving aspect of a system. And does both well.

    • Recently hit two million concurrent users with 5 servers. Hit peaks of ~950K subscribers per machine and 40 MBytes/second per machine with the CPU usage under 15%.

    • Continually writes data to sockets to test if a socket is still open. If not, it is cleaned up to make room for the next connection.

    • Configuration is a publish endpoint and a subscribe endpoint and how to map data between them.

    • Good monitoring built-in and accessible over a push stream status endpoint.

    • A memory leak in the module requires rolling restarts throughout the day, especially when there are a couple of hundred thousand concurrent connections per process. The browser will know quickly when it has been disconnected so it will restart and reconnect.

  • Long(er) Polling

    • This is on the browser/JavaScript client side of things.

    • Currently using WebSockets because they are fast, but are moving to EventSource because it’s built into the browser and the browser handles everything. Just register for the message type and give it a callback handler.
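
The mixin-based pipeline composition described in the list above can be sketched as follows. This is an illustration under assumptions, not the Disqus code: the class bodies, the anonymized field names, and the in-memory `ListPublisher` (standing in for the talk's HTTPPublisher and FilePublisher) are all hypothetical.

```python
import json


class JSONParserMixin:
    def parse(self, raw):
        return json.loads(raw)


class AnonymizeDataMixin:
    def compute(self, message):
        # Drop fields that identify the author (field names are assumed).
        return {k: v for k, v in message.items() if k not in ("user_id", "email")}


class PassThroughMixin:
    def compute(self, message):
        return message


class ListPublisher:
    """In-memory stand-in for an HTTP or file publisher."""

    def __init__(self):
        self.published = []

    def publish(self, message):
        self.published.append(json.dumps(message, sort_keys=True))


class Pipeline:
    def process(self, raw):
        # Every pipeline runs the same stages; mixins decide what each stage does.
        self.publish(self.compute(self.parse(raw)))


class JSONAnonListPipeline(JSONParserMixin, AnonymizeDataMixin, ListPublisher, Pipeline):
    pass


class JSONPlainListPipeline(JSONParserMixin, PassThroughMixin, ListPublisher, Pipeline):
    pass


raw = '{"post_id": 1, "user_id": 42, "text": "hi"}'
anon = JSONAnonListPipeline()
plain = JSONPlainListPipeline()
anon.process(raw)
plain.process(raw)
```

Both pipelines share the parser and publisher and differ only in the compute stage, which is what lets an old and a new pipeline run side by side on the same queue.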

Testing

  • Darktime testing. Disqus is installed on millions of websites, so they need to test with millions of concurrent connections. Use the existing network to load test rather than create a faux setup in EC2.

  • Instrumented clients to say only 10% of users or exactly this website should flow through the new system, for example.

  • Darkesttime. Something important in the world is happening and a couple of websites are getting mega traffic. So they took all traffic and sent it through a single pub/sub key in the system. This helped identify a lot of hot spots in the code.

Measure

  • Measure all the things. In a pipelined system you just measure input and output of every stage so you can reconcile your data with other systems like HAProxy. Without measurement data there’s no way to drill down and find out who is wrong.

  • Express metrics as +1 and -1 if you can. (I didn’t really understand this one)

  • Sentry helps find where problems are in code.

  • Measurements make it easy to create pretty graphs.

  • When the Pope was selected and the white smoke was seen, traffic peaked at 245 MB per second, 6 TB of data was transferred that day, and peak CPU was 12%.
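
Measuring the input and output of every stage can be sketched with a thin wrapper around each stage function. The `MeasuredStage` class, the counter names, and the convention that a stage returns None to drop a message are assumptions for illustration, not details from the talk.

```python
from collections import Counter


class MeasuredStage:
    """Wrap a stage function and count messages in, out, and dropped."""

    def __init__(self, name, func, counters):
        self.name = name
        self.func = func
        self.counters = counters

    def __call__(self, message):
        self.counters[f"{self.name}.in"] += 1
        result = self.func(message)
        if result is None:
            self.counters[f"{self.name}.dropped"] += 1
        else:
            self.counters[f"{self.name}.out"] += 1
        return result


def run_pipeline(stages, messages):
    """Push each message through every stage; stop early if a stage drops it."""
    delivered = []
    for message in messages:
        for stage in stages:
            message = stage(message)
            if message is None:
                break
        else:
            delivered.append(message)
    return delivered


counters = Counter()
stages = [
    MeasuredStage("parse", str.strip, counters),
    MeasuredStage("filter", lambda m: m or None, counters),
]
delivered = run_pipeline(stages, [" a ", "  ", "b"])
```

Because every stage records what went in and what came out, a mismatch between two adjacent stages points directly at where messages were lost.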

Lessons Learned

  • Do work once. In a large fanout architecture, do work in one place and then send it out to all the consumers. Don’t repeat work for each consumer.

  • The code most likely to fail is your code. You’re smart, but really smart people wrote redis and other products, so be concerned more about your code than other parts of the system.

  • End-to-end acks are good, but expensive. Necessary for customers who want 100% delivery. Couldn’t do it for every front-end user.

  • Greenlets are free. Use them. They make code much easier to read.

  • Publish is free. Publish to all channels. They were able to make a great realtime map of traffic without any prior planning because messages were published over all channels.

  • Sometimes there are big wins. Discovering the Nginx Push Stream module simplified huge chunks of their system and reduced server count.

  • Understand use cases when load testing so you can really test your system.

  • Test with real traffic. This is a much easier approach as your system gets larger and generating synthetic loads would be a huge project in itself.

  • Use Python. They really like Python and Django, though some of the backend stuff is now being written in Go.

  • Increasing server counts in response to scale is a sign your architecture may need some tuning. Take a look at how you can change your architecture and use resources more efficiently.

  • Use off the shelf technologies. Don’t feel like you have to build everything from scratch. Leverage code so you can keep your team small.
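
The "do work once" lesson can be shown with a small fanout sketch: a comment is formatted a single time and the same payload is handed to every subscriber of its thread. The `Fanout` class and its in-memory inboxes are hypothetical stand-ins for the formatting server and the push stream channels.

```python
import json
from collections import defaultdict


class Fanout:
    def __init__(self):
        self.subscribers = defaultdict(list)  # thread id -> list of inboxes
        self.format_calls = 0

    def subscribe(self, thread_id):
        inbox = []
        self.subscribers[thread_id].append(inbox)
        return inbox

    def format(self, comment):
        # The expensive step; counted so redundant work is visible.
        self.format_calls += 1
        return json.dumps(comment, sort_keys=True)

    def publish(self, thread_id, comment):
        payload = self.format(comment)  # formatted once per comment
        for inbox in self.subscribers.get(thread_id, []):
            inbox.append(payload)  # the same payload goes to every subscriber


fanout = Fanout()
first = fanout.subscribe(thread_id=1)
second = fanout.subscribe(thread_id=1)
fanout.publish(1, {"post_id": 7, "text": "hello"})
```

The formatting cost stays constant per comment no matter how many readers are subscribed, which is the property the first Flask-based approach lacked.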

Update:

So Disqus has grown a bit:

  • 1.3 billion unique visitors
  • 10 billion page views
  • 500 million users engaged in discussions
  • 3 million communities
  • 25 million comments

They are still all about realtime, but Go replaced Python in their Realtime system:

  • Original Realtime backend was written in a pretty lightweight Python + gevent.
  • The realtime service is a hybrid of CPU intensive tasks + lots of network IO. Gevent was handling the network IO without an issue, but at higher contention, the CPU was choking everything. Switching over to Go removed that contention, which was the primary issue that was being seen.
  • Still runs on 5 Nginx machines.
    • Uses NginxPushStream, which supports EventSource, WebSocket, Long Polling, and Forever Iframe.
    • All users are connected to these machines.
    • On a normal day each machine sees 3200 connections/s, 1 million connections, 150K packets/s TX and 130K packets/s RX, 150 mbits/s TX and 80 mbits/s RX, with <15ms delay end-to-end (which is faster than Javascript can render a comment)
    • Had many issues with resource exhaustion at first. Configurations for Nginx and the OS are given that help alleviate the problems, tuning them to handle a scenario with many connections moving little data.
  • Ran out of network bandwidth before anything else.
    • Using 10 gigabit network interface cards helped a lot.
    • Enabling gzip helped a lot, but Nginx preallocates a lot of memory per connection for gzip, which is overkill because comments are small. Reducing Nginx buffer sizes reduced out-of-memory problems.
  • As message rates increased, at peak processing 10k+ messages per second, the machines maxed out, and end-to-end latency went to seconds and minutes in the worst case.
  • Switched to Go.
    • Liked Go because of its performance, native concurrency, and familiarity for Python programmers.
    • In only a week a replacement system was built with impressive results:
      • End-to-end latency is, on average, less than 10ms.
      • Currently consuming roughly 10-20% of available CPU. A huge reduction.
    • Node was not selected because it does not handle CPU intensive tasks well.
    • Go does not directly access the database. It consumes a queue from RabbitMQ and publishes to the Nginx frontends.
    • A Go framework is not being used. This is a tiny component and the rest of Disqus is still Django.
  • They wanted to use resources better, not add more machines:
    • For the amount of work that was being done, we didn’t want to horizontally scale more. Throwing more and more hardware at a problem isn’t always the best solution. In the end, having a faster product yields its own benefits as well.

(Via highscalability.com)

The WhatsApp Architecture

WhatsApp stats: What has hundreds of nodes, thousands of cores, hundreds of terabytes of RAM, and hopes to serve the billions of smartphones that will soon be a reality around the globe? The Erlang/FreeBSD-based server infrastructure at WhatsApp. We’ve faced many challenges in meeting the ever-growing demand for our messaging services, and we continue to push the envelope on size (>8000 cores) and speed (>70M Erlang messages per second) of our serving system.

A warning here: we don’t know a lot about WhatsApp’s overall architecture, just bits and pieces gathered from various sources. Rick Reed’s main talk is about the optimization process used to get to 2 million connections a server while using Erlang, which is interesting, but it’s not a complete architecture talk.

Stats

These stats are generally for the current system, not the system we have a talk on. The talk on the current system will include more on hacks for data storage, messaging, meta-clustering, and more BEAM/OTP patches.

  • 450 million active users, and reached that number faster than any other company in history.

  • 32 engineers, one developer supports 14 million active users

  • 50 billion messages every day across seven platforms (inbound + outbound)

  • 1+ million people sign up every day

  • $0 invested in advertising

  • $60 million investment from Sequoia Capital; $3.4 billion is the amount Sequoia will make

  • 35% is how much of Facebook’s cash is being used for the deal
  • Hundreds of nodes

  • >8000 cores

  • Hundreds of terabytes of RAM

  • >70M Erlang messages per second

  • In 2011 WhatsApp achieved 1 million established TCP sessions on a single machine with memory and CPU to spare. In 2012 that was pushed to over 2 million TCP connections. In 2013 WhatsApp tweeted out: On Dec 31st we had a new record day: 7B msgs inbound, 11B msgs outbound = 18 billion total messages processed in one day! Happy 2013!!!

Platform

Backend

  • Erlang

  • FreeBSD

  • Yaws, lighttpd

  • PHP

  • Custom patches to BEAM (BEAM is like Java’s JVM, but for Erlang)

  • Custom XMPP

  • Hosting may be in Softlayer

Frontend

  • Seven client platforms: iPhone, Android, Blackberry, Nokia Symbian S60, Nokia S40, Windows Phone, ?

  • SQLite

Hardware

  • Standard user facing server:

    • Dual Westmere Hex-core (24 logical CPUs);

    • 100GB RAM, SSD;

    • Dual NIC (public user-facing network, private back-end/distribution);

Product

  • Focus is on messaging. Connecting people all over the world, regardless of where they are in the world, without having to pay a lot of money. Founder Jan Koum remembers how difficult it was in 1992 to connect to family all over the world.

  • Privacy. Shaped by Jan Koum’s experiences growing up in the Ukraine, where nothing was private. Messages are not stored on servers; chat history is not stored; goal is to know as little about users as possible; your name and your gender are not known; chat history is only on your phone.

General

  • WhatsApp server is almost completely implemented in Erlang.

    • Server systems that do the backend message routing are done in Erlang.

    • Great achievement is that the number of active users is managed with a really small server footprint. Team consensus is that it is largely because of Erlang.

    • Interesting to note Facebook Chat was written in Erlang in 2009, but they went away from it because it was hard to find qualified programmers.

  • WhatsApp server started from ejabberd

    • Ejabberd is a famous open source Jabber server written in Erlang.

    • Originally chosen because it’s open, had great reviews by developers, was easy to start with, and held the promise of Erlang’s long-term suitability for large communication systems.

    • The next few years were spent re-writing and modifying quite a few parts of ejabberd, including switching from XMPP to internally developed protocol, restructuring the code base and redesigning some core components, and making lots of important modifications to Erlang VM to optimize server performance.

  • To handle 50 billion messages a day the focus is on making a reliable system that works. Monetization is something to look at later, it’s far far down the road.

  • A primary gauge of system health is message queue length. The message queue length of all the processes on a node is constantly monitored and an alert is sent out if they accumulate backlog beyond a preset threshold. If one or more processes falls behind that is alerted on, which gives a pointer to the next bottleneck to attack.

  • Multimedia messages are sent by uploading the image, audio or video to be sent to an HTTP server and then sending a link to the content along with its Base64 encoded thumbnail (if applicable).

  • Some code is usually pushed every day. Often, it’s multiple times a day, though in general peak traffic times are avoided. Erlang helps being aggressive in getting fixes and features into production. Hot-loading means updates can be pushed without restarts or traffic shifting. Mistakes can usually be undone very quickly, again by hot-loading. Systems tend to be much more loosely-coupled which makes it very easy to roll changes out incrementally.

  • What protocol is used in the WhatsApp app? An SSL socket to the WhatsApp server pools. All messages are queued on the server until the client reconnects to retrieve them. The successful retrieval of a message is reported back to the WhatsApp server, which forwards this status to the original sender (who sees it as a “checkmark” icon next to the message). Messages are wiped from server memory as soon as the client has accepted them.

  • How does the registration process work internally in WhatsApp? WhatsApp used to create a username/password based on the phone IMEI number. This was changed recently. WhatsApp now uses a general request from the app to send a unique 5-digit PIN. WhatsApp then sends an SMS to the indicated phone number (this means the WhatsApp client no longer needs to run on the same phone). Based on the PIN, the app then requests a unique key from WhatsApp. This key is used as the “password” for all future calls (this “permanent” key is stored on the device). This also means that registering a new device will invalidate the key on the old device.

  • Google’s push service is used on Android.

  • More users on Android. Android is more enjoyable to work with. Developers are able to prototype a feature and push it out to hundreds of millions of users overnight, if there’s an issue it can be fixed quickly. iOS, not so much.
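The queue-until-acknowledged delivery described above can be sketched roughly as follows. This is a toy in-memory illustration in Python, not WhatsApp's Erlang code; the class name `OfflineStore` and its methods are hypothetical.

```python
from collections import defaultdict, deque


class OfflineStore:
    """Toy store-and-forward queue (hypothetical names, illustration only)."""

    def __init__(self):
        # recipient -> queued (message_id, sender, body) tuples awaiting delivery
        self.pending = defaultdict(deque)
        # sender -> ids of messages the recipient has confirmed receiving
        self.receipts = defaultdict(list)

    def send(self, message_id, sender, recipient, body):
        # Messages wait on the server until the recipient connects.
        self.pending[recipient].append((message_id, sender, body))

    def fetch(self, recipient):
        # Hand queued messages to a reconnecting client without deleting them yet.
        return list(self.pending[recipient])

    def ack(self, recipient, message_id):
        # Once the client confirms receipt, wipe the message and record the
        # status that would be forwarded to the sender (the "checkmark").
        queue = self.pending[recipient]
        for entry in list(queue):
            if entry[0] == message_id:
                queue.remove(entry)
                self.receipts[entry[1]].append(message_id)
                return True
        return False
```

Keeping the message until `ack` is called, rather than deleting it in `fetch`, is what lets a client that drops its connection mid-download retrieve the message again on the next reconnect.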

The Quest For 2+ Million Connections Per Server

  • Experienced lots of user growth, which is a good problem to have, but it also means having to spend money buying more hardware and increased operational complexity of managing all those machines.

  • Need to plan for bumps in traffic. Examples are soccer games and earthquakes in Spain or Mexico. These happen near peak traffic loads, so there needs to be enough spare capacity to handle peaks + bumps. A recent soccer match generated a 35% spike in outbound message rate right at the daily peak.

  • Initial server loading was 200K simultaneous connections per server.

    • Extrapolated out would mean a lot of servers with the hoped for growth pattern.

    • Servers were brittle in the face of burst loads. Network glitches and other problems would occur. Needed to decouple components so things weren’t so brittle at high capacity.

    • Goal was a million connections per server. An ambitious goal given that at the time they were running at 200K connections. Running servers with headroom to allow for world events, hardware failures, and other types of glitches would require enough resilience to handle the high usage levels and failures.

Tools And Techniques Used To Increase Scalability

  • Wrote system activity reporter tool (wsar):

    • Record system stats across the system, including OS stats, hardware stats, BEAM stats. It was built so it was easy to plug in metrics from other systems, like virtual memory. Track CPU utilization, overall utilization, user time, system time, interrupt time, context switches, system calls, traps, packets sent/received, total count of messages in queues across all processes, busy port events, traffic rate, bytes in/out, scheduling stats, garbage collection stats, words collected, etc.

    • Initially ran once a minute. As the systems were driven harder, one-second polling resolution was required because events that happened in the space of a minute were invisible. Really fine-grained stats to see how everything is performing.

  • Hardware performance counters in CPU (pmcstat):

    • See where the CPU is at as a percentage of time. Can tell how much time is being spent executing the emulator loop. In their case it is 16% which tells them that only 16% is executing emulated code so even if you were able to remove all the execution time of all the Erlang code it would only save 16% out of the total runtime. This implies you should focus in other areas to improve efficiency of the system.

  • dtrace, kernel lock-counting, fprof

    • Dtrace was mostly for debugging, not performance.

    • Patched BEAM on FreeBSD to include CPU time stamp.

    • Wrote scripts to create an aggregated view across all processes to see where routines are spending all the time.

    • Biggest win was compiling the emulator with lock counting turned on.

  • Some Issues:

    • Earlier on saw more time spent in the garbage collection routines; that was brought down.

    • Saw some issues with the networking stack that were tuned away.

    • Most issues were with lock contention in the emulator which shows strongly in the output of the lock counting.

  • Measurement:

    • Synthetic workloads, which means generating traffic from your own test scripts, is of little value for tuning user facing systems at extreme scale.

      • Worked well for simple interfaces like a user table, generating inserts and reads as quickly as possible.

      • If supporting a million connections on a server it would take 30 hosts to open enough IP ports to generate enough connections to test just one server. For two million connections that would take 60 hosts. Just difficult to generate that kind of scale.

      • The type of traffic that is seen during production is difficult to generate. Can guess at a normal workload, but in practice the system sees networking events and world events, and, because it is multi-platform, behaviour that varies between clients and by country.

    • Tee’d workload:

      • Take normal production traffic and pipe it off to a separate system.

      • Very useful for systems for which side effects could be constrained. Don’t want to tee traffic and do things that would affect the permanent state of a user or result in multiple messages going to users.

      • Erlang supports hot loading, so could be under a full production load, have an idea, compile, load the change as the program is running and instantly see if that change is better or worse.

      • Added knobs to change production load dynamically and see how it would affect performance. Would be tailing the sar output looking at things like CPU usage, VM utilization, listen queue overflows, and turn knobs to see how the system reacted.

    • True production loads:

      • Ultimate test. Doing both input work and output work.

      • Put server in DNS a couple of times so it would get double or triple the normal traffic. Creates issues with TTLs because clients don’t respect DNS TTLs and there’s a delay, so can’t quickly react to getting more traffic than can be dealt with.

      • IPFW. Forward traffic from one server to another so could give a host exactly the number of desired client connections. A bug caused a kernel panic so that didn’t work very well.

  • Results:

    • Started at 200K simultaneous connections per server.

    • First bottleneck showed up at 425K. System ran into a lot of contention. Work stopped. Instrumented the scheduler to measure how much useful work is being done, or sleeping, or spinning. Under load it started to hit sleeping locks so 35-45% CPU was being used across the system but the schedulers are at 95% utilization.

    • The first round of fixes got to over a million connections.

      • VM usage is at 76%. CPU is at 73%. BEAM emulator running at 45% utilization, which matches closely to user percentage, which is good because the emulator runs as user.

      • Ordinarily CPU utilization isn’t a good measure of how busy a system is because the scheduler uses CPU.

    • After another month of tackling bottlenecks, 2 million connections per server was achieved.

      • BEAM utilization at 80%, close to where FreeBSD might start paging. CPU is about the same, with double the connections. Scheduler is hitting contention, but running pretty well.

    • Seemed like a good place to stop so started profiling Erlang code.

      • Originally had two Erlang processes per connection. Cut that to one.

      • Did some things with timers.

    • Peaked at 2.8M connections per server

      • 571k pkts/sec, >200k dist msgs/sec

      • Made some memory optimizations so VM load was down to 70%.

    • Tried 3 million connections, but failed.

      • See long message queues when the system is in trouble. Either a single message queue or a sum of message queues.

      • Added to BEAM instrumentation on message queue stats per process. How many messages are being sent/received, how fast.

      • Sampling every 10 seconds, could see a process had 600K messages in its message queue with a dequeue rate of 40K with a delay of 15 seconds. Projected drain time was 41 seconds.

  • Findings:

    • Erlang + BEAM + their fixes – has awesome SMP scalability. Nearly linear scalability. Remarkable. On a 24-way box can run the system with 85% CPU utilization and it’s keeping up running a production load. It can run like this all day.

      • Testament to Erlang’s program model.

      • The longer a server has been up, the more long-running, mostly idle connections it accumulates, so it can handle more connections because each connection is less busy.

    • Contention was biggest issue.

      • Some fixes were in their Erlang code to reduce BEAM’s contention issues.

      • Some were patches to BEAM.

      • Partitioning workload so work didn’t have to cross processors a lot.

      • Time-of-day lock. Every time a message is delivered from a port it looks to update the time-of-day which is a single lock across all schedulers which means all CPUs are hitting one lock.

      • Optimized use of timer wheels. Removed bif timer

      • Check IO time table grows arithmetically. This created VM thrashing, as the hash table would be reallocated at various points. Improved to use geometric allocation of the table.

      • Added write file that takes a port that you already have open to reduce port contention.

      • Mseg allocation is single point of contention across all allocators. Make per scheduler.

      • Lots of port transactions when accepting a connection. Set option to reduce expensive port interactions.

      • When message queue backlogs became large garbage collection would destabilize the system. So pause GC until the queues shrunk.

    • Avoiding some common things that come at a price.

      • Backported a TSC time counter from FreeBSD 9 to 8. It’s a cheaper-to-read timer. Fast to get time of day, less expensive than going to a chip.

      • Backported the igb network driver from FreeBSD 9 because of issues with multiple queues on NICs locking up.

      • Increase number of files and sockets.

      • Pmcstat showed a lot of time was spent looking up PCBs in the network stack. So bumped up the size of the hash table to make lookups faster.

    • BEAM Patches

      • Previously mentioned instrumentation patches. Instrument scheduler to get utilization information, statistics for message queues, number of sleeps, send rates, message counts, etc. Can be done in Erlang code with procinfo, but with a million connections it’s very slow.

      • Stats are very efficient to gather, so collection can run in production.

      • Stats kept at 3 different decay intervals: 1, 10, 100 second intervals. Allows seeing issues over time.

      • Make lock counting work for larger async thread counts.

      • Added debug options to debug lock counters.

    • Tuning

      • Set the scheduler wake up threshold to low because schedulers would go to sleep and would never wake up.

      • Prefer mseg allocators over malloc.

      • Have an allocator per instance per scheduler.

      • Configure carrier sizes to start out big and get bigger. This causes FreeBSD to use super pages, which reduces the TLB thrash rate and improves throughput for the same CPU.

      • Run BEAM at real-time priority so that other things like cron jobs don’t interrupt scheduling. Prevents glitches that would cause backlogs of important user traffic.

      • Patch to dial down spin counts so the scheduler wouldn’t spin.

    • Mnesia

      • Prefer os:timestamp to erlang:now.

      • Using no transactions, but with remote replication ran into a backlog. Parallelized replication for each table to increase throughput.

    • There are actually lots more changes that were made.
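To see why switching a table from arithmetic to geometric growth (one of the contention fixes listed above) cuts down on reallocations, here is a small Python sketch that just counts resize events. The starting capacity, the step of 1024 entries, and the doubling factor are illustrative assumptions, not values from the BEAM patches.

```python
def count_reallocations(final_size, initial_capacity, grow):
    """Count how often a table must be reallocated to reach final_size entries."""
    capacity = initial_capacity
    reallocations = 0
    while capacity < final_size:
        capacity = grow(capacity)
        reallocations += 1
    return reallocations


def arithmetic(capacity, step=1024):
    # Grow by a fixed number of entries each time (assumed step size).
    return capacity + step


def geometric(capacity, factor=2):
    # Grow by a constant factor each time (assumed doubling).
    return capacity * factor


# Growing from 1024 entries to hold 2 million connections:
arithmetic_count = count_reallocations(2_000_000, 1024, arithmetic)
geometric_count = count_reallocations(2_000_000, 1024, geometric)
```

Under these assumptions the fixed-step table is reallocated 1953 times on the way to 2 million entries, while the doubling table is reallocated 11 times.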

Lessons

  • Optimization is dark grungy work suitable only for trolls and engineers. When Rick is going through all the changes that he made to get to 2 million connections a server it was mind numbing. Notice the immense amount of work that went into writing tools, running tests, backporting code, adding gobs of instrumentation to nearly every level of the stack, tuning the system, looking at traces, mucking with very low level details and just trying to understand everything. That’s what it takes to remove the bottlenecks in order to increase performance and scalability to extreme levels.

  • Get the data you need. Write tools. Patch tools. Add knobs. Ken was relentless in extending the system to get the data they needed, constantly writing tools and scripts to get the data they needed to manage and optimize the system. Do whatever it takes.

  • Measure. Remove Bottlenecks. Test. Repeat. That’s how you do it.

  • Erlang rocks! Erlang continues to prove its capability as a versatile, reliable, high-performance platform. Though personally all the tuning and patching that was required casts some doubt on this claim.

  • Crack the virality code and profit. Virality is an elusive quality, but as WhatsApp shows, if you do figure it out, man, it’s worth a lot of money.

  • Value and employee count are now officially divorced. There are a lot of force-multipliers out in the world today. An advanced global telecom infrastructure makes apps like WhatsApp possible. If WhatsApp had to make the network or a phone etc it would never happen. Powerful cheap hardware and Open Source software availability is of course another multiplier. As is being in the right place at the right time with the right product in front of the right buyer.

  • There’s something to this brutal focus on the user idea. WhatsApp is focussed on being a simple messaging app, not being a gaming network, or an advertising network, or a disappearing photos network. That worked for them. It guided their no-ads stance, their ability to keep the app simple while adding features, and the overall no-brainer “it just works” philosophy on any phone.

  • Limits in the cause of simplicity are OK. Your identity is tied to the phone number, so if you change your phone number your identity is gone. This is very un-computer like. But it does make the entire system much simpler in design.

  • Age ain’t no thing. If it was age discrimination that prevented WhatsApp co-founder Brian Acton from getting a job at both Twitter and Facebook in 2009, then shame, shame, shame.

  • Start simply and then customize. When chat was launched initially the server side was based on ejabberd. It’s since been completely rewritten, but that was the initial step in the Erlang direction. The experience with the scalability, reliability, and operability of Erlang in that initial use case led to broader and broader use.

  • Keep server count low. Constantly work to keep server counts as low as possible while leaving enough headroom for events that create short-term spikes in usage. Analyze and optimize until the point of diminishing returns is hit on those efforts and then deploy more hardware.

  • Purposely overprovision hardware. This ensures that users have uninterrupted service during their festivities and employees are able to enjoy the holidays without spending the whole time fixing overload issues.

  • Growth stalls when you charge money. Growth was super fast when WhatsApp was free, 10,000 downloads a day in the early days. Then when switching over to paid that declined to 1,000 a day. At the end of the year, after adding picture messaging, they settled on charging a one-time download fee, later modified to an annual payment.

  • Inspiration comes from the strangest places. Experience with forgetting the username and passwords on Skype accounts drove the passion for making the app “just work.”

(via HighScalability.com)

An analysis of Facebook photo caching

Every day people upload more than 350 million photos to Facebook as of December 2012 and view many more in their News Feeds and on their friends’ Timelines. Facebook stores these photos on Haystack machines that are optimized to store photos. But there is also a deep and distributed photo-serving stack with many layers of caches that delivers photos to people so they can view them.

We recently published an academic study of this stack in SOSP. In this post, we describe the stack and then cover four of our many findings:

    1. How effective each layer of the stack is.
    2. How the popularity distribution of photos changes across layers.
    3. The effect of increasing cache sizes.
    4. The effect of more advanced caching algorithms.

Facebook’s photo-serving stack

The photo-serving stack has four layers: browser caches, edge caches, origin cache, and Haystack backend. The first layer of the stack is the browser cache on peoples’ machines. If someone has recently seen or downloaded a photo, that photo will likely be in their browser cache and they can retrieve it locally. Otherwise, their browser will send an HTTPS request out to the Internet.

That request will be routed either to our CDN partners or to one of Facebook’s many edge caches, which are located at Internet points of presence (PoPs). (In the paper and this blog post, we focus on what happens in the Facebook-controlled stack.) The particular edge cache that a request is routed to is encoded in the request’s URL. If the requested photo is present at the edge cache, then it returns the photo to the user’s browser, which will store it in its local cache. If the photo is not present at the edge cache, it is requested from the origin cache.

The origin cache is a single cache that is distributed across multiple data centers (like Prineville, Forest City, and Lulea). Requests from the edge caches are routed to hosts in the origin cache based on the requested photo’s ID. This means that for a single picture, all requests to the origin cache from all edge caches will be directed to the same server. If the requested photo is present in the origin cache, it is returned to the person via the edge cache, which now will store the photo. If the requested photo is not present, it is fetched from the Haystack backend.
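As an illustration of ID-based routing, the Python sketch below hashes a photo ID to pick one origin host, so every edge cache lands on the same server for a given photo. The post does not say which mapping function is used; the hash-modulo scheme and the host names here are assumptions.

```python
import hashlib


def origin_host_for(photo_id, hosts):
    """Map a photo ID to one origin host, independent of which edge cache asks."""
    digest = hashlib.sha256(str(photo_id).encode("utf-8")).digest()
    # Use the first 8 bytes of the hash as an integer, reduced modulo the host count.
    index = int.from_bytes(digest[:8], "big") % len(hosts)
    return hosts[index]


# Hypothetical host names spread across data centers.
ORIGIN_HOSTS = ["origin-a1", "origin-a2", "origin-b1", "origin-b2"]

# Two different edge caches asking for the same photo reach the same origin host.
from_edge_1 = origin_host_for(123456789, ORIGIN_HOSTS)
from_edge_2 = origin_host_for(123456789, ORIGIN_HOSTS)
```

A plain modulo remaps most photos whenever the host list changes; a consistent-hashing ring would limit that, but whether the real system needs or uses one is not stated in the post.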

The Haystack backend stores all photos, so all requests can be satisfied at this layer. Requests from the origin cache are typically filled by Haystack machines in the same data center. If for some reason the local Haystack machine is unavailable, the origin cache will request the photo from a Haystack machine in another data center. In either case, the photo flows back up the caching stack being stored at each layer as it goes: origin cache, edge cache, and then browser cache.
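The lookup-then-fill path through the four layers can be sketched as a chain of read-through caches. This Python sketch is only a model of the flow described above: the `Layer` class is hypothetical, and the caches are unbounded, so no eviction takes place.

```python
class Layer:
    """One layer of a read-through cache stack (unbounded, for illustration)."""

    def __init__(self, name, below=None):
        self.name = name
        self.store = {}
        self.below = below  # next layer down; None marks the backend

    def get(self, photo_id):
        """Return (photo, name of the layer that served it)."""
        if photo_id in self.store:
            return self.store[photo_id], self.name
        if self.below is None:
            raise KeyError(photo_id)  # the backend is expected to hold everything
        photo, served_by = self.below.get(photo_id)
        self.store[photo_id] = photo  # keep a copy as the photo flows back up
        return photo, served_by


haystack = Layer("haystack")
haystack.store["photo-1"] = b"jpeg bytes"
origin = Layer("origin", below=haystack)
edge = Layer("edge", below=origin)
browser = Layer("browser", below=edge)

first = browser.get("photo-1")   # served by the backend, then cached at every layer
second = browser.get("photo-1")  # now served from the browser cache
```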

Trace collection

In our study, we collected a month-long trace of requests from non-mobile users for a deterministic subset of photos that was served entirely by the Facebook-controlled stack. The trace captures hits and misses at all levels of the stack, including client browser caches. All analysis and simulation is based on this trace.

From our trace we determined the hit ratio at each of the layers and the ratio of total requests that travels past each layer. Each layer significantly reduces the volume of traffic so that the Haystack backend only needs to serve 9.9% of requested photos.

This is a significant reduction in requests, but we can do even better.

Shifting popularity distributions

It is well known that the popularity of objects on the web tends to follow a power law distribution. Our study confirmed this for the Facebook photo workload and showed how this distribution shifts as we move down layers in the stack. We counted the number of requests for each photo at every layer, sorted them by popularity, and then graphed them on a log-log scale. The power law relationship shows up as linear on this scale.

As we move down the stack, the alpha parameter for the power law distribution decreases and flattens out. The distribution also changes to one that more closely resembles a stretched exponential distribution.
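One simple way to estimate the alpha parameter from per-photo request counts is a least-squares line fit on the log-log rank/frequency plot, where the slope is the negative of alpha. The Python sketch below shows that approach; it is an assumption for illustration, since the post does not say how the fit was done in the study.

```python
import math


def fit_power_law_alpha(counts):
    """Estimate alpha in count ~ rank**(-alpha) by a least-squares fit in log-log space.

    Expects at least two strictly positive request counts.
    """
    ranked = sorted(counts, reverse=True)
    xs = [math.log(rank) for rank in range(1, len(ranked) + 1)]
    ys = [math.log(count) for count in ranked]
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    sxx = sum((x - mean_x) ** 2 for x in xs)
    return -sxy / sxx  # the slope is negative, so alpha is its negation


# Synthetic counts following an exact power law with alpha = 1.
synthetic = [1000.0 / rank for rank in range(1, 101)]
alpha = fit_power_law_alpha(synthetic)
```

A flatter distribution deeper in the stack would show up here as a smaller fitted alpha.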

Serving popular photos

Caches derive most of their utility from serving the most popular content many times. To quantify to what degree this was true for the Facebook stack, we split photos up into popularity groups. Group “high” has the 1,000 most popular photos, group “medium” the next 9,000 most popular photos, group “low” the next 90,000 most popular photos, and group “lowest” the least popular 2.5 million photos in the subset of photos we traced. Each of these groups represents about 25% of user requests. This graph shows the percent of requests served by each layer for these four groups.

As expected, the browser and edge caches are very effective for the most popular photos in our trace and progressively less effective for less popular groups. The origin cache’s largest effect is seen for popularity group low, which also agrees with our intuition. More popular photos will be effectively cached earlier in the stack at the browser and edge layers. Less popular photos are requested too infrequently to be effectively cached.

Larger caches improve hit ratios

Unless your entire workload fits in the current cache, a larger cache will improve hit ratios — but by how much? To find out we took our trace to each layer and replayed it using a variety of different cache sizes. We share the results for one of the edge caches and the origin cache here.

Size X in the graph is current size of that edge cache. Doubling the size of the cache would significantly increase the hit ratio, from 59% to 65%. Tripling the size of the cache would further increase hit ratio to 68.5%.

The infinite line in the graphs shows the performance of a cache of infinite size, i.e., one that can hold the entire workload. It is an upper bound on the possible hit ratio and it shows us there is still much room for improvement.

The origin cache shows even more room for improvement from larger caches. Doubling the size of the origin cache improves hit ratio from 33% to 42.5%. Tripling increases that to 48.5%. The results for a cache of infinite size again demonstrate that there is still much room for improvement.
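Replaying a trace at several cache sizes can be sketched in a few lines of Python. The simulator below assumes FIFO eviction and uses a tiny made-up trace; the real study replayed a month-long production trace.

```python
from collections import deque


def fifo_hit_ratio(trace, capacity):
    """Replay a non-empty request trace through a FIFO cache of capacity >= 1."""
    cached = set()
    order = deque()  # insertion order; the oldest entry is evicted first
    hits = 0
    for key in trace:
        if key in cached:
            hits += 1  # FIFO does not reorder entries on a hit
            continue
        if len(cached) >= capacity:
            cached.discard(order.popleft())
        cached.add(key)
        order.append(key)
    return hits / len(trace)


# Made-up trace of photo IDs, replayed at three cache sizes.
trace = [1, 2, 3, 1, 2, 3, 4, 1]
ratios = {size: fifo_hit_ratio(trace, size) for size in (2, 3, 4)}
```

On this toy trace the hit ratio climbs from 0.0 at size 2 to 0.375 at size 3 and 0.5 at size 4, the same kind of curve the study plots against multiples of the current cache size.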

Advanced caching algorithms improve hit ratios

Larger caches are not the only way to improve hit ratios — being smarter about what items you keep in the cache can also have an effect. We used our trace to test the effects of using a variety of different caching algorithms. We published results for the following algorithms:

    • FIFO: A first-in-first-out queue is used for cache eviction. This is the algorithm Facebook currently uses.
    • LRU: A priority queue ordered by last-access time is used for cache eviction.
    • LFU: A priority queue ordered first by number of hits and then by last-access time is used for cache eviction.
    • S4LRU – Quadruple-segmented LRU. An intermediate algorithm between LRU and LFU (see paper for details).
    • Clairvoyant: A priority queue ordered by next-access time is used for cache eviction. (Requires knowledge of the future and is impossible to implement in practice, but gives a more accurate upper bound on the performance of an ideal caching algorithm.)
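To make the difference between LRU and S4LRU concrete, here is a compact Python sketch of both. The S4LRU rules used (four equal LRU segments, a hit promotes an item one segment up, overflow demotes the oldest item one segment down, and items pushed out of the lowest segment leave the cache) are a reading of the paper's description, so treat the details as an assumption rather than the exact production algorithm.

```python
from collections import OrderedDict


class LRUCache:
    def __init__(self, capacity):
        self.capacity = capacity
        self.items = OrderedDict()  # oldest first, most recently used last

    def access(self, key):
        """Return True on a hit; on a miss, admit the key and return False."""
        if key in self.items:
            self.items.move_to_end(key)
            return True
        if len(self.items) >= self.capacity:
            self.items.popitem(last=False)  # evict the least recently used
        self.items[key] = True
        return False


class S4LRUCache:
    """Four LRU segments; capacity should be a multiple of 4."""

    LEVELS = 4

    def __init__(self, capacity):
        self.segment_capacity = capacity // self.LEVELS
        self.segments = [OrderedDict() for _ in range(self.LEVELS)]

    def _insert(self, level, key):
        self.segments[level][key] = True
        # Cascade overflow downward; items pushed out of level 0 leave the cache.
        while level >= 0 and len(self.segments[level]) > self.segment_capacity:
            victim, _ = self.segments[level].popitem(last=False)
            level -= 1
            if level >= 0:
                self.segments[level][victim] = True

    def access(self, key):
        for level, segment in enumerate(self.segments):
            if key in segment:
                del segment[key]
                self._insert(min(level + 1, self.LEVELS - 1), key)  # promote
                return True
        self._insert(0, key)  # new items start in the lowest segment
        return False


def hit_ratio(cache, trace):
    return sum(cache.access(key) for key in trace) / len(trace)


# Two popular photos, a burst of one-off photos, then the popular ones again.
trace = ["a", "b", "a", "b"] + list(range(1, 9)) + ["a", "b"]
lru_ratio = hit_ratio(LRUCache(8), trace)
s4lru_ratio = hit_ratio(S4LRUCache(8), trace)
```

On this toy trace the burst of one-off requests flushes both popular photos out of the LRU cache, while S4LRU keeps them in a higher segment, which gives it four hits to LRU's two.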

First, how do the different algorithms stack up at an edge cache?

While both LRU and LFU give slightly higher hit ratios than the FIFO caching policy, the S4LRU caching algorithm gives much higher hit ratios. Based on the data from our trace, at the current size the S4LRU algorithm would increase the edge cache hit ratio from 59.2% to 67.7%. At twice the current size, the trace data predicts it would increase cache hit ratios to 72.0%.

We see even more pronounced effects at the origin cache.

Based on our trace data, the S4LRU algorithm again gives the best results. (We tested quite a few different caching algorithms that are less common than LRU and LFU but did not publish results for them because S4LRU outperformed all of them in our tests.) It increases hit ratio from 33.0% to 46.9% based on our trace data. At twice the current size, our data predicts that S4LRU can provide a 54.4% hit ratio, a 21.4 percentage point improvement over the current FIFO cache.

More information

Qi Huang gave a talk on this work at SOSP. The slides and a video of the talk are on the SOSP conference page. There are also many more details in our paper.

(via Facebook.com)

 

NYTimes Architecture: No Head, No Master, No Single Point Of Failure

Michael Laing, a Systems Architect at NYTimes, gave this great description of their use of RabbitMQ and their overall architecture on the RabbitMQ mailing list. The closing sentiment marks this as definitely an architecture to learn from:

Although it may seem complex, nytimes architecture has simple components and is mostly principles and plumbing. The key point to grasp is that there is no head, no master, no single point of failure. As I write this I can see components failing (not RabbitMQ), and we are fixing them so they are more reliable. But the system doesn’t fail, users can connect, and messages are delivered, regardless – all within design parameters.

Since it’s short, to the point, and I couldn’t say it better, I’ll just reproduce two of the email list posts here:

Just a quick note and thank you to the RabbitMQ team for a great product.

Our premier online offering http://www.nytimes.com has a new look and new underpinnings, now including a messaging architecture implemented using RabbitMQ.

This architecture – nytimes architecture – has dozens of RabbitMQ instances spread across 6 AWS zones in Oregon and Dublin. The instances are organized into “wholesale” and “retail” layers. Connection to clients is via websockets/sockjs.

Upon launch today, the system autoscaled to ~500,000 users. Connection times remained flat at ~200ms.

nytimes architecture provides subscription services for breaking news, video feeds, etc. and will add more event based services. It also supports individual messaging related to subscription status for registered users.

This system would not have been possible without RabbitMQ. It was the one component, used everywhere, that never faltered or failed.

We are using: a single Amazon Linux AMI, RabbitMQ, Cassandra 2, python 2. We use pika with tornado and libev for the nytimes architecture wholesale and retail pieces; our internal clients use Java and PHP.

We use shovels – lots of shovels – to interconnect.

In production we have a RabbitMQ client 3-cluster and a core 3-cluster in each region on c1-xlarges. A proxy cluster of c1-mediums in Virginia connects clients to the client clusters. All services are parallelized so we can add more cores and clients.

The retail layer autoscales and uses c1-mediums with a single rabbit shovel-connected to one of the core rabbits. Each python websocket/sockjs gateway supports up to 100K clients.

We autodeploy into subnets within Virtual Private Clouds in AWS. Clients are routed via least latency to the fastest healthy region.

Of the technical components, the gateway is the most complex. We will be moving it into open source in pieces and the first piece is likely to be the python websocket/sockjs libraries which, frankly, beat the crap out of most other stuff out there and fully conform with the relevant standards. It can be loosely thought of as a C co-process managed by python, and as such, may be possible to reuse in other languages/environments.

We have a 12-node Cassandra cluster across the 2 regions / 6 zones. It is used for persistence of messages and as cache. We do not use persistence in RabbitMQ. Our services are idempotent and important messages may be replicated multiple times creating intentional race conditions in which the fastest wins.

Although it may seem complex, nytimes architecture has simple components and is mostly principles and plumbing. The key point to grasp is that there is no head, no master, no single point of failure. As I write this I can see components failing (not RabbitMQ), and we are fixing them so they are more reliable. But the system doesn’t fail, users can connect, and messages are delivered, regardless – all within design parameters.
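The idea of idempotent services with deliberately replicated messages, where the fastest copy wins, can be sketched as a consumer that applies each message ID at most once. This Python sketch is an assumption for illustration: the class name is hypothetical, and the in-memory set stands in for whatever shared state the real services use.

```python
class IdempotentConsumer:
    """Applies each message at most once, however many replicas arrive."""

    def __init__(self):
        self.seen = set()    # stand-in for shared state across services (assumption)
        self.delivered = []

    def handle(self, message_id, payload):
        """Return True if this copy was applied, False if it was a duplicate."""
        if message_id in self.seen:
            return False  # a faster replica already won the race
        self.seen.add(message_id)
        self.delivered.append(payload)
        return True


consumer = IdempotentConsumer()
first_copy = consumer.handle("breaking-42", "headline")   # fastest replica wins
second_copy = consumer.handle("breaking-42", "headline")  # slower replica is dropped
```

Because duplicates are harmless, the sender can push the same message down several paths and let whichever arrives first be delivered.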

(Via HighScalability.com)

Scaling Mercurial at Facebook

With thousands of commits a week across hundreds of thousands of files, Facebook’s main source repository is enormous – many times larger than even the Linux kernel, which checked in at 17 million lines of code and 44,000 files in 2013. Given our size and complexity – and Facebook’s practice of shipping code twice a day – improving our source control is one way we help our engineers move fast.

Choosing a source control system

Two years ago, as we saw our repository continue to grow at a staggering rate, we sat down and extrapolated our growth forward a few years. Based on those projections, it appeared likely that our then-current technology, a Subversion server with a Git mirror, would become a productivity bottleneck very soon. We looked at the available options and found none that were both fast and easy to use at scale.

Our code base has grown organically and its internal dependencies are very complex. We could have spent a lot of time making it more modular in a way that would be friendly to a source control tool, but there are a number of benefits to using a single repository. Even at our current scale, we often make large changes throughout our code base, and having a single repository is useful for continuous modernization. Splitting it up would make large, atomic refactorings more difficult. On top of that, the idea that the scaling constraints of our source control system should dictate our code structure just doesn’t sit well with us.

We realized that we’d have to solve this ourselves. But instead of building a new system from scratch, we decided to take an existing one and make it scale. Our engineers were comfortable with Git and we preferred to stay with a familiar tool, so we took a long, hard look at improving it to work at scale. After much deliberation, we concluded that Git’s internals would be difficult to work with for an ambitious scaling project.

Instead, we chose to improve Mercurial. Mercurial is a distributed source control system similar to Git, with many equivalent features. Importantly, it’s written mostly in clean, modular Python (with some native code for hot paths), making it deeply extensible. Just as importantly, the Mercurial developer community is actively helping us address our scaling problems by reviewing our patches and keeping our scale in mind when designing new features.

When we first started working on Mercurial, we found that it was slower than Git in several notable areas. To narrow this performance gap, we’ve contributed over 500 patches to Mercurial over the last year and a half. These range from new graph algorithms to rewrites of tight loops in native code. These helped, but we also wanted to make more fundamental changes to address the problem of scale.

Speeding up file status operations

For a repository as large as ours, a major bottleneck is simply finding out what files have changed. Git examines every file and naturally becomes slower and slower as the number of files increases, while Perforce “cheats” by forcing users to tell it which files they are going to edit. The Git approach doesn’t scale, and the Perforce approach isn’t friendly.

We solved this by monitoring the file system for changes. This has been tried before, even for Mercurial, but making it work reliably is surprisingly challenging. We decided to query our build system’s file monitor, Watchman, to see which files have changed. Mercurial’s design made integrating with Watchman straightforward, but we expected Watchman to have bugs, so we developed a strategy to address them safely.

Through heavy stress testing and internal dogfooding, we identified and fixed many of the issues and race conditions that are common in file system monitoring. In particular, we ran a beta test on all our engineers’ machines, comparing Watchman’s answers for real user queries with the actual file system results and logging any differences. After a couple of months of monitoring and fixing discrepancies in usage, we got the rate low enough that we were comfortable enabling Watchman by default for our engineers.
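
The comparison step can be sketched roughly as follows. This is an illustrative "shadow mode" check, not the actual hgwatchman code: the function names and the logger name are hypothetical, and both answers are assumed to be plain collections of changed file paths.

```python
import logging

logger = logging.getLogger("status-shadow")  # hypothetical logger name


def compare_answers(monitor_changed, scan_changed):
    """Return (missed, spurious) between a monitor's answer and a full scan."""
    monitor_changed = set(monitor_changed)
    scan_changed = set(scan_changed)
    missed = scan_changed - monitor_changed    # real changes the monitor did not report
    spurious = monitor_changed - scan_changed  # reported changes the scan did not find
    if missed or spurious:
        logger.warning("discrepancy: missed=%s spurious=%s",
                       sorted(missed), sorted(spurious))
    return missed, spurious


def status(monitor_changed, scan_changed):
    """Shadow mode: always answer from the full scan, but record how the monitor did."""
    compare_answers(monitor_changed, scan_changed)
    return sorted(scan_changed)


missed, spurious = compare_answers({"a.py", "b.py"}, {"b.py", "c.py"})
```

While the check runs in shadow mode, users still get the slow but trusted answer; the monitor's answer only becomes the default once the logged discrepancy rate is low enough.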

For our repository, enabling Watchman integration has made Mercurial’s status command more than 5x faster than Git’s status command. Other commands that look for changed files – like diff, update, and commit – also became faster.

Working with large histories

The rate of commits and the sheer size of our history also pose challenges. We have thousands of commits being made every day, and as the repository gets larger, it becomes increasingly painful to clone and pull all of it. Centralized source control systems like Subversion avoid this by only checking out a single commit, leaving all of the history on the server. This saves space on the client but leaves you unable to work if the server goes down. More recent distributed source control systems, like Git and Mercurial, copy all of the history to the client, which takes more time and space but allows you to browse and commit entirely locally. We wanted a happy medium between the speed and space of a centralized system and the robustness and flexibility of a distributed one.

Improving clone and pull

Normally when you run a pull, Mercurial figures out what has changed on the server since the last pull and downloads any new commit metadata and file contents. With tens of thousands of files changing every day, downloading all of this history to the client every day is slow. To solve this problem we created the remotefilelog extension for Mercurial. This extension changes the clone and pull commands to download only the commit metadata, while omitting all file changes that account for the bulk of the download. When a user performs an operation that needs the contents of files (such as checkout), we download the file contents on demand using Facebook’s existing memcache infrastructure. This allows clone and pull to be fast no matter how much history has changed, while only adding a slight overhead to checkout.

But what if the central Mercurial server goes down? A big benefit of distributed source control is the ability to work without interacting with the server. The remotefilelog extension intelligently caches the file revisions needed for your local commits so you can check out, rebase, and commit to any of your existing bookmarks without needing to access the server. Since we still download all of the commit metadata, operations that don’t require file contents (such as log) are completely local as well. Lastly, we use Facebook’s memcache infrastructure as a caching layer in front of the central Mercurial server, so that even if the central repository goes down, memcache will continue to serve many of the file content requests.

This type of setup is of course not for everyone – it’s optimized for work environments that have a reliable Mercurial server and that are always connected to a fast, low-latency network. For work environments that don’t have a fast, reliable Internet connection, this extension could result in Mercurial commands being slow and failing unexpectedly when the server is congested or unreachable.

Clone and pull performance gains

Enabling the remotefilelog extension for employees at Facebook has made Mercurial clones and pulls 10x faster, bringing them down from minutes to seconds. In addition, because of the way remotefilelog stores its local data on disk, large rebases are 2x faster. When compared with our previous Git infrastructure, the numbers remain impressive. Achieving these types of performance gains through extensions is one of the big reasons we chose Mercurial.

Finally, the remotefilelog extension allowed us to shift most of the request traffic to memcache, which reduced the Mercurial server’s network load by more than 10x. This will make it easier for our Mercurial infrastructure to keep scaling to meet growing demand.

How it works

Mercurial has several nice abstractions that made this extension possible. The most notable is the filelog class. The filelog is a data structure for representing every revision of a particular file. Each version of a file is identified by a unique hash. Given a hash, the filelog can reconstruct the requested version of a file. The remotefilelog extension replaces the filelog with an alternative implementation that has the same interface. It accepts a hash, but instead of reconstructing the version of the file from local data, it fetches that version from either a local cache or the remote server. When we need to request a large number of files from the server, we do it in large batches to avoid the overhead of many requests.
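
To make the idea concrete, here is a simplified sketch of a store that resolves a hash from a local cache first and falls back to a batched remote fetch. It is not the remotefilelog implementation or Mercurial's filelog API: the class, its methods, and the in-memory "server" are hypothetical, and hashing only the file contents with SHA-1 is a simplification made for the example.

```python
import hashlib


class RemoteFileStore:
    """Looks up file revisions by hash: local cache first, then a remote server."""

    def __init__(self, fetch_batch):
        self.cache = {}                 # hash -> file contents
        self.fetch_batch = fetch_batch  # callable: list of hashes -> {hash: contents}

    def read(self, node):
        # Same interface whether the revision is local or not.
        if node not in self.cache:
            self.prefetch([node])
        return self.cache[node]

    def prefetch(self, nodes):
        # Ask the server only for what is missing, in a single request.
        missing = [n for n in nodes if n not in self.cache]
        if missing:
            self.cache.update(self.fetch_batch(missing))


def make_server(revisions):
    """Hypothetical in-memory server that records each request it receives."""
    data = {hashlib.sha1(text).hexdigest(): text for text in revisions}
    requests = []

    def fetch_batch(nodes):
        requests.append(list(nodes))
        return {n: data[n] for n in nodes}

    return data, requests, fetch_batch


data, requests, fetch_batch = make_server([b"v1", b"v2", b"v3"])
store = RemoteFileStore(fetch_batch)
nodes = list(data)
store.prefetch(nodes)         # one batched request for all three revisions
first = store.read(nodes[0])  # served from the local cache, no new request
```

Because callers only ever pass a hash and get contents back, the rest of the system does not need to know whether a revision came from disk, the cache, or the network.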

Open Source

Together, the hgwatchman and remotefilelog extensions have improved source control performance for our developers, allowing them to spend more time getting stuff done instead of waiting for their tools. If you have a large deployment of a distributed revision control system, we encourage you to take a look at them. They’ve made a difference for our developers, and we hope they will prove valuable to yours, too.

(Via: Facebook Engineering Blog)