How Disqus Went Realtime With 165K Messages Per Second And Less Than .2 Seconds Latency


How do you add realtime functionality to a web scale application? That’s what Adam Hitchcock, a Software Engineer at Disqus, talks about in an excellent talk: Making DISQUS Realtime (slides).

Disqus had to take their commenting system and add realtime capabilities to it. Not something that’s easy to do when, at the time of the talk (2013), they had just hit a billion unique visitors a month.

What Disqus developed is a realtime commenting system called “realertime” that was tested to handle 1.5 million concurrently connected users, 45,000 new connections per second, 165,000 messages/second, with less than .2 seconds latency end-to-end.

The nature of a commenting system is that it is IO bound and has a high fanout; that is, a comment comes in and must be sent out to many readers. It’s a problem very similar to what Twitter must solve.

Disqus’ solution was quite interesting, as was the path to their solution. They tried different architectures but settled on a solution built on Python, Django, Nginx Push Stream Module, and Thoonk, all unified by a flexible pipeline architecture. In the process they were able to substantially reduce their server count and easily handle high traffic loads.

At one point in the talk Adam asks whether a pipelined architecture is a good one. For Disqus, messages filtering through a series of transforms is a perfect match. And it’s a very old idea: Unix System V has long had a STREAMS facility for building flexible pipeline architectures. It’s an incredibly flexible and powerful way of organizing code.

So let’s see how Disqus evolved their realtime commenting architecture and created something both old and new in the process…

Stats

  • Current:

    • 3 million websites use Disqus as their commenting system

    • Half a billion people engaged in conversations every month

    • 20 million comments every month

  • As of ~March 2013:

    • A billion unique visitors a month.

    • 18 Engineers

Platform

  • Python (Disqus is a service and is written in Python and other languages)

  • Django

  • Thoonk Redis Queue – a queue library on top of redis.

  • Nginx Push Stream Module – A pure stream http push technology for your Nginx setup. Comet made easy and really scalable.

  • Gevent – coroutine-based Python networking library that uses greenlet to provide a high-level synchronous API on top of the libev event loop.

  • Long Polling using EventSource (in the browser)

  • Sentry – a realtime, platform-agnostic error logging and aggregation platform.

  • Scales – tracks server state and statistics, allowing you to see what your server is doing.

  • Runs on raw metal, not EC2.

Architecture

  • Motivation for realtime:

    • Engagement. Realtime distribution of comments encourages users to stay on a page longer. More people comment after realtime than they did before.

    • Sell/trade data. Create a fire-hose product out of the global comment stream.

  • Old realtime system:

    • The Disqus app, written in Django, would post to memcache on many keys: forum:id, thread:id, user:id, post:id. Posting on many keys means someone in the future might find the data interesting, and since pub/sub is cheap to do, this allows for later innovation.

    • Front-end client would poll the memcache key every couple of seconds.

    • Client would display any new comments.

    • Problem: did not scale at all. Only 10% of the network could use the product at one time.

  • First solution approach:

    • New Posts -> Disqus -> redis pub/sub -> Flask (a web framework) Front End cluster <- HAProxy <- clients.

    • Clients would connect to HAProxy. HAProxy was used to handle millions of connections.

    • Problem: rapidly ran out of CPU on flask machines because they were doing redundant work. If two subscribers were listening to the same thread the message would be formatted twice.

  • Second approach:

    • A backend server was created to do the dedupe formatting work.

    • So the new flow: New Posts -> Disqus -> redis queue -> “python glue” Gevent formatting server (2 servers for redundancy) -> redis pub/sub (6 servers) -> Flask FE (front end) Cluster (14 big servers) <- HAProxy (5 servers) <- clients

    • This worked well. Except as it scaled out it was using more and more servers, especially the Flask cluster. The redis pub/sub cluster was also growing quickly.

Third And Winning Approach:

  • Uses a pipelined architecture where messages pass from queue to queue while being acted upon by filters.

  • Switched to nginx + push stream module. This replaced redis pub/sub, flask servers and the HAProxy cluster.

  • New flow looks like: New Posts -> Disqus -> redis queue -> “python glue” Gevent formatting server (2 servers) -> http post -> nginx pub endpoint -> nginx + push stream module (5 servers) <- clients

  • Only the pub/sub of redis was being used and the nginx push stream module supported the same functionality.

  • 5 push stream servers were required because of network memory limitations in the kernel. It’s a socket allocation problem, that is, having lots of sockets open. Otherwise it could run on 3 servers, including redundancy.

  • The Disqus part of the flow is a Django web app that uses post_save and post_delete hooks to put stuff onto a thoonk queue. These hooks are very useful for generating notifications for realtime data.
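As an illustration of those hooks, here is a minimal sketch of Django post_save/post_delete signal handlers pushing events toward a queue. The app name, model fields, and the enqueue helper are hypothetical; the real code publishes onto the thoonk queue.

```python
from django.db.models.signals import post_save, post_delete
from django.dispatch import receiver

from posts.models import Post  # hypothetical app and model


def enqueue_event(event, post):
    # Stand-in for publishing onto the thoonk queue (actual queue API elided).
    payload = {"event": event, "post_id": post.pk, "thread_id": post.thread_id}
    # thoonk_queue.put(json.dumps(payload))


@receiver(post_save, sender=Post)
def on_post_save(sender, instance, created, **kwargs):
    enqueue_event("created" if created else "updated", instance)


@receiver(post_delete, sender=Post)
def on_post_delete(sender, instance, **kwargs):
    enqueue_event("deleted", instance)
```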

  • Thoonk is a queue library on top of redis.

    • They already had thoonk so used it instead of spinning up a HA cluster of RabbitMQ machines. Ended up really liking it.

    • Thoonk is Implemented as a state machine so it’s easy to see what jobs are claimed or not claimed, etc. Makes cleanup after a failure easy.

    • Since the queue is stored in redis using zsets, range queries can be performed on the queue. Useful for implementing end-to-end acks because you can ask which messages haven’t been processed yet, for example, and take appropriate action.
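This is not Thoonk’s actual API, just a minimal redis-py sketch of the underlying idea: because the queue lives in a zset scored by enqueue time, a range query finds messages that still haven’t been acked. The key name is made up.

```python
import time
import redis

r = redis.Redis()
QUEUE = "queue:posts"  # hypothetical key name


def enqueue(message_id):
    # Score by enqueue time so the zset doubles as a time-sorted index.
    r.zadd(QUEUE, {message_id: time.time()})


def ack(message_id):
    # An end-to-end ack simply removes the message from the zset.
    r.zrem(QUEUE, message_id)


def unacked_older_than(seconds):
    # Range query: anything still present after `seconds` has not been
    # processed yet and can be retried or alerted on.
    return r.zrangebyscore(QUEUE, 0, time.time() - seconds)
```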

  • The python glue program.

    • Listens to the thoonk queue.

    • Performs all of the formatting and computation for clients. Includes cleaning and formatting data.

    • Originally did formatting in the flask cluster, but that took too much CPU.

    • Found that gzipping individual messages was not a win because there wasn’t enough redundancy in a message to generate sufficient savings from compression.

    • Gevent runs really fast for an IO bound system like this.

    • A watchdog makes sure a greenlet is always running, which is to say work is always being performed. A greenlet is a micro-thread with no implicit scheduling; in other words, a coroutine. (A minimal watchdog sketch follows this list.)

    • A monitor watches for lots of failures and raises an alert when they are observed.
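A minimal sketch of such a watchdog, assuming gevent; the worker function here is only a placeholder for the real loop that pops from the thoonk queue and runs the pipeline.

```python
import logging
import gevent

log = logging.getLogger("realertime")


def worker_loop():
    # Placeholder for the real work loop: pop a message, run it through
    # the formatting pipeline, publish the result.
    while True:
        gevent.sleep(1)


def watchdog(run, check_every=5):
    worker = gevent.spawn(run)
    while True:
        if worker.dead:  # the greenlet crashed or exited: log and respawn it
            log.warning("worker greenlet died: %r", worker.exception)
            worker = gevent.spawn(run)
        gevent.sleep(check_every)


if __name__ == "__main__":
    gevent.spawn(watchdog, worker_loop).join()
```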

  • Pipelined architectures.

    • The python glue program is structured as a data pipeline: there are stages the data must go through, such as parsing, computation, and publishing to another place. Each pipeline runs in a greenlet.

    • Mixins were used to implement stage functionality: JSONParserMixin, AnnomizeDataMixin, SuperSecureEncryptDataMixin, HTTPPublisher, FilePublisher (a sketch of this composition appears after this list).

    • The idea is to compose pipelines. A message would come off of thoonk and run through a pipeline: JSONAnnonHTTPPipeline, JSONSecureHTTPPipeline, JSONAnnonFilePipeline.

    • Pipelines can share most of their functionality, yet still be specialized. This is great when bringing up a new feature: you can make a new pipeline stage, make a new pipeline, and run the old pipeline side by side with the new one. Old and new features happily coexist.

    • Tests are also composable within a pipeline. To run tests just insert a filter/module/mixin in the pipeline and the tests will get run.

    • Easy to reason about. Each mixin is easy to understand. It does one thing. New engineers on a project have a much easier time grokking a system designed this way.
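A minimal sketch of how such a mixin-composed pipeline might look; the class and field names here are illustrative, not Disqus’ actual code.

```python
import json
import requests


class JSONParserMixin:
    def parse(self, raw):
        return json.loads(raw)


class AnonymizeDataMixin:
    def transform(self, msg):
        msg.pop("author_email", None)  # hypothetical field to strip
        return msg


class HTTPPublisherMixin:
    def publish(self, msg):
        requests.post(self.pub_url, json=msg).raise_for_status()


class FilePublisherMixin:
    def publish(self, msg):
        with open(self.out_path, "a") as f:
            f.write(json.dumps(msg) + "\n")


class Pipeline:
    def process(self, raw):
        # parse -> transform -> publish; each stage comes from a mixin
        self.publish(self.transform(self.parse(raw)))


class JSONAnonHTTPPipeline(JSONParserMixin, AnonymizeDataMixin, HTTPPublisherMixin, Pipeline):
    def __init__(self, pub_url):
        self.pub_url = pub_url


class JSONAnonFilePipeline(JSONParserMixin, AnonymizeDataMixin, FilePublisherMixin, Pipeline):
    def __init__(self, out_path):
        self.out_path = out_path
```

A new feature then becomes a new mixin plus a new composed class, which can run side by side with the existing pipelines.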

  • Nginx Push Stream

    • Handles pub/sub aspect and web serving aspect of a system. And does both well.

    • Recently hit two million concurrent users with 5 servers. Hit peaks of ~950K subscribers per machine and 40 MBytes/second per machine with the CPU usage under 15%.

    • Continually write data to sockets to test if a socket is still open. If not, it is cleaned up to make room for the next connection.

    • Configuration is a publish endpoint and a subscribe endpoint and how to map data between them (a publishing sketch appears after this list).

    • Good monitoring built-in and accessible over a push stream status endpoint.

    • A memory leak in the module requires rolling restarts throughout the day, especially when there are a couple of hundred thousand concurrent connections per process. The browser will know quickly when it has been disconnected so it will restart and reconnect.
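As referenced above, here is a minimal sketch of what publishing into the module from the glue server could look like, assuming a publisher location at /pub that takes the channel name in the id query argument (the module’s documented example layout); the host name is hypothetical.

```python
import json
import requests

NGINX_PUB = "http://nginx.internal/pub"  # hypothetical internal publish endpoint


def publish_comment(thread_id, comment):
    # One channel per thread; nginx push stream fans the message out to
    # every subscriber currently connected to that channel.
    resp = requests.post(NGINX_PUB,
                         params={"id": "thread-%s" % thread_id},
                         data=json.dumps(comment))
    resp.raise_for_status()
```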

  • Long(er) Polling

    • This is on the browser/JavaScript client side of things.

    • Currently using WebSockets because they are fast, but are moving to EventSource because it’s built into the browser and the browser handles everything. Just register for the message type and give it a callback handler.

Testing

  • Darktime testing. Disqus is installed on millions of websites, so they need to test with millions of concurrent connections. They use the existing network to load test rather than creating a faux setup in EC2.

  • Instrumented clients so that, for example, only 10% of users or exactly one website would flow through the new system.

  • Darkesttime testing. Something important happens in the world and a couple of websites get mega traffic. So they took all that traffic and sent it through a single pub/sub key in the system. This helped identify a lot of hot spots in the code.

Measure

  • Measure all the things. In a pipelined system you just measure input and output of every stage so you can reconcile your data with other systems like HAProxy. Without measurement data there’s no way to drill down and find out who is wrong.

  • Express metrics as +1 and -1 if you can. (I didn’t really understand this one)

  • Sentry helps find where problems are in code.

  • Measurements make it easy to create pretty graphs.

  • When the Pope was selected and the white smoke was seen, traffic peaked at 245 MB per second, 6 TB of data was transferred that day, and peak CPU was 12%.

Lessons Learned

  • Do work once. In a large fanout architecture, do work in one place and then send it out to all the consumers. Don’t repeat work for each consumer.

  • The code most likely to fail is your code. You’re smart, but really smart people wrote redis and other products, so be concerned more about your code than other parts of the system.

  • End-to-end acks are good, but expensive. Necessary for customers who want 100% delivery. Couldn’t do it for every front-end user.

  • Greenlets are free. Use them. They make code much easier to read.

  • Publish is free. Publish to all channels. They were able to make a great realtime map of traffic without any prior planning because messages were published over all channels.

  • Sometimes there are big wins. Discovering the Nginx Push Stream module simplified huge chunks of their system and reduced server count.

  • Understand use cases when load testing so you can really test your system.

  • Test with real traffic. This is a much easier approach as your system gets larger and generating synthetic loads would be a huge project in itself.

  • Use Python. They really like Python and Django, though some of the backend stuff is now being written in Go.

  • Increasing server counts in response to scale is a sign your architecture may need some tuning. Take a look at whether you can change your architecture and use resources more efficiently.

  • Use off the shelf technologies. Don’t feel like you have to build everything from scratch. Leverage code so you can keep your team small.

Update:


So Disqus has grown a bit:

  • 1.3 billion unique visitors
  • 10 billion page views
  • 500 million users engaged in discussions
  • 3 million communities
  • 25 million comments

They are still all about realtime, but Go replaced Python in their Realtime system:

  • Original Realtime backend was written in a pretty lightweight Python + gevent.
  • The realtime service is a hybrid of CPU intensive tasks + lots of network IO. Gevent was handling the network IO without an issue, but at higher contention, the CPU was choking everything. Switching over to Go removed that contention, which was the primary issue that was being seen.
  • Still runs on 5 Nginx machines.
    • Uses NginxPushStream, which supports EventSource, WebSocket, Long Polling, and Forever Iframe.
    • All users are connected to these machines.
    • On a normal day each machine sees 3200 connections/s, 1 million connections, 150K packets/s TX and 130K packets/s RX, 150 mbits/s TX and 80 mbits/s RX, with <15ms delay end-to-end (which is faster than Javascript can render a comment)
    • Had many issues with resource exhaustion at first. The Nginx and OS configurations were tuned to handle a scenario with many connections moving little data, which helped alleviate the problems.
  • Ran out of network bandwidth before anything else.
    • Using 10 gigabit network interface cards helped a lot.
    • Enabling gzip helped a lot, but Nginx preallocates a lot of memory per connection for gzip; since comments are small this was overkill. Reducing Nginx buffer sizes reduced out-of-memory problems.
  • As message rates increased, at peak processing 10k+ messages per second, the machines maxed out, and end-to-end latency went to seconds and minutes in the worst case.
  • Switched to Go.
    • Liked Go because of its performance, native concurrency, and familiarity for Python programmers.
    • In only a week a replacement system was built with impressive results:
      • End-to-end latency is on average, less than 10ms.
      • Currently consuming roughly 10-20% of available CPU. A huge reduction.
    • Node was not selected because it does not handle CPU intensive tasks well
    • Go does not directly access the database. It consumes a queue from RabbitMQ and publishes to the Nginx frontends.
    • A Go framework is not being used. This is a tiny component and the rest of Disqus is still Django.
  • They wanted to use resources better, not add more machines:
    • For the amount of work that was being done, we didn’t want to horizontally scale more. Throwing more and more hardware at a problem isn’t always the best solution. In the end, having a faster product yields its own benefits as well.

(Via highscalability.com)

The WhatsApp Architecture

WhatsApp stats: What has hundreds of nodes, thousands of cores, hundreds of terabytes of RAM, and hopes to serve the billions of smartphones that will soon be a reality around the globe? The Erlang/FreeBSD-based server infrastructure at WhatsApp. We’ve faced many challenges in meeting the ever-growing demand for our messaging services, but we continue to push the envelope on size (>8000 cores) and speed (>70M Erlang messages per second) of our serving system.

A warning here, we don’t know a lot about the WhatsApp overall architecture, just bits and pieces gathered from various sources. Rick Reed’s main talk is about the optimization process used to get to 2 million connections a server while using Erlang, which is interesting, but it’s not a complete architecture talk.

Stats

These stats are generally for the current system, not the system we have a talk on. The talk on the current system will include more on hacks for data storage, messaging, meta-clustering, and more BEAM/OTP patches.

  • 450 million active users, and reached that number faster than any other company in history.

  • 32 engineers, one developer supports 14 million active users

  • 50 billion messages every day across seven platforms (inbound + outbound)

  • 1+ million people sign up every day

  • $0 invested in advertising

  • $60 million investment from Sequoia Capital; $3.4 billion is the amount Sequoia will make

  • 35% is how much of Facebook’s cash is being used for the deal
  • Hundreds of nodes

  • >8000 cores

  • Hundreds of terabytes of RAM

  • >70M Erlang messages per second

  • In 2011 WhatsApp achieved 1 million established tcp sessions on a single machine with memory and cpu to spare. In 2012 that was pushed to over 2 million tcp connections. In 2013 WhatsApp tweeted out: On Dec 31st we had a new record day: 7B msgs inbound, 11B msgs outbound = 18 billion total messages processed in one day! Happy 2013!!!

Platform

Backend

  • Erlang

  • FreeBSD

  • Yaws, lighttpd

  • PHP

  • Custom patches to BEAM (BEAM is like Java’s JVM, but for Erlang)

  • Custom XMPP

  • Hosting may be in Softlayer

Frontend

  • Seven client platforms: iPhone, Android, Blackberry, Nokia Symbian S60, Nokia S40, Windows Phone, ?

  • SQLite

Hardware

  • Standard user facing server:

    • Dual Westmere Hex-core (24 logical CPUs);

    • 100GB RAM, SSD;

    • Dual NIC (public user-facing network, private back-end/distribution);

Product

  • Focus is on messaging. Connecting people all over the world, regardless of where they are in the world, without having to pay a lot of money. Founder Jan Koum remembers how difficult it was in 1992 to connect to family all over the world.

  • Privacy. Shaped by Jan Koum’s experiences growing up in the Ukraine, where nothing was private. Messages are not stored on servers; chat history is not stored; goal is to know as little about users as possible; your name and your gender are not known; chat history is only on your phone.

General

  • WhatsApp server is almost completely implemented in Erlang.

    • Server systems that do the backend message routing are done in Erlang.

    • Great achievement is that the number of active users is managed with a really small server footprint. Team consensus is that it is largely because of Erlang.

    • Interesting to note Facebook Chat was written in Erlang in 2009, but they went away from it because it was hard to find qualified programmers.

  • The WhatsApp server started from ejabberd.

    • Ejabberd is a famous open source Jabber server written in Erlang.

    • Originally chosen because it’s open source, had great reviews from developers, was easy to get started with, and promised Erlang’s long-term suitability for a large communication system.

    • The next few years were spent re-writing and modifying quite a few parts of ejabberd, including switching from XMPP to internally developed protocol, restructuring the code base and redesigning some core components, and making lots of important modifications to Erlang VM to optimize server performance.

  • To handle 50 billion messages a day the focus is on making a reliable system that works. Monetization is something to look at later, it’s far far down the road.

  • A primary gauge of system health is message queue length. The message queue length of all the processes on a node is constantly monitored and an alert is sent out if they accumulate backlog beyond a preset threshold. If one or more processes falls behind that is alerted on, which gives a pointer to the next bottleneck to attack.

  • Multimedia messages are sent by uploading the image, audio or video to be sent to an HTTP server and then sending a link to the content along with its Base64 encoded thumbnail (if applicable).
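A generic sketch of that flow, not WhatsApp’s actual protocol or endpoints; the upload URL and message fields are hypothetical.

```python
import base64
import requests

UPLOAD_URL = "https://media.example.com/upload"  # hypothetical media server


def send_media_message(send, media_bytes, thumbnail_bytes, mime_type):
    # 1. Upload the full image/audio/video blob to an HTTP server.
    resp = requests.post(UPLOAD_URL, data=media_bytes,
                         headers={"Content-Type": mime_type})
    resp.raise_for_status()
    media_url = resp.json()["url"]

    # 2. The chat message itself stays small: a link to the content plus
    #    a Base64 encoded thumbnail, if applicable.
    send({
        "type": "media",
        "url": media_url,
        "thumbnail_b64": base64.b64encode(thumbnail_bytes).decode("ascii"),
    })
```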

  • Some code is usually pushed every day. Often, it’s multiple times a day, though in general peak traffic times are avoided. Erlang helps being aggressive in getting fixes and features into production. Hot-loading means updates can be pushed without restarts or traffic shifting. Mistakes can usually be undone very quickly, again by hot-loading. Systems tend to be much more loosely-coupled which makes it very easy to roll changes out incrementally.

  • What protocol is used in Whatsapp app? SSL socket to the WhatsApp server pools. All messages are queued on the server until the client reconnects to retrieve the messages. The successful retrieval of a message is sent back to the whatsapp server which forwards this status back to the original sender (which will see that as a “checkmark” icon next to the message). Messages are wiped from the server memory as soon as the client has accepted the message

  • How does the registration process work internally in Whatsapp? WhatsApp used to create a username/password based on the phone IMEI number. This was changed recently. WhatsApp now uses a general request from the app to send a unique 5 digit PIN. WhatsApp will then send a SMS to the indicated phone number (this means the WhatsApp client no longer needs to run on the same phone). Based on the PIN number, the app then requests a unique key from WhatsApp. This key is used as the “password” for all future calls. (This “permanent” key is stored on the device.) This also means that registering a new device will invalidate the key on the old device.

  • Google’s push service is used on Android.

  • More users on Android. Android is more enjoyable to work with. Developers are able to prototype a feature and push it out to hundreds of millions of users overnight, if there’s an issue it can be fixed quickly. iOS, not so much.

The Quest For 2+ Million Connections Per Server

  • Experienced lots of user growth, which is a good problem to have, but it also means having to spend money buying more hardware and increased operational complexity of managing all those machines.

  • Need to plan for bumps in traffic. Examples are soccer games and earthquakes in Spain or Mexico. These happen near peak traffic loads, so there needs to be enough spare capacity to handle peaks + bumps. A recent soccer match generated a 35% spike in outbound message rate right at the daily peak.

  • Initial server loading was 200 simultaneous connections per server.

    • Extrapolated out would mean a lot of servers with the hoped for growth pattern.

    • Servers were brittle in the face of burst loads. Network glitches and other problems would occur. Needed to decouple components so things weren’t so brittle at high capacity.

    • Goal was a million connections per server. An ambitious goal given at the time they were running at 200K connections. Running servers with headroom to allow for world events, hardware failures, and other types of glitches would require enough resilience to handle the high usage levels and failures.

Tools And Techniques Used To Increase Scalability

  • Wrote system activity reporter tool (wsar):

    • Record system stats across the system, including OS stats, hardware stats, and BEAM stats. It was built so it was easy to plug in metrics from other systems, like virtual memory. Track CPU utilization, overall utilization, user time, system time, interrupt time, context switches, system calls, traps, packets sent/received, total count of messages in queues across all processes, busy port events, traffic rate, bytes in/out, scheduling stats, garbage collection stats, words collected, etc.

    • Initially ran once a minute. As the systems were driven harder, one second polling resolution was required because events that happened in the space of a minute were invisible. Really fine grained stats to see how everything is performing.

  • Hardware performance counters in CPU (pmcstat):

    • See where the CPU is spending its time as a percentage. Can tell how much time is being spent executing the emulator loop. In their case it is 16%, which tells them that only 16% is executing emulated code, so even if you were able to remove all the execution time of all the Erlang code it would only save 16% out of the total runtime. This implies you should focus on other areas to improve efficiency of the system.

  • dtrace, kernel lock-counting, fprof

    • Dtrace was mostly for debugging, not performance.

    • Patched BEAM on FreeBSD to include CPU time stamp.

    • Wrote scripts to create an aggregated view across all processes to see where routines are spending all the time.

    • Biggest win was compiling the emulator with lock counting turned on.

  • Some Issues:

    • Earlier on they saw more time spent in the garbage collection routines; that was brought down.

    • Saw some issues with the networking stack that was tuned away.

    • Most issues were with lock contention in the emulator which shows strongly in the output of the lock counting.

  • Measurement:

    • Synthetic workloads, which means generating traffic from your own test scripts, is of little value for tuning user facing systems at extreme scale.

      • Worked well for simple interfaces like a user table, generating inserts and reads as quickly as possible.

      • If supporting a million connections on a server it would take 30 hosts to open enough IP ports to generate enough connections to test just one server. For two million connections that would take 60 hosts. Just difficult to generate that kind of scale.

      • The type of traffic that is seen during production is difficult to generate. You can guess at a normal workload, but in actuality you see networking events, world events, and, since the system is multi-platform, varying behaviour between clients and across countries.

    • Tee’d workload:

      • Take normal production traffic and pipe it off to a separate system.

      • Very useful for systems for which side effects could be constrained. Don’t want to tee traffic and do things that would affect the permanent state of a user or result in multiple messages going to users.

      • Erlang supports hot loading, so could be under a full production load, have an idea, compile, load the change as the program is running and instantly see if that change is better or worse.

      • Added knobs to change production load dynamically and see how it would affect performance. Would be tailing the sar output looking at things like CPU usage, VM utilization, listen queue overflows, and turn knobs to see how the system reacted.

    • True production loads:

      • Ultimate test. Doing both input work and output work.

      • Put server in DNS a couple of times so it would get double or triple the normal traffic. Creates issues with TTLs because clients don’t respect DNS TTLs and there’s a delay, so can’t quickly react to getting more traffic than can be dealt with.

      • IPFW. Forward traffic from one server to another so could give a host exactly the number of desired client connections. A bug caused a kernel panic so that didn’t work very well.

  • Results:

    • Started at 200K simultaneous connections per server.

    • First bottleneck showed up at 425K. System ran into a lot of contention. Work stopped. Instrumented the scheduler to measure how much useful work is being done, or sleeping, or spinning. Under load it started to hit sleeping locks so 35-45% CPU was being used across the system but the schedulers are at 95% utilization.

    • The first round of fixes got to over a million connections.

      • VM usage is at 76%. CPU is at 73%. BEAM emulator running at 45% utilization, which matches closely to user percentage, which is good because the emulator runs as user.

      • Ordinarily CPU utilization isn’t a good measure of how busy a system is because the scheduler uses CPU.

    • A month later tackling bottlenecks 2 million connections per server was achieved.

      • BEAM utilization at 80%, close to where FreeBSD might start paging. CPU is about the same, with double the connections. Scheduler is hitting contention, but running pretty well.

    • Seemed like a good place to stop so started profiling Erlang code.

      • Originally had two Erlang processes per connection. Cut that to one.

      • Did some things with timers.

    • Peaked at 2.8M connections per server

      • 571k pkts/sec, >200k dist msgs/sec

      • Made some memory optimizations so VM load was down to 70%.

    • Tried 3 million connections, but failed.

      • See long message queues when the system is in trouble. Either a single message queue or a sum of message queues.

      • Added to BEAM instrumentation on message queue stats per process. How many messages are being sent/received, how fast.

      • Sampling every 10 seconds, could see a process had 600K messages in its message queue with a dequeue rate of 40K with a delay of 15 seconds. Projected drain time was 41 seconds.

  • Findings:

    • Erlang + BEAM + their fixes – has awesome SMP scalability. Nearly linear scalability. Remarkable. On a 24-way box can run the system with 85% CPU utilization and it’s keeping up running a production load. It can run like this all day.

      • Testament to Erlang’s programming model.

      • The longer a server has been up, the more long running, mostly idle connections it accumulates, so it can handle more connections because those connections are not as busy.

    • Contention was biggest issue.

      • Some fixes were in their Erlang code to reduce BEAM’s contention issues.

      • Some patched to BEAM.

      • Partitioning workload so work didn’t have to cross processors a lot.

      • Time-of-day lock. Every time a message is delivered from a port it looks to update the time-of-day which is a single lock across all schedulers which means all CPUs are hitting one lock.

      • Optimized use of timer wheels. Removed bif timer

      • The check IO time table grows arithmetically. That created VM thrashing as the hash table would be reallocated at various points. Improved to use geometric allocation of the table.

      • Added write file that takes a port that you already have open to reduce port contention.

      • Mseg allocation was a single point of contention across all allocators. Made it per scheduler.

      • Lots of port transactions when accepting a connection. Set option to reduce expensive port interactions.

      • When message queue backlogs became large garbage collection would destabilize the system. So pause GC until the queues shrunk.

    • Avoiding some common things that come at a price.

      • Backported a TSE time counter from FreeBSD 9 to 8. It’s a cheaper timer to read. Fast to get the time of day, less expensive than going to a chip.

      • Backported the igp network driver from FreeBSD 9 because they were having issues with multiple queues on NICs locking up.

      • Increase number of files and sockets.

      • Pmcstat showed a lot of time was spent looking up PCBs in the network stack. So bumped up the size of the hash table to make lookups faster.

    • BEAM Patches

      • Previously mentioned instrumentation patches. Instrument scheduler to get utilization information, statistics for message queues, number of sleeps, send rates, message counts, etc. Can be done in Erlang code with procinfo, but with a million connections it’s very slow.

      • Stats collection is very efficient to gather so they can be run in production.

      • Stats kept at 3 different decay intervals: 1, 10, 100 second intervals. Allows seeing issues over time.

      • Make lock counting work for larger async thread counts.

      • Added debug options to debug lock counters.

    • Tuning

      • Set the scheduler wake up threshold to low because schedulers would go to sleep and would never wake up.

      • Prefer mseg allocators over malloc.

      • Have an allocator per instance per scheduler.

      • Configured carrier sizes to start out big and get bigger. Causes FreeBSD to use super pages. Reduced the TLB thrash rate and improved throughput for the same CPU.

      • Run BEAM at real-time priority so that other things like cron jobs don’t interrupt the scheduler. Prevents glitches that would cause backlogs of important user traffic.

      • Patch to dial down spin counts so the scheduler wouldn’t spin.

    • Mnesia

      • Prefer os:timestamp to erlang:now.

      • Using no transactions, but with remote replication ran into a backlog. Parallelized replication for each table to increase throughput.

    • There are actually lots more changes that were made.

Lessons

  • Optimization is dark grungy work suitable only for trolls and engineers. When Rick is going through all the changes that he made to get to 2 million connections a server it was mind numbing. Notice the immense amount of work that went into writing tools, running tests, backporting code, adding gobs of instrumentation to nearly every level of the stack, tuning the system, looking at traces, mucking with very low level details and just trying to understand everything. That’s what it takes to remove the bottlenecks in order to increase performance and scalability to extreme levels.

  • Get the data you need. Write tools. Patch tools. Add knobs. Rick was relentless in extending the system to get the data they needed, constantly writing tools and scripts to get the data needed to manage and optimize the system. Do whatever it takes.

  • Measure. Remove Bottlenecks. Test. Repeat. That’s how you do it.

  • Erlang rocks! Erlang continues to prove its capability as a versatile, reliable, high-performance platform. Though personally all the tuning and patching that was required casts some doubt on this claim.

  • Crack the virality code and profit. Virality is an elusive quality, but as WhatsApp shows, if you do figure it out, it’s worth a lot of money.

  • Value and employee count are now officially divorced. There are a lot of force-multipliers out in the world today. An advanced global telecom infrastructure makes apps like WhatsApp possible. If WhatsApp had to make the network or a phone etc it would never happen. Powerful cheap hardware and Open Source software availability is of course another multiplier. As is being in the right place at the right time with the right product in front of the right buyer.

  • There’s something to this brutal focus on the user idea. WhatsApp is focused on being a simple messaging app, not being a gaming network, or an advertising network, or a disappearing photos network. That worked for them. It guided their no ads stance, their ability to keep the app simple while adding features, and the overall no-brainer “it just works” philosophy on any phone.

  • Limits in the cause of simplicity are OK. Your identity is tied to the phone number, so if you change your phone number your identity is gone. This is very un-computer like. But it does make the entire system much simpler in design.

  • Age ain’t no thing. If it was age discrimination that prevented WhatsApp co-founder Brian Acton from getting a job at both Twitter and Facebook in 2009, then shame, shame, shame.

  • Start simply and then customize. When chat was launched initially the server side was based on ejabberd. It’s since been completely rewritten, but that was the initial step in the Erlang direction. The experience with the scalability, reliability, and operability of Erlang in that initial use case led to broader and broader use.

  • Keep server count low. Constantly work to keep server counts as low as possible while leaving enough headroom for events that create short-term spikes in usage. Analyze and optimize until the point of diminishing returns is hit on those efforts and then deploy more hardware.

  • Purposely overprovision hardware. This ensures that users have uninterrupted service during their festivities and employees are able to enjoy the holidays without spending the whole time fixing overload issues.

  • Growth stalls when you charge money. Growth was super fast when WhatsApp was free, 10,000 downloads a day in the early days. Then when switching over to paid that declined to 1,000 a day. At the end of the year, after adding picture messaging, they settled on charging a one-time download fee, later modified to an annual payment.

  • Inspiration comes from the strangest places. Experience with forgetting the username and passwords on Skype accounts drove the passion for making the app “just work.”

(via HighScalability.com)

An analysis of Facebook photo caching

Every day people upload more than 350 million photos to Facebook as of December 2012 and view many more in their News Feeds and on their friends’ Timelines. Facebook stores these photos on Haystack machines that are optimized to store photos. But there is also a deep and distributed photo-serving stack with many layers of caches that delivers photos to people so they can view them.

We recently published an academic study of this stack in SOSP. In this post, we describe the stack and then cover four of our many findings:

    1. How effective each layer of the stack is.
    2. How the popularity distribution of photos changes across layers.
    3. The effect of increasing cache sizes.
    4. The effect of more advanced caching algorithms.

Facebook’s photo-serving stack

The photo-serving stack has four layers: browser caches, edge caches, origin cache, and Haystack backend. The first layer of the stack is the browser cache on peoples’ machines. If someone has recently seen or downloaded a photo, that photo will likely be in their browser cache and they can retrieve it locally. Otherwise, their browser will send an HTTPS request out to the Internet.

That request will be routed either to our CDN partners or to one of Facebook’s many edge caches, which are located at Internet points of presence (PoPs). (In the paper and this blog post, we focus on what happens in the Facebook-controlled stack.) The particular edge cache that a request is routed to is encoded in the request’s URL. If the requested photo is present at the edge cache, then it returns the photo to the user’s browser, which will store it in its local cache. If the photo is not present at the edge cache, it is requested from the origin cache.

The origin cache is a single cache that is distributed across multiple data centers (like Prineville, Forest City, and Lulea). Requests from the edge caches are routed to hosts in the origin cache based on the requested photo’s ID. This means that for a single picture, all requests to the origin cache from all edge caches will be directed to the same server. If the requested photo is present in the origin cache, it is returned to the person via the edge cache, which now will store the photo. If the requested photo is not present, it is fetched from the Haystack backend.

The Haystack backend stores all photos, so all requests can be satisfied at this layer. Requests from the origin cache are typically filled by Haystack machines in the same data center. If for some reason the local Haystack machine is unavailable, the origin cache will request the photo from a Haystack machine in another data center. In either case, the photo flows back up the caching stack being stored at each layer as it goes: origin cache, edge cache, and then browser cache.
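To make the flow concrete, here is a small sketch of the lookup path through the four layers, with plain dicts standing in for the caches and routing details (which edge PoP, which origin host) omitted.

```python
def fetch_photo(photo_id, layers, haystack):
    """layers: caches ordered top-down, e.g. [browser, edge, origin]."""
    missed = []
    for cache in layers:
        photo = cache.get(photo_id)
        if photo is not None:
            break
        missed.append(cache)
    else:
        # The Haystack backend stores every photo, so it always answers.
        photo = haystack[photo_id]
    # On the way back up, each layer that missed stores the photo.
    for cache in missed:
        cache[photo_id] = photo
    return photo


browser, edge, origin = {}, {}, {}
haystack = {"photo42": b"...jpeg bytes..."}
fetch_photo("photo42", [browser, edge, origin], haystack)  # fills all three caches
```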

Trace collection

In our study, we collected a month-long trace of requests from non-mobile users for a deterministic subset of photos that was served entirely by the Facebook-controlled stack. The trace captures hits and misses at all levels of the stack, including client browser caches. All analysis and simulation is based on this trace.

From our trace we determined the hit ratio at each of the layers and the ratio of total requests that travels past each layer. Each layer significantly reduces the volume of traffic so that the Haystack backend only needs to serve 9.9% of requested photos.

This is a significant reduction in requests, but we can do even better.

Shifting popularity distributions

It is well known that the popularity of objects on the web tends to follow a power law distribution. Our study confirmed this for the Facebook photo workload and showed how this distribution shifts as we move down layers in the stack. We counted the number of requests for each photo at every layer, sorted them by popularity, and then graphed them on a log-log scale. The power law relationship shows up as linear on this scale.

As we move down the stack, the alpha parameter for the power law distribution decreases and flattens out. The distribution also changes to one that more closely resembles a stretched exponential distribution.
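A crude sketch of that measurement: on log-log axes the rank-versus-count curve is roughly linear, so a least-squares fit of the logs gives an estimate of the exponent. A real analysis would use a maximum-likelihood estimator; this is only illustrative.

```python
import numpy as np


def powerlaw_alpha(request_counts):
    """Estimate the power-law exponent from per-photo request counts."""
    counts = np.sort(np.asarray(request_counts, dtype=float))[::-1]
    ranks = np.arange(1, len(counts) + 1)
    slope, _intercept = np.polyfit(np.log(ranks), np.log(counts), 1)
    return -slope  # a flatter slope deeper in the stack means a smaller alpha
```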

Serving popular photos

Caches derive most of their utility from serving the most popular content many times. To quantify to what degree this was true for the Facebook stack, we split photos up into popularity groups. Group “high” has the 1,000 most popular photos, group “medium” the next 9,000 most popular photos, group “low” the next 90,000 most popular photos, and group “lowest” the least popular 2.5 million photos in the subset of photos we traced. Each of these groups represents about 25% of user requests. This graph shows the percent of requests served by each layer for these four groups.

As expected, the browser and edge caches are very effective for the most popular photos in our trace and progressively less effective for less popular groups. The origin cache’s largest effect is seen for popularity group low, which also agrees with our intuition. More popular photos will be effectively cached earlier in the stack at the browser and edge layers. Less popular photos are requested too infrequently to be effectively cached.

Larger caches improve hit ratios

Unless your entire workload fits in the current cache, a larger cache will improve hit ratios — but by how much? To find out we took our trace to each layer and replayed it using a variety of different cache sizes. We share the results for one of the edge caches and the origin cache here.

Size X in the graph is the current size of that edge cache. Doubling the size of the cache would significantly increase the hit ratio, from 59% to 65%. Tripling the size of the cache would further increase the hit ratio to 68.5%.

The infinite line in the graphs shows the performance of a cache of infinite size, i.e., one that can hold the entire workload. It is an upper bound on the possible hit ratio and it shows us there is still much room for improvement.

The origin cache shows even more room for improvement from larger caches. Doubling the size of the origin cache improves hit ratio from 33% to 42.5%. Tripling increases that to 48.5%. The results for a cache of infinite size again demonstrate that there is still much room for improvement.

Advanced caching algorithms improve hit ratios

Larger caches are not the only way to improve hit ratios — being smarter about what items you keep in the cache can also have an effect. We used our trace to test the effects of using a variety of different caching algorithms. We published results for the following algorithms:

    • FIFO: A first-in-first-out queue is used for cache eviction. This is the algorithm Facebook currently uses.
    • LRU: A priority queue ordered by last-access time is used for cache eviction.
    • LFU: A priority queue ordered first by number of hits and then by last-access time is used for cache eviction.
    • S4LRU: Quadruple-segmented LRU, an intermediate algorithm between LRU and LFU (see paper for details; a sketch follows this list).
    • Clairvoyant: A priority queue ordered by next-access time is used for cache eviction. (Requires knowledge of the future and is impossible to implement in practice, but gives a more accurate upper bound on the performance of an ideal caching algorithm.)
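A compact sketch of segmented LRU in the S4LRU style, based on the description above: items enter the lowest segment, a hit promotes an item one segment up, and a full segment demotes its least-recently-used item downward, with evictions happening from the lowest segment. Per-segment sizing and other details may differ from the paper.

```python
from collections import OrderedDict


class S4LRU:
    """Quadruple-segmented LRU cache (keys only, for hit-ratio simulation)."""

    def __init__(self, capacity, levels=4):
        self.seg_cap = max(1, capacity // levels)
        self.segs = [OrderedDict() for _ in range(levels)]  # segs[0] is lowest

    def _insert(self, level, key):
        seg = self.segs[level]
        seg[key] = True
        seg.move_to_end(key)                  # most-recently-used position
        if len(seg) > self.seg_cap:           # overflow: demote the LRU item
            victim, _ = seg.popitem(last=False)
            if level > 0:
                self._insert(level - 1, victim)
            # overflow from level 0 falls out of the cache entirely

    def access(self, key):
        """Record an access; return True on a hit, False on a miss."""
        for level, seg in enumerate(self.segs):
            if key in seg:
                del seg[key]
                self._insert(min(level + 1, len(self.segs) - 1), key)
                return True
        self._insert(0, key)
        return False
```

Replaying a request trace through access() and counting the True results gives a simulated hit ratio for a given cache size.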

First, how do the different algorithms stack up at an edge cache?

While both LRU and LFU give slightly higher hit ratios than the FIFO caching policy, the S4LRU caching algorithm gives much higher hit ratios. Based on the data from our trace, at the current size the S4LRU algorithm would increase the edge cache hit ratio from 59.2% to 67.7%. At twice the current size, the trace data predicts it would increase cache hit ratios to 72.0%.

We see even more pronounced effects at the origin cache.

Based on our trace data, the S4LRU algorithm again gives the best results. (We tested quite a few different caching algorithms that are less common than LRU and LFU but did not publish results for them because S4LRU outperformed all of them in our tests.) It increases hit ratio from 33.0% to 46.9% based on our trace data. At twice the current size, our data predicts that S4LRU can provide a 54.4% hit ratio, a 21.4% improvement over the current FIFO cache.

More information

Qi Huang gave a talk on this work at SOSP. The slides and a video of the talk are on the SOSP conference page. There are also many more details in our paper.

(via Facebook.com)

 

NYTimes Architecture: No Head, No Master, No Single Point Of Failure

Michael Laing, a Systems Architect at NYTimes, gave this great description of their use of RabbitMQ and their overall architecture on the RabbitMQ mailing list. The closing sentiment marks this as definitely an architecture to learn from:

Although it may seem complex, nytimes architecture has simple components and is mostly principles and plumbing. The key point to grasp is that there is no head, no master, no single point of failure. As I write this I can see components failing (not RabbitMQ), and we are fixing them so they are more reliable. But the system doesn’t fail, users can connect, and messages are delivered, regardless – all within design parameters.

Since it’s short, to the point, and I couldn’t say it better, I’ll just reproduce two of the email list posts here:

Just a quick note and thank you to the RabbitMQ team for a great product.

Our premier online offering http://www.nytimes.com has a new look and new underpinnings, now including a messaging architecture implemented using RabbitMQ.

This architecture – nytimes architecture – has dozens of RabbitMQ instances spread across 6 AWS zones in Oregon and Dublin. The instances are organized into “wholesale” and “retail” layers. Connection to clients is via websockets/sockjs.

Upon launch today, the system autoscaled to ~500,000 users. Connection times remained flat at ~200ms.

nytimes architecture provides subscription services for breaking news, video feeds, etc. and will add more event based services. It also supports individual messaging related to subscription status for registered users.

This system would not have been possible without RabbitMQ. It was the one component, used everywhere, that never faltered or failed.

We are using: a single Amazon Linux AMI, RabbitMQ, Cassandra 2, python 2. We use pika with tornado and libev for the nytimes architecture wholesale and retail pieces; our internal clients use Java and PHP.

We use shovels – lots of shovels – to interconnect.

In production we have a RabbitMQ client 3-cluster and a core 3-cluster in each region on c1-xlarges. A proxy cluster of c1-mediums in Virginia connects clients to the client clusters. All services are parallelized so we can add more cores and clients.

The retail layer autoscales and uses c1-mediums with a single rabbit, shovel-connected to one of the core rabbits. Each python websocket/sockjs gateway supports up to 100K clients.

We autodeploy into subnets within Virtual Private Clouds in AWS. Clients are routed via least latency to the fastest healthy region.

Of the technical components, the gateway is the most complex. We will be moving it into open source in pieces and the first piece is likely to be the python websocket/sockjs libraries which, frankly, beat the crap out of most other stuff out there and fully conform with the relevant standards. It can be loosely thought of as a C co-process managed by python, and as such, may be possible to reuse in other languages/environments.

We have a 12-node Cassandra cluster across the 2 regions / 6 zones. It is used for persistence of messages and as cache. We do not use persistence in RabbitMQ. Our services are idempotent and important messages may be replicated multiple times creating intentional race conditions in which the fastest wins.

Although it may seem complex, nytimes architecture has simple components and is mostly principles and plumbing. The key point to grasp is that there is no head, no master, no single point of failure. As I write this I can see components failing (not RabbitMQ), and we are fixing them so they are more reliable. But the system doesn’t fail, users can connect, and messages are delivered, regardless – all within design parameters.

(Via HighScalability.com)

Scaling Mercurial at Facebook

With thousands of commits a week across hundreds of thousands of files, Facebook’s main source repository is enormous–many times larger than even the Linux kernel, which checked in at 17 million lines of code and 44,000 files in 2013. Given our size and complexity—and Facebook’s practice of shipping code twice a day–improving our source control is one way we help our engineers move fast.

Choosing a source control system

Two years ago, as we saw our repository continue to grow at a staggering rate, we sat down and extrapolated our growth forward a few years. Based on those projections, it appeared likely that our then-current technology, a Subversion server with a Git mirror, would become a productivity bottleneck very soon. We looked at the available options and found none that were both fast and easy to use at scale.

Our code base has grown organically and its internal dependencies are very complex. We could have spent a lot of time making it more modular in a way that would be friendly to a source control tool, but there are a number of benefits to using a single repository. Even at our current scale, we often make large changes throughout our code base, and having a single repository is useful for continuous modernization. Splitting it up would make large, atomic refactorings more difficult. On top of that, the idea that the scaling constraints of our source control system should dictate our code structure just doesn’t sit well with us.

We realized that we’d have to solve this ourselves. But instead of building a new system from scratch, we decided to take an existing one and make it scale. Our engineers were comfortable with Git and we preferred to stay with a familiar tool, so we took a long, hard look at improving it to work at scale. After much deliberation, we concluded that Git’s internals would be difficult to work with for an ambitious scaling project.

Instead, we chose to improve Mercurial. Mercurial is a distributed source control system similar to Git, with many equivalent features. Importantly, it’s written mostly in clean, modular Python (with some native code for hot paths), making it deeply extensible. Just as importantly, the Mercurial developer community is actively helping us address our scaling problems by reviewing our patches and keeping our scale in mind when designing new features.

When we first started working on Mercurial, we found that it was slower than Git in several notable areas. To narrow this performance gap, we’ve contributed over 500 patches to Mercurial over the last year and a half. These range from new graph algorithms to rewrites of tight loops in native code. These helped, but we also wanted to make more fundamental changes to address the problem of scale.

Speeding up file status operations

For a repository as large as ours, a major bottleneck is simply finding out what files have changed. Git examines every file and naturally becomes slower and slower as the number of files increases, while Perforce “cheats” by forcing users to tell it which files they are going to edit. The Git approach doesn’t scale, and the Perforce approach isn’t friendly.

We solved this by monitoring the file system for changes. This has been tried before, even for Mercurial, but making it work reliably is surprisingly challenging. We decided to query our build system’s file monitor, Watchman, to see which files have changed. Mercurial’s design made integrating with Watchman straightforward, but we expected Watchman to have bugs, so we developed a strategy to address them safely.

Through heavy stress testing and internal dogfooding, we identified and fixed many of the issues and race conditions that are common in file system monitoring. In particular, we ran a beta test on all our engineers’ machines, comparing Watchman’s answers for real user queries with the actual file system results and logging any differences. After a couple months of monitoring and fixing discrepancies in usage, we got the rate low enough that we were comfortable enabling Watchman by default for our engineers.

For our repository, enabling Watchman integration has made Mercurial’s status command more than 5x faster than Git’s status command. Other commands that look for changed files–like diff, update, and commit—also became faster.
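A minimal sketch of asking Watchman “what changed since the last clock” via the pywatchman client (the repository path is hypothetical); hgwatchman wraps this kind of query and, as described above, its answers were cross-checked against the real file system before being trusted.

```python
import pywatchman

ROOT = "/path/to/repo"  # hypothetical working copy

client = pywatchman.client()
watch = client.query("watch-project", ROOT)["watch"]
clock = client.query("clock", watch)["clock"]   # remember "now"

# ... later: ask only for files that changed since that clock ...
result = client.query("query", watch, {
    "since": clock,
    "fields": ["name", "exists"],
})
changed = [f["name"] for f in result["files"]]
print(changed)
```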

Working with large histories

The rate of commits and the sheer size of our history also pose challenges. We have thousands of commits being made every day, and as the repository gets larger, it becomes increasingly painful to clone and pull all of it. Centralized source control systems like Subversion avoid this by only checking out a single commit, leaving all of the history on the server. This saves space on the client but leaves you unable to work if the server goes down. More recent distributed source control systems, like Git and Mercurial, copy all of the history to the client which takes more time and space, but allows you to browse and commit entirely locally. We wanted a happy medium between the speed and space of a centralized system and the robustness and flexibility of a distributed one.

Improving clone and pull

Normally when you run a pull, Mercurial figures out what has changed on the server since the last pull and downloads any new commit metadata and file contents. With tens of thousands of files changing every day, downloading all of this history to the client every day is slow. To solve this problem we created the remotefilelog extension for Mercurial. This extension changes the clone and pull commands to download only the commit metadata, while omitting all file changes that account for the bulk of the download. When a user performs an operation that needs the contents of files (such as checkout), we download the file contents on demand using Facebook’s existing memcache infrastructure. This allows clone and pull to be fast no matter how much history has changed, while only adding a slight overhead to checkout.

But what if the central Mercurial server goes down? A big benefit of distributed source control is the ability to work without interacting with the server. The remotefilelog extension intelligently caches the file revisions needed for your local commits so you can checkout, rebase, and commit to any of your existing bookmarks without needing to access the server. Since we still download all of the commit metadata, operations that don’t require file contents (such as log) are completely local as well. Lastly, we use Facebook’s memcache infrastructure as a caching layer in front of the central Mercurial server, so that even if the central repository goes down, memcache will continue to serve many of the file content requests.

This type of setup is of course not for everyone—it’s optimized for work environments that have a reliable Mercurial server and that are always connected to a fast, low-latency network. For work environments that don’t have a fast, reliable Internet connection, this extension could result in Mercurial commands being slow and failing unexpectedly when the server is congested or unreachable.

Clone and pull performance gains

Enabling the remotefilelog extension for employees at Facebook has made Mercurial clones and pulls 10x faster, bringing them down from minutes to seconds. In addition, because of the way remotefilelog stores its local data on disk, large rebases are 2x faster. When compared with our previous Git infrastructure, the numbers remain impressive. Achieving these types of performance gains through extensions is one of the big reasons we chose Mercurial.

Finally, the remotefilelog extension allowed us to shift most of the request traffic to memcache, which reduced the Mercurial server’s network load by more than 10x. This will make it easier for our Mercurial infrastructure to keep scaling to meet growing demand.

How it works

Mercurial has several nice abstractions that made this extension possible. The most notable is the filelog class. The filelog is a data structure for representing every revision of a particular file. Each version of a file is identified by a unique hash. Given a hash, the filelog can reconstruct the requested version of a file. The remotefilelog extension replaces the filelog with an alternative implementation that has the same interface. It accepts a hash, but instead of reconstructing the version of the file from local data, it fetches that version from either a local cache or the remote server. When we need to request a large number of files from the server, we do it in large batches to avoid the overhead of many requests.
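An illustrative sketch of that substitution, not Mercurial’s real classes; the cache and server objects are hypothetical stand-ins for the on-disk cache and the memcache-backed fetcher.

```python
class RemoteFilelog:
    """Same read-by-hash interface as a filelog, different data source."""

    def __init__(self, path, local_cache, server):
        self.path = path
        self.local_cache = local_cache  # dict-like, keyed by (path, node hash)
        self.server = server            # object with fetch_batch(keys) -> {key: text}

    def read(self, node):
        key = (self.path, node)
        text = self.local_cache.get(key)
        if text is None:
            # Miss: fetch from the server; the real extension batches many
            # such requests together to avoid per-file round trips.
            text = self.server.fetch_batch([key])[key]
            self.local_cache[key] = text
        return text
```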

Open Source

Together, the hgwatchman and remotefilelog extensions have improved source control performance for our developers, allowing them to spend more time getting stuff done instead of waiting for their tools. If you have a large deployment of a distributed revision control system, we encourage you to take a look at them. They’ve made a difference for our developers, and we hope they will prove valuable to yours, too.

(Via: Facebook Engineering Blog)

 

Rocksdb Architecture – An Open Source Key-Value DB Built On LevelDB

1. Introduction

The rocksdb project started at Facebook as an experiment to develop an efficient database that can realize the full potential of storing data on fast storage (especially flash storage) for server workloads. It is a C++ library and can be used to store keys and values, which are arbitrary-size byte streams. It has support for atomic reads and atomic writes. Its settings are highly configurable and can be tuned to run in a variety of production environments: it can be configured to run on data held purely in memory, on flash, on hard disks or on HDFS. It has support for various compression algorithms and good tools for production support and debugging.

Rocksdb borrows significant code from the open source leveldb project as well as significant ideas from Apache HBase. The initial code was forked from open source leveldb 1.5. It also builds upon code and ideas developed at Facebook before the birth of rocksdb.

2. Assumptions and Goals

Performance:

The primary design point for rocksdb is that it should be performant for fast storage and for server workloads. It should be able to exploit the full potential of the high read/write rates offered by flash or RAM subsystems. It should support efficient point lookups as well as range scans. It should be configurable to support high random-read workloads, high update workloads or a combination of both. Its architecture should support easy tuning of Read Amplification, Write Amplification and Space Amplification.

Production Support:

Rocksdb should be designed in such a way that it has built-in support for tools and utilities that help deployment and debugging in production environments. Most major parameters should be fully tunable so that it can be used by different applications on different hardware.

Backward Compatibility:

Newer versions of this software should be backward compatible, so that existing applications do not need to change when upgrading to newer releases of rocksdb.

3. High Level Architecture

Rocksdb is a key-value store where keys and values are arbitrary byte streams. Rocksdb organizes all data in sorted order and the common operations are Get(key), Put(key), Delete(key) and Scan(key).

The three basic constructs of rocksdb are the memtable, sstfile and logfile. The memtable is an in-memory data structure; new writes are inserted into the memtable and are optionally written to the logfile. The logfile is a sequentially written file on storage. When the memtable fills up, it is flushed to an sstfile on storage and the corresponding logfile can be safely deleted. The data in an sstfile is kept sorted to facilitate easy lookup of keys.

The format of an sstfile is described in more detail here.
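
The sketch below shows these basic operations against the rocksdb C++ api; the database path and the key/value strings are illustrative, and error handling is reduced to asserts.

#include <cassert>
#include <string>
#include "rocksdb/db.h"

int main() {
  rocksdb::DB* db;
  rocksdb::Options options;
  options.create_if_missing = true;  // create the database if it does not already exist

  // Open (or create) a database; the path is illustrative.
  rocksdb::Status s = rocksdb::DB::Open(options, "/tmp/rocksdb_example", &db);
  assert(s.ok());

  // Keys and values are arbitrary byte streams.
  s = db->Put(rocksdb::WriteOptions(), "key1", "value1");
  assert(s.ok());

  std::string value;
  s = db->Get(rocksdb::ReadOptions(), "key1", &value);
  assert(s.ok() && value == "value1");

  s = db->Delete(rocksdb::WriteOptions(), "key1");
  assert(s.ok());

  delete db;
  return 0;
}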

4. Features

Gets, Iterators and Snapshots

Rocksdb is a data store that stores arbitrary keys and values. The keys and values are treated as pure byte streams. There is no limit to the size of a key or a value. There is a Get api that allows an application to fetch a single key-value from the database. A MultiGet api allows an application to retrieve multiple keys from the database; all the key-values returned via a MultiGet call are consistent with one another.

All data in the database is logically arranged in sorted order. An application can specify a Key-Comparison method that specifies a total ordering of keys. An Iterator api allows an application to do a RangeScan on the database. The Iterator can seek to a specified key and then the application can start scanning one key at a time from that point. The Iterator api can also be used to do a reverse iteration of the keys in the database. A consistent-point-in-time view of the database is created when the Iterator is created. Thus, all keys returned via the Iterator are from a consistent view of the database.

The Snapshot api allows an application to create a point-in-time view of a database. The Get and Iterator apis can be used to read data from a specified snapshot. In some sense, a Snapshot and an Iterator both provide a point-in-time view of the database, but their implementations are different. Short-lived scans are best done via an iterator while long-running scans are better served via a snapshot. An iterator keeps a reference count on all underlying files that correspond to that point-in-time view of the database; these files will not get deleted till the Iterator is released. A snapshot, on the other hand, does not prevent file deletions; instead the compaction process understands the existence of snapshots and promises never to delete a key that is visible in any existing snapshot.

Snapshots are not persisted across database restarts: a reload of the rocksdb library (via a server restart) would release all pre-existing snapshots.
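
Here is a rough sketch of both read paths, assuming an already opened rocksdb::DB* and illustrative keys; the iterator pins its underlying files for its lifetime, while the snapshot only marks a point in time that compaction must respect.

#include <string>
#include "rocksdb/db.h"

// Assumes an already opened rocksdb::DB* (see the earlier sketch).
void scan_and_snapshot(rocksdb::DB* db) {
  // Iterator: a consistent point-in-time view that pins the underlying files.
  rocksdb::Iterator* it = db->NewIterator(rocksdb::ReadOptions());
  for (it->SeekToFirst(); it->Valid(); it->Next()) {
    // it->key() and it->value() are rocksdb::Slice objects over the current entry.
  }
  delete it;  // releases the files pinned by this view

  // Snapshot: a point-in-time view that does not pin files; compaction simply
  // promises not to drop keys visible in it.
  const rocksdb::Snapshot* snap = db->GetSnapshot();
  rocksdb::ReadOptions ro;
  ro.snapshot = snap;
  std::string value;
  db->Get(ro, "some-key", &value);  // "some-key" is illustrative
  db->ReleaseSnapshot(snap);        // snapshots do not survive a restart
}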

Prefix Iterators

Most LSM engines cannot support an efficient RangeScan api because a scan needs to look into every data file. But most applications do not do pure-random scans of key ranges in the database; instead applications typically scan within a key-prefix. Rocksdb uses this to its advantage. Applications can configure a prefix_extractor to specify a key-prefix. Rocksdb uses this to store blooms for every key-prefix. An iterator that specifies a prefix (via ReadOptions) will use these bloom bits to avoid looking into data files that do not contain keys with the specified key-prefix.
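
A sketch of the configuration side of prefix iterators follows. The prefix length is arbitrary, and exactly how the bloom filter and the iterator options are wired up has moved around between rocksdb releases, so treat this as version-dependent.

#include "rocksdb/options.h"
#include "rocksdb/slice_transform.h"

// Register a fixed-length prefix extractor so rocksdb can build per-prefix blooms.
// The prefix length (8 bytes) is arbitrary. In current releases prefix_extractor is
// a shared_ptr; very old releases used a raw pointer.
rocksdb::Options make_prefix_options() {
  rocksdb::Options options;
  options.prefix_extractor.reset(rocksdb::NewFixedPrefixTransform(8));
  // A bloom filter (~10 bits per key) complements the prefix extractor; where it is
  // attached (Options vs. BlockBasedTableOptions) depends on the rocksdb version.
  return options;
}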

Updates

There is a Put api that can insert a single key-value to the database. A Write api allows multiple keys-values to be inserted atomically into the database. The database guarantees that the keys-values in a single Write call will either all be inserted into the database or none of them will be inserted into the database.
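
A minimal sketch of an atomic multi-key update using a WriteBatch; the keys and the "rename" scenario are hypothetical.

#include "rocksdb/db.h"
#include "rocksdb/write_batch.h"

// Atomically apply several updates: either all become visible or none do.
rocksdb::Status rename_key(rocksdb::DB* db) {
  rocksdb::WriteBatch batch;
  batch.Delete("old-key");            // remove the value under the old key...
  batch.Put("new-key", "the-value");  // ...and store it under the new key
  return db->Write(rocksdb::WriteOptions(), &batch);
}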

Persistency

Rocksdb has a transaction log. All Puts are stored in an in-memory buffer called the memtable as well as optionally inserted into a transaction log. Each Put has a set of flags specified via WriteOptions. The WriteOptions can specify whether the Put should be inserted into the transaction log. Also, the WriteOptions can specify whether a sync call is issued to the transaction log before a Put is declared to be committed.

Internally, rocksdb uses a batch-commit mechanism to batch transactions into the transaction log so that it can potentially commit multiple transactions using a single sync call.
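
A sketch of the two ends of the durability spectrum expressed through WriteOptions; field names such as disableWAL have shifted slightly across releases, so check the headers of your version, and the keys here are illustrative.

#include "rocksdb/db.h"

// Assumes an open rocksdb::DB*.
void write_with_durability_choices(rocksdb::DB* db) {
  // Fully durable write: recorded in the transaction log and synced before returning.
  rocksdb::WriteOptions durable;
  durable.sync = true;
  db->Put(durable, "important-key", "important-value");

  // Fast, non-durable write: skip the transaction log entirely; the data lives only
  // in the memtable until it is flushed to an sstfile.
  rocksdb::WriteOptions unlogged;
  unlogged.disableWAL = true;
  db->Put(unlogged, "cache-key", "cache-value");
}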

Fault Tolerance

Rocksdb uses checksums to detect corruptions in storage. These checksums are per block; a block is typically anywhere between 4K and 128K in size. A block, once written to storage, is never modified. Rocksdb will dynamically detect hardware support for checksum computations and use that support wherever it is available.

Multi-Threaded Compactions

Compactions are needed to remove multiple copies of the same key that can occur if an application overwrites an existing key. Compactions also process deletions of keys. Compactions can occur in multiple threads if configured appropriately. The overall write throughput of an LSM database is directly dependent on the speed at which compactions can occur, especially when the data is stored in fast storage like SSD or RAM. Rocksdb can be configured to issue compaction requests from multiple threads concurrently. It is observed that sustained write rates can increase by up to a factor of 10 with multi-threaded compactions compared to single-threaded compactions when the database is on SSDs.

The entire database is stored in a set of sstfiles. When a memtable is full, its content is written out to a file in Level-0 (L0). Rocksdb removes duplicates and overwritten keys in the memtable when it is flushed to a file in L0. Periodically, some files are read in and merged to form larger files. This is called Compaction.

Rocksdb supports two different styles of compaction. The Universal Style Compaction stores all files in L0 and all files are arranged in time order. A compaction picks up a few files that are chronologically adjacent to one another and merges them back into a new file in L0. All files can have overlapping keys.

The Level Style Compaction stores data in multiple levels in the database. The more recent data is in L0 and the earliest data is in Lmax. Files in L0 can have overlapping keys but files in other layers do not have overlapping keys. A compaction process picks one file in Ln and all its overlapping files in Ln+1 and replaces them with new files in Ln+1. The Universal Style Compaction typically results in lower write amplification but higher space amplification than Level Style Compaction.
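
Selecting between the two styles is a single option; a minimal sketch:

#include "rocksdb/options.h"

// Choose the compaction style. Level style is the default; universal style keeps all
// files in L0 and trades space amplification for lower write amplification.
rocksdb::Options make_universal_compaction_options() {
  rocksdb::Options options;
  options.compaction_style = rocksdb::kCompactionStyleUniversal;
  return options;
}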

There is a MANIFEST file in the database that records the state of the database. The Compaction process adds new files and deletes existing files from the database, and it makes these operations persistent by recording them in the MANIFEST file. Transactions recorded in the MANIFEST file use a batch-commit algorithm to amortize the cost of repeated syncs to the MANIFEST file.

Avoiding Stalls

Background compaction threads are also used to flush memtable contents to a file on storage. If all background compaction threads are busy doing long-running compactions, then a sudden burst of writes can fill up the memtable(s) quickly, thus stalling new writes. This situation can be avoided by configuring rocksdb to keep a small set of threads explicitly reserved for the sole purpose of flushing memtables to storage.
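
A sketch of options that reserve flush threads alongside compaction threads; the thread counts are illustrative, not recommendations.

#include "rocksdb/env.h"
#include "rocksdb/options.h"

// Reserve flush threads so a burst of writes is not stalled by long compactions.
rocksdb::Options make_write_heavy_options() {
  rocksdb::Options options;
  options.max_background_compactions = 4;  // concurrent compaction jobs
  options.max_background_flushes = 1;      // threads dedicated to memtable flushes

  // Compactions run on the Env's LOW-priority pool and flushes on the HIGH-priority
  // pool, so size those pools to match.
  options.env->SetBackgroundThreads(4, rocksdb::Env::Priority::LOW);
  options.env->SetBackgroundThreads(1, rocksdb::Env::Priority::HIGH);
  return options;
}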

Compaction Filter

There are times when an application would like to process keys at compaction time. For example, a database that has inherent support for Time-To-Live (TTL) can optionally remove keys that have expired. This can be done via an application-defined Compaction Filter. If the application wants to continuously delete data older than a specific time, it can use the compaction filter to drop records that have expired. The rocksdb Compaction Filter gives control to the application to modify the value of a key or to entirely drop a key as part of the compaction process. For example, an application can continuously run a data sanitizer as part of compaction.
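
Below is a hypothetical compaction filter that drops any record carrying an application-chosen expiry marker; the marker string and the registration snippet are assumptions, not part of rocksdb itself.

#include <string>
#include "rocksdb/compaction_filter.h"
#include "rocksdb/options.h"

// Returning true from Filter() removes the key during compaction; new_value and
// value_changed could instead be used to rewrite the value.
class ExpiredRecordFilter : public rocksdb::CompactionFilter {
 public:
  bool Filter(int /*level*/, const rocksdb::Slice& /*key*/,
              const rocksdb::Slice& existing_value, std::string* /*new_value*/,
              bool* /*value_changed*/) const override {
    return existing_value.starts_with("EXPIRED:");  // hypothetical expiry marker
  }
  const char* Name() const override { return "ExpiredRecordFilter"; }
};

// Registration (the filter must outlive the database):
//   static ExpiredRecordFilter filter;
//   options.compaction_filter = &filter;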

ReadOnly Mode

There are times when an application wants to open a database for reading only. It can open the database in ReadOnly mode; the database guarantees that the application won't be able to modify anything in the database. This results in much higher read performance because oft-traversed code paths can avoid locks completely.
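
A minimal sketch of opening an existing database in ReadOnly mode:

#include <string>
#include "rocksdb/db.h"

// Open an existing database strictly for reads; write apis will fail.
rocksdb::DB* open_read_only(const std::string& path) {
  rocksdb::DB* db = nullptr;
  rocksdb::Options options;
  rocksdb::Status s = rocksdb::DB::OpenForReadOnly(options, path, &db);
  return s.ok() ? db : nullptr;
}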

Database Debug Logs

The rocksdb database software writes detailed logs to a file named LOG*. These are mostly used for debugging and analyzing a running system. There is a configuration to allow rolling this LOG at a specified periodicity.

Data Compression

Rocksdb supports snappy, zlib and bzip2 compression. Rocksdb can be configured to use different compression algorithms for data in different levels. Typically, 90% of the data resides in the Lmax level. A typical installation might configure no compression for levels L0-L2, snappy compression for the mid levels and zlib compression for Lmax.
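
A sketch of such a per-level compression setup; the level count and codec choices are illustrative and depend on which compression libraries rocksdb was built with.

#include "rocksdb/options.h"

// No compression for the small, hot upper levels and heavier compression for Lmax,
// where most of the data lives.
rocksdb::Options make_compression_options() {
  rocksdb::Options options;
  options.compression_per_level = {
      rocksdb::kNoCompression,      // L0
      rocksdb::kNoCompression,      // L1
      rocksdb::kNoCompression,      // L2
      rocksdb::kSnappyCompression,  // mid levels
      rocksdb::kSnappyCompression,
      rocksdb::kZlibCompression};   // Lmax
  return options;
}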

Transaction Logs

Rocksdb stores transactions in the logfile to protect against system crashes. On restart, it re-processes all the transactions that were recorded in the logfile. The logfile can be configured to be stored in a directory different from the directory where the sstfiles are stored. This is useful when you want to store all data files on non-persistent fast storage while still ensuring no data loss by putting all transaction logs on slower but persistent storage.

Full Backups, Incremental Backups and Replication

Rocksdb has support for full backups and incremental backups. Rocksdb is an LSM database engine, so data files once created are never overwritten, and this makes it easy to extract a list of file names that correspond to a point-in-time snapshot of the database contents. The api DisableFileDeletions instructs rocksdb to not delete data files. Compactions will continue to occur, but files that are no longer needed by the database will not be deleted. A backup application can then invoke the api GetLiveFiles/GetSortedWalFiles to retrieve the list of live files in the database and copy them to a backup location. Once the backup is complete, the application can invoke EnableFileDeletions; the database is now free to reclaim all the files that are not needed any more.
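
A sketch of that full-backup flow; copy_to_backup() is a hypothetical placeholder for whatever actually copies files off the host.

#include <string>
#include <vector>
#include "rocksdb/db.h"

void take_full_backup(rocksdb::DB* db) {
  db->DisableFileDeletions();  // obsolete files are retained from here on

  std::vector<std::string> live_files;
  uint64_t manifest_size = 0;
  // flush_memtable=true flushes the memtable so the list covers all committed data.
  db->GetLiveFiles(live_files, &manifest_size, /*flush_memtable=*/true);

  for (const std::string& file : live_files) {
    // copy_to_backup(file);  // hypothetical helper
  }

  db->EnableFileDeletions();  // the database may now reclaim obsolete files
}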

Incremental Backups and Replication need to be able to find and tail all the recent changes to the database. The api GetUpdatesSince allows an application to tail the rocksdb transaction log. It can continuously fetch transactions from the rocksdb transaction log and apply them to a remote replica or a remote backup.

A replication system typically wants to annotate each Put with some arbitrary metadata. This metadata may be used to detect loops in the replication pipeline. It can also be used to timestamp and sequence transactions. For this purpose, rocksdb supports an api called PutLogData that an application can use to annotate each Put with metadata. This metadata is stored only in the transaction log and is not stored in the data files. The metadata inserted via PutLogData can be retrieved via the GetUpdatesSince api.
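
A sketch of tailing the transaction log with GetUpdatesSince; ship_to_replica() is a hypothetical placeholder, and the exact inclusive/exclusive semantics of the starting sequence number should be checked for your version.

#include <memory>
#include "rocksdb/db.h"
#include "rocksdb/transaction_log.h"

void tail_updates(rocksdb::DB* db, rocksdb::SequenceNumber since) {
  std::unique_ptr<rocksdb::TransactionLogIterator> iter;
  if (!db->GetUpdatesSince(since, &iter).ok()) return;

  while (iter->Valid()) {
    rocksdb::BatchResult batch = iter->GetBatch();
    // batch.sequence is the starting sequence number of this write batch;
    // batch.writeBatchPtr holds the updates, including any PutLogData metadata.
    // ship_to_replica(batch);
    iter->Next();
  }
}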

Rocksdb transaction logs are created in the database directory. When a log file is no longer needed, it is moved to the archive directory. The reason for the existence of the archive directory is because a replication stream that is falling behind might need to retrieve transactions from a log file that is way in the past. The api GetSortedWalFiles returns a list of all transaction log files.

Support for Multiple Embedded Databases in the same process

A common use-case for applications that use rocksdb is that they inherently partition their data set into multiple logical partitions or shards. This technique is helpful for application load balancing and fast recovery from faults. This means that a single server process should be able to operate multiple rocksdb databases simultaneously. This is done via an environment object named Env. Among other things, a thread pool is associated with an Env. If applications want to share a common thread pool (for background compactions) among multiple database instances, then they should use the same Env object for opening those databases.

Similarly, there is support for multiple database instances to share the same block cache.

Block Cache — Compressed and Uncompressed Data

Rocksdb uses an LRU cache for blocks to serve reads. The block cache is partitioned into two individual caches: the first one caches uncompressed blocks and the second one caches compressed blocks in RAM. If a compressed block cache is configured, then the database intelligently avoids caching data in the OS buffers.
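
A sketch of configuring both caches. In recent rocksdb releases they hang off BlockBasedTableOptions, older releases exposed them directly on Options, and the compressed cache has been phased out in the newest versions; the sizes here are illustrative, and the same Cache object can be shared by several database instances.

#include "rocksdb/cache.h"
#include "rocksdb/options.h"
#include "rocksdb/table.h"

rocksdb::Options make_cache_options() {
  rocksdb::BlockBasedTableOptions table_options;
  table_options.block_cache = rocksdb::NewLRUCache(1024 * 1024 * 1024);            // uncompressed blocks
  table_options.block_cache_compressed = rocksdb::NewLRUCache(512 * 1024 * 1024);  // compressed blocks

  rocksdb::Options options;
  options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
  return options;
}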

Table cache

The Table Cache is a construct that caches open file descriptors. These file descriptors are for sstfiles. An application can specify the maximum size of the Table Cache.

External Compaction Algorithms

The performance of an LSM database has a significant dependency on the compaction algorithm and implementation. Rocksdb has two supported compaction algorithms: LevelStyle and UniversalStyle. But we would like to enable the large community of developers to develop and experiment with other compaction policies. For this reason, rocksdb has appropriate hooks to switch off the inbuilt compaction algorithm and other apis that allow an application to operate its own compaction algorithm. Options.disable_auto_compactions, if set, disables the inbuilt compaction algorithm. The GetLiveFilesMetaData api allows an external component to look at every data file in the database and decide which data files to merge and compact, and the DeleteFile api allows it to delete data files that are deemed obsolete.

Non-Blocking Database Access

There are certain applications that are architected in such a way that they would like to retrieve data from the database only if that data retrieval call is non-blocking, i.e. the call does not have to read in data from storage. Rocksdb caches a portion of the database in the block cache and these applications would like to retrieve the data only if it is found in this block cache. If this call does not find the data in the block cache, then rocksdb returns an appropriate error code to the application. The application can then schedule a normal Get/Next operation, understanding that this data retrieval call could potentially block for IO from storage (maybe in a different thread context).
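
A sketch of a cache-only read using the kBlockCacheTier read tier; the fallback policy in the comments is up to the application.

#include <string>
#include "rocksdb/db.h"

// Answer from the block cache or fail fast with an Incomplete status, leaving the
// caller to schedule a blocking read elsewhere.
bool try_cached_get(rocksdb::DB* db, const std::string& key, std::string* value) {
  rocksdb::ReadOptions ro;
  ro.read_tier = rocksdb::kBlockCacheTier;  // never touch storage
  rocksdb::Status s = db->Get(ro, key, value);
  if (s.IsIncomplete()) return false;  // not cached; fall back to a normal Get
  return s.ok();
}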

Stackable DB

Rocksdb has a built-in wrapper mechanism to add functionality as a layer above the core database kernel. This functionality is encapsulated by an api called StackableDB. For example, the time-to-live functionality is implemented by a StackableDB and is not part of the core rocksdb api. This approach keeps the code modularized and clean.

Memtables:

Pluggable Memtables:

The default implementation of the memtable for rocksdb is a skiplist. The skiplist is a sorted set. The sorted set is a necessary construct when the workload interleaves writes with range-scans. But some applications do not interleave their writes and scans, and others do not do range-scans at all. For these applications, a sorted set might not provide the optimal performance. For this reason, rocksdb supports a pluggable api that allows an application to provide its own implementation of a memtable. Three memtables are part of the library: a skiplist memtable, a vector memtable and a prefix-hash memtable. A vector memtable is appropriate for bulk-loading data into the database. Every write inserts a new element at the end of the vector; when it is time to flush the memtable to storage, the elements in the vector are sorted and written out to a file in L0. A prefix-hash memtable allows efficient processing of gets, puts and scans within a key-prefix.
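
A sketch of plugging in the vector memtable for a bulk load (the db_bench commands later in this section do the same thing via --memtablerep=vector):

#include "rocksdb/memtablerep.h"
#include "rocksdb/options.h"

// Writes append to a vector and are only sorted when the memtable is flushed to L0.
rocksdb::Options make_bulk_load_options() {
  rocksdb::Options options;
  options.memtable_factory.reset(new rocksdb::VectorRepFactory());
  return options;
}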

Memtable Pipelining

Rocksdb supports configuring an arbitrary number of memtables for a database. When a memtable is full it becomes an immutable memtable and a background thread starts flushing its contents to storage. Meanwhile, new writes continue to accumulate to a newly allocated memtable. If the newly allocated memtable is filled up to its limit, it too is converted to an immutable memtable and inserted into the flush pipeline. The background thread continues to flush all the pipelined immutable memtables to storage. This pipelining increases write throughput of rocksdb especially when it is operating on slow storage devices.
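
A sketch of the options that enable this pipelining; the sizes and counts are illustrative.

#include "rocksdb/options.h"

// Allow several memtables so new writes land in a fresh memtable while immutable
// ones are flushed in the background.
rocksdb::Options make_pipelined_memtable_options() {
  rocksdb::Options options;
  options.write_buffer_size = 64 * 1024 * 1024;  // 64 MB per memtable
  options.max_write_buffer_number = 4;           // up to 4 memtables in flight
  options.max_background_flushes = 1;            // background thread(s) doing the flushing
  return options;
}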

Memtable Compaction:

When a memtable is being flushed to storage, an inline-compaction process removes duplicate records from the output stream. Similarly, if an earlier put is hidden by a later delete, then the put is not written to the output file at all. This feature greatly reduces the size of data on storage and write amplification. It is essential when rocksdb is used as a producer-consumer queue, especially when the lifetime of an element in the queue is very short.

Merge Operator

Rocksdb natively supports three types of records: a Put record, a Delete record and a Merge record. When a compaction process encounters a Merge record, it invokes an application-specified method called the Merge Operator. The Merge Operator can combine multiple Put and Merge records into a single one. This powerful feature allows applications that typically do read-modify-writes to avoid the reads entirely. It allows an application to record the intent of the operation as a Merge record, and the rocksdb compaction process lazily applies that intent to the original value. This feature is described in detail in Merge Operator.
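
Below is a hypothetical associative merge operator implementing a counter, which is the classic use of this feature; values are decimal strings and error handling is omitted for brevity.

#include <cstdint>
#include <string>
#include "rocksdb/db.h"
#include "rocksdb/merge_operator.h"

// Instead of a read-modify-write, the application issues a Merge of "1" and rocksdb
// folds the additions into the stored value lazily during compaction or reads.
class UInt64AddOperator : public rocksdb::AssociativeMergeOperator {
 public:
  bool Merge(const rocksdb::Slice& /*key*/, const rocksdb::Slice* existing_value,
             const rocksdb::Slice& value, std::string* new_value,
             rocksdb::Logger* /*logger*/) const override {
    uint64_t current = existing_value ? std::stoull(existing_value->ToString()) : 0;
    *new_value = std::to_string(current + std::stoull(value.ToString()));
    return true;
  }
  const char* Name() const override { return "UInt64AddOperator"; }
};

// Registration and use:
//   options.merge_operator.reset(new UInt64AddOperator());
//   db->Merge(rocksdb::WriteOptions(), "hits", "1");  // intent: hits += 1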

5. Tools

There are a number of interesting tools that are used to support a database in production. The sst_dump utility dumps all the keys-values in an sst file. The ldb tool can put, get and scan the contents of a database. ldb can also dump the contents of the MANIFEST, change the number of configured levels of the database, and manually compact a database.

6. Tests

There are a bunch of unit tests that test specific features of the database. A make check command runs all unit tests. The unit tests trigger specific features of rocksdb and are not designed to test data correctness at scale. The db_stress test is used to validate data correctness at scale.

7. Performance

a. Setup

All of the benchmarks are run on the same machine. Here are the details of the test setup:

  • 12 CPUs, HT enabled -> 24 vCPUs reported, 2 sockets X 6 cores/socket with X5650 @ 2.67GHz
  • 2 FusionIO devices in SW RAID 0 that can do ~200k 4kb read/second at peak
  • Fusion IO devices were about 50% to 70% full for each of the benchmark runs
  • Machine has 144 GB of RAM
  • Operating System Linux 2.6.38.4
  • 1G rocksdb block cache
  • 1 Billion keys; each key is of size 10 bytes, each value is of size 800 bytes
  • total database size is 800GB

The following benchmark results compare the performance of rocksdb to leveldb. This is an IO-bound workload where the database is 800GB while the machine has only 144GB of RAM.

b. Bulk Load of keys in Random Order

Measure performance to load 1B keys into the database. The keys are inserted in random order. The database is empty at the beginning of this benchmark run and gradually fills up. No data is being read when the data load is in progress.

rocksdb:   103 minutes, 80 MB/sec (total data size 481 GB, 1 billion key-values)
leveldb:   many many days (in 20 hours it inserted only 200 million key-values) 

Rocksdb was configured to first load all the data in L0 with compactions switched off and using an unsorted vector memtable. Then it made a second pass over the data to merge-sort all the files in L0 into sorted files in L1. Leveldb is very slow because of high write amplification. Here are the command(s) for loading the data into rocksdb

echo "Bulk load database into L0...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=2;ctrig=10000000; delay=10000000; stop=10000000; wbn=30; mbc=20; mb=1073741824;wbs=268435456; dds=1; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=fillrandom --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --memtablerep=vector --use_existing_db=0 --disable_auto_compactions=1 --source_compaction_factor=10000000
echo "Running manual compaction to do a global sort map-reduce style...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=2;ctrig=10000000; delay=10000000; stop=10000000; wbn=30; mbc=20; mb=1073741824;wbs=268435456; dds=1; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=compact --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --memtablerep=vector --use_existing_db=1 --disable_auto_compactions=1 --source_compaction_factor=10000000
du -s -k test
504730832   test

Here are the command(s) for loading the data into leveldb:

echo "Bulk load database ...."
wbs=268435456; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=fillrandom --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=0

c. Bulk Load of keys in Sequential Order

Measure performance to load 1B keys into the database. The keys are inserted in sequential order. The database is empty at the beginning of this benchmark run and gradually fills up. No data is being read when the data load is in progress.

rocksdb:   36 minutes, 370 MB/sec (total data size 760 GB)
leveldb:   91 minutes, 146 MB/sec (total data size 760 GB)

Rocksdb was configured to use multi-threaded compactions so that multiple threads could simultaneously compact (via file renames) non-overlapping key ranges in multiple levels. This was the primary reason why rocksdb is much faster than leveldb for this workload. Here are the command(s) for loading the database into rocksdb.

echo "Load 1B keys sequentially into database....."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=0; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=fillseq --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=0

Here are the command(s) for loading the data into leveldb:

echo "Load 1B keys sequentially into database....."
wbs=134217728; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=fillseq --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=0

d. Write Performance

Measure performance to randomly overwrite 1B keys in the database. The database was first created by sequentially inserting all 1B keys. The results here do not measure the sequential-insertion phase; they measure only the second part of the test that overwrites 1B keys in random order. The test was run with the Write-Ahead-Log (WAL) enabled, but fsync on commit was not done to the WAL.

rocksdb: 15 hours 38 min;  56.295 micros/op, 17K ops/sec,  13.8 MB/sec
leveldb: many many days;  600 micros/op,     1.6K ops/sec, 1.3 MB/sec
          (in 5 days it overwrote only 662 million out of 1 billion keys, after which I killed the test)

Rocksdb was configured with 20 compaction threads. These threads can simultaneously compact non-overlapping key ranges in the same or different levels. Rocksdb was also configured for a 1TB database by setting the number of levels to 6 so that write amplification is reduced. L0-L1 compactions were given priority to reduce stalls. zlib compression was enabled only for levels 2 and higher so that L0 compactions can occur faster. Files were configured to be 64 MB in size so that frequent fsyncs after creation of newly compacted files are reduced. Here are the commands to overwrite 1 B keys in rocksdb:

echo "Overwriting the 1B keys in database in random order...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=0; sync=0; r=1000000000; t=1; vs=800; bs=65536; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=overwrite --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=4 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=zlib --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=1

Here are the commands to overwrite 1 B keys in leveldb:

echo "Overwriting the 1B keys in database in random order...."
wbs=268435456; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=overwrite --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=1

g. Read Performance

Measure random read performance of a database with 1 Billion keys, each key is 10 bytes and value is 800 bytes. Rocksdb and leveldb were both configured with a block size of 4 KB. Data compression is not enabled. There were 32 threads in the benchmark application issuing random reads to the database. rocksdb is configured to verify checksums on every read while leveldb has checksum verification switched off.

rocksdb:  70 hours,  8 micros/op, 126K ops/sec (checksum verification)
leveldb: 102 hours, 12 micros/op,  83K ops/sec (no checksum verification)

Data was first loaded into the database by sequentially writing all 1B keys to the database. Once the load is complete, the benchmark randomly picks a key and issues a read request. The above measurement does not include the data-loading part; it measures only the part that issues the random reads to the database. The reason rocksdb is faster is that it does not use mmapped IO, since mmapped IO on some Linux platforms is known to be slow. Also, rocksdb shards the block cache into 64 parts to reduce lock contention. Rocksdb is configured to avoid compactions triggered by seeks whereas leveldb does seek-compaction for this workload.

Here are the commands used to run the benchmark with rocksdb:

echo "Load 1B keys sequentially into database....."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=1; sync=0; r=1000000000; t=1; vs=800; bs=4096; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=fillseq --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=6 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=none --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=0
echo "Reading 1B keys in database in random order...."
bpl=10485760;overlap=10;mcz=2;del=300000000;levels=6;ctrig=4; delay=8; stop=12; wbn=3; mbc=20; mb=67108864;wbs=134217728; dds=0; sync=0; r=1000000000; t=32; vs=800; bs=4096; cs=1048576; of=500000; si=1000000; ./db_bench --benchmarks=readrandom --disable_seek_compaction=1 --mmap_read=0 --statistics=1 --histogram=1 --num=$r --threads=$t --value_size=$vs --block_size=$bs --cache_size=$cs --bloom_bits=10 --cache_numshardbits=6 --open_files=$of --verify_checksum=1 --db=/data/mysql/leveldb/test --sync=$sync --disable_wal=1 --compression_type=none --stats_interval=$si --compression_ratio=50 --disable_data_sync=$dds --write_buffer_size=$wbs --target_file_size_base=$mb --max_write_buffer_number=$wbn --max_background_compactions=$mbc --level0_file_num_compaction_trigger=$ctrig --level0_slowdown_writes_trigger=$delay --level0_stop_writes_trigger=$stop --num_levels=$levels --delete_obsolete_files_period_micros=$del --min_level_to_compress=$mcz --max_grandparent_overlap_factor=$overlap --stats_per_interval=1 --max_bytes_for_level_base=$bpl --use_existing_db=1

Here are the commands used to run the test on leveldb:

echo "Load 1B keys sequentially into database....."
wbs=134217728; r=1000000000; t=1; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=fillseq --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=0
echo "Reading the 1B keys in database in random order...."
wbs=268435456; r=1000000000; t=32; vs=800; cs=1048576; of=500000; ./db_bench --benchmarks=readrandom --num=$r --threads=$t --value_size=$vs --cache_size=$cs --bloom_bits=10 --open_files=$of --db=/data/mysql/leveldb/test --compression_ratio=50 --write_buffer_size=$wbs --use_existing_db=1

Salesforce Architecture – How They Handle 1.3 Billion Transactions A Day

This is a guest post written by Claude Johnson, a Lead Site Reliability Engineer at salesforce.com.

The following is an architectural overview of salesforce.com’s core platform and applications. Other systems such as Heroku’s Dyno architecture or the subsystems of other products such as work.com and do.com are specifically not covered by this material, although database.com is. The idea is to share with the technology community some insight about how salesforce.com does what it does. Any mistakes or omissions are mine.

This is by no means comprehensive but if there is interest, the author would be happy to tackle other areas of how salesforce.com works. Salesforce.com is interested in being more open with the technology communities that we have not previously interacted with. Here’s to the start of “Opening the Kimono” about how we work.

Since 1999, salesforce.com has been singularly focused on building technologies for business that are delivered over the Internet, displacing traditional enterprise software. Our customers pay via monthly subscription to access our services anywhere, anytime through a web browser. We hope this exploration of the core salesforce.com architecture will be the first of many contributions to the community.

Definitions

Let’s start with some basic salesforce.com terminology:

  • Instance – a complete set of systems, network and storage infrastructure, both shared and non-shared, that provides the salesforce.com service to a subset of our customers. For example, na14.salesforce.com is an instance.
  • Superpod – a set of systems, network and storage infrastructure, including outbound proxy servers, load balancers, mail servers, SAN fabric and other infrastructure supporting multiple instances. Superpods provide service isolation within a datacenter so that problems with shared or complex components cannot impact every instance in that datacenter.
  • Org (a.k.a., organization) – a single customer of the Salesforce application. Every trial started on http://www.salesforce.com or developer.force.com spawns a new org. An org is highly customizable and can have distinct security settings, record visibility and sharing settings, UI look and feel, workflows, triggers, custom objects, custom fields on standard salesforce.com CRM objects, and even custom REST APIs. An org can support anywhere from one to millions of licensed individual user accounts, portal user accounts and Force.com Sites user accounts.
  • Sandbox – an instance of the salesforce.com service that hosts full copies of production orgs for customer application development purposes. Customers using our platform can have full application development lifecycles. These are test environments for customers to do user acceptance testing against their applications before deploying changes into their production org.

Stats (As Of August 2013)

  • 17 North America instances, 4 EMEA instances and 2 APAC instances
  • 20 sandbox instances
  • 1,300,000,000+ daily transactions
  • 24,000 database transactions per second at peak (equivalent to a page view on other sites)
  • 15,000+ hardware systems
  • > 22 PB of raw SAN storage capacity
  • > 5K SAN ports

Software Technologies Employed

  • Linux for development and primary production systems
  • Solaris 10 w/ ZFS
  • Jetty
  • Solr
  • Memcache
  • Apache QPID
  • QFS
  • Puppet, Razor
  • Perl, Python
  • Nagios
  • Perforce, Git, Subversion

Hardware/Software Architecture

Logging In To The Salesforce.Com Service

We maintain a pool of servers to handle login traffic for all instances. A handful of servers from many (but not all) instances accept login requests and redirect the session to the user’s home instance. This is what happens when you log in via login.salesforce.com.

Customer traffic starts with our external DNS. Once a lookup has successfully returned the IP address for an instance, standard Internet routing directs it to the appropriate datacenter.

Once the traffic enters our network in that datacenter, it is directed to the load balancer pair on which that IP lives. All of our Internet-facing IPs are VIPs configured on an active/standby pair of load balancers.

Inside The Instance

The load balancer directs the traffic to the application tier of the given instance. At this tier, we service both standard web page traffic as well as our API traffic. API traffic makes up over 60% of the traffic serviced by our application tier overall. Depending on the needs of the customer’s request, it will be directed to additional server tiers for various types of backend processing.

Core App

The core app tier contains anywhere from ten to 40 app servers, depending on the instance. Each server runs a single Hotspot JVM configured with as much as a 14 GiB heap, depending on the server hardware configuration.

The batch server is responsible for running scheduled, automated processes on the database tier. One example is the Weekly Export process, which exports customer data in a single archive file format as a form of backup.

Salesforce.com offers a number of services including basic and advanced content management. We have a content search server and a content batch server for managing asynchronous processes on the content application tier. The content batch servers schedule processing of content types, including functions such as rendering previews of certain file types and file type conversion.

Database

The primary data flow occurs between the core app server tier and the database tier. From a software perspective, everything goes through the database, so database performance is critical. Each primary instance (e.g. NA, AP or EU instances) uses an 8 node clustered database tier. Customer sandbox instances (e.g. CS instances) have a 4 node clustered database tier.

Since salesforce.com is such a heavily database-driven system, reducing load on the database is critically important. To reduce load on the database tier, we developed ACS, the API Cursor Server. ACS was a solution to two problems which enabled us to improve our core database performance significantly. First, we used to store cursors in the database, but the deletes were impacting performance. Second, after moving to database tables to hold cursors, the DDL overhead became a problem. Thus was born the ACS: a cursor cache running on a pair of servers, providing a method to offload cursor processing from the database tier.

Search

Our search tier runs on commodity Linux hosts, each of which is augmented with a 640 GiB PCI-E flash drive which serves as a caching layer for search requests. These hosts get their data from a shared SAN array via an NFS file system. Search indexes are stored on the flash drive to enable greater performance for search throughput.

Search indexing currently occurs on translation servers which mount LUNs from storage arrays via Fibre Channel SANs. Those LUNs make up a QFS file system which allows single writer but multi-reader access. Like most other critical systems, we run these in active/passive with the passive node doing some low priority search indexing work. It then ships its results to the active partner to write into the QFS file system.

The translation occurs when these same LUNs are mounted read-only from a group of four NFS servers running Solaris 10 on SPARC. These SAN mounted file systems then are shared via NFS to the search tier previously described.

Fileforce

We maintain a tier of servers that provide object storage, similar in concept to Amazon’s S3 or OpenStack’s Swift project. This system, Fileforce, was developed internally to reduce the load on our DB tier. Prior to the introduction of Fileforce, all Binary Large Objects (BLOBs) were stored directly in the database. Once Fileforce came online, all BLOBs larger than 32 KiB were migrated into it. BLOBs smaller than 32 KiB in size continue living in the database. All BLOBs in Fileforce have a reference in the database, so in order to restore Fileforce data from backups, we have to start a database instance based on a database backup from the same restore point.

Fileforce includes a bundler function, developed to reduce the disk seek load on the Fileforce servers. If 100+ objects smaller than 32 KB are stored in the database, a process runs on the app servers to bundle those objects into a single file. A reference to the bundled file remains in the database along with a seek offset into the bundle. This is similar to Facebook’s Haystack image storage system but built into an object storage system.

Support

Each instance contains various other servers for support roles such as debugging application servers and “Hammer testing” app servers in the app tier, hub servers which monitor each instance for health and monitor servers running Nagios. Outside of the instance itself reside supporting servers like storage management, database management, log aggregation, production access authentication and other functions.

Future Directions

On the database tier, we’re carefully examining several options for data storage systems. We’re also evaluating higher speed, lower latency interconnects such as Infiniband for our cluster interconnects with our existing database solution.

In the future, we expect to make the search hosts do all reads and writes. We are in the process of rolling out Apache Solr for search indexing. Solr runs locally on our existing search hosts. The SAN, NFS servers and SPARC based indexer hosts will all go away which will lead to a dramatic reduction in complexity of the entire search tier.

Our application container was previously Resin but over the last year, we’ve been migrating to Jetty. We’re also in the midst of a hardware refresh of our application tier hardware which will increase RAM sizes anywhere from 30% – 266% and introduce Sandy Bridge processors into our stack.

I hope this overview of the salesforce.com technology architecture and stack has been interesting and informative. Please leave a comment if you’re interested in additional guest posts deep diving into other parts of our expansive technology infrastructure. Thanks for reading!

(source Highscalability.com)