ejabberd Massive Scalability: 1 Node — 2+ Million Concurrent Users

How Far Can You Push ejabberd?

From experience, we know that ejabberd is massively scalable. However, we wanted benchmark results and hard figures to demonstrate that performance level and to give a baseline for what to expect in simple cases.

That’s how we ended up with the challenge of fitting a very large number of concurrent users on a single ejabberd node.

It turns out you can get very far with ejabberd.

Scenario and Platforms

Here is our benchmark scenario. The target was to reach 2,000,000 concurrent users, each with 18 contacts on the roster and a session lasting around one hour. The scenario involves 2.2M registered users, so almost all contacts are online at peak load. This means that presence packets were broadcast for those users, adding traffic on top of the packets that handle user connections and manage sessions. The scenario produced 550 connections per second and thus 550 logins per second.

The database for authentication and roster storage was MySQL, running on the same node as ejabberd.

For the benchmark itself, we used Tsung, a tool dedicated to generating large loads to test server performance. We used a single large instance to generate the load.

Both ejabberd and the test platform were running on Amazon EC2 instances. ejabberd was running on a single node of instance type m4.10xlarge (40 vCPU, 160 GiB). The Tsung instance was identical.

Regarding ejabberd software itself, the test was made with ejabberd Community Server version 16.01. This is the standard open source version that is widely available and widely used across the world.

The connections did not use TLS, so that the test focused on ejabberd itself rather than on OpenSSL performance.

Code snippets and comments regarding the Tsung scenario are available for download: tsung_snippets.md

Overall Benchmark Results


We managed to surpass the target: we support more than 2 million concurrent users on a single ejabberd node.

For XMPP servers, the main limitation to handle a massive number of online users is usually memory consumption. With proper tuning, we managed to handle the traffic with a memory footprint of 28KB per online user.
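To put the per-user figure in perspective, here is a back-of-the-envelope calculation in Python. It is only a sketch: it assumes that 1 KB means 1,024 bytes and that the 28 KB figure covers all per-session memory. The function name is ours, not part of ejabberd or Tsung.

```python
# Rough memory budget for the benchmark (illustrative arithmetic only).
USERS = 2_000_000        # target number of concurrent users
KB_PER_USER = 28         # footprint per online user quoted above
INSTANCE_GIB = 160       # memory of the m4.10xlarge instance

def session_memory_gib(users, kb_per_user, bytes_per_kb=1024):
    """Total session memory in GiB, assuming 1 KB = 1,024 bytes."""
    return users * kb_per_user * bytes_per_kb / 2**30

total_gib = session_memory_gib(USERS, KB_PER_USER)
print(f"{total_gib:.1f} GiB, {total_gib / INSTANCE_GIB:.0%} of the instance")
# prints: 53.4 GiB, 33% of the instance
```

Under these assumptions, user sessions take roughly a third of the instance memory, which also has to hold MySQL and the operating system.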

The 40 CPUs were almost evenly used, with the exception of the first core, which was handling all the network interrupts. It was more loaded by the operating system and thus less loaded by the Erlang VM.

In the process, we also optimized our XML parser, released now as Fast XML, a high-performance, memory efficient Expat-based Erlang and Elixir XML parser.

Detailed Results

ejabberd Performance


The benchmark shows that we reached 2 million concurrent users after one hour. We were logging in about 33k users per minute, producing session traffic of a bit more than 210k XMPP packets per minute (this includes the stanzas for SASL authentication, binding, roster retrieval, etc.). By design in the scenario, the maximum number of concurrent users is reached shortly after the 2 million mark. At this point, we still connect new users but, as the first users start disconnecting, the number of concurrent users stabilizes.

As we try to reproduce common client behavior, we set up Tsung to send “keepalive pings” on the connections. Since each session sends one such whitespace ping per minute, the number of these requests grows proportionally with the number of connected users. While idle connections consume few resources on the server, at this scale they start to be noticeable. Once you have 2M users, you will be handling 33K pings per second just from idle connections. They are not represented on the graphs, but are an important part of the real-life traffic we were generating.
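The rates quoted in this section follow directly from the scenario parameters. The short Python sketch below redoes the arithmetic; the variable names are ours and nothing here comes from the Tsung configuration itself.

```python
# Traffic rates implied by the scenario (illustrative arithmetic only).
USERS = 2_000_000          # concurrent users at peak
LOGINS_PER_SECOND = 550    # connection rate from the scenario
PING_INTERVAL_S = 60       # one whitespace ping per session per minute

logins_per_minute = LOGINS_PER_SECOND * 60
ramp_up_minutes = USERS / LOGINS_PER_SECOND / 60
pings_per_second = USERS / PING_INTERVAL_S

print(logins_per_minute)            # 33000
print(round(ramp_up_minutes, 1))    # 60.6
print(round(pings_per_second))      # 33333
```

At 550 logins per second it takes about an hour to reach 2 million sessions, which matches the one-hour ramp-up reported above.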

ejabberd Health


At all times, ejabberd health was fine. Typically, when ejabberd is overloaded, TCP connection establishment time and authentication time tend to grow to an unacceptable level. In our case, both of those operations stayed very fast throughout the benchmark, at under 10 milliseconds. There were almost no errors (the rare occurrences are artefacts of the benchmark process).

Platform Behavior


Good health and performance are confirmed by the state of the platform. CPU and memory consumption were totally under control, as shown in the graph. CPU consumption stays far from system limits. Memory grows proportionally to the number of concurrent users.

We also need to mention that the CPU values seen by the OS are slightly overestimated, as Erlang schedulers busy-wait for a short while when they run out of work.

Challenge: The Hardest Part

The hardest part is definitely tuning the Linux system for ejabberd and for the benchmark tool, to overcome the default limitations. By default, Linux servers are not configured to allow you to handle, nor even generate, 2 million TCP sockets. It required quite a bit of network setup not to have problems with exhausted ports on the Tsung side.

On a similar topic, we worked with the Amazon server team, as we were pushing the limits of their infrastructure like no one before. For example, we had to use a second Ethernet adapter with multiple IP addresses (2 x 15 IP, spread across 2 NICs). It also helped a lot to use the latest Enhanced Networking drivers from Intel.

All in all, it was a very interesting process that helped make progress on Amazon Web Services by testing and tuning the platform itself.

What’s Next?

This benchmark was intended to demonstrate that ejabberd can scale to a large volume and serve as a baseline reference for more complex and full-featured platforms.

The next step is to continue our benchmark and optimization iterations. Our next target is to benchmark Multi-User Chat performance and Pubsub performance. The goal is to find the limits, optimize, and demonstrate that massive internet scale can be achieved with these ejabberd components as well.

(via Blog.process-one.net)

Concurrency in Erlang & Scala: The Actor Model

Applications are becoming increasingly concurrent, yet the traditional way of doing this, threads and locks, is very troublesome. This article highlights Erlang and Scala, two programming languages that use a different approach to concurrency: the actor model.

The continuous increase of computer processor clock rate has recently slowed down, in part due to problems with heat dissipation. As the clock rate of a processor is increased, the heat generated increases too. Increasing current clock rates even further is troublesome, as the heat generated would push computer chips towards the limits of what is physically possible. Instead, processor manufacturers have turned towards multi-core processors, which are processors capable of doing multiple calculations in parallel. There has been an increased interest in software engineering techniques to fully utilize the capabilities offered by these processors.

At the same time, applications are more frequently built in a distributed way. To avoid wasting most of their time waiting for I/O operations, applications are forced to do more and more operations concurrently.

However, reasoning about concurrent systems can be far from trivial, as there can be a large number of interacting processes, with no predefined execution order. This paper discusses the concurrency model in Erlang and Scala, two languages that have recently gained in popularity, in part due to their support for scalable concurrency. This first section introduces both languages, by describing their key features.


Erlang is a concurrent programming language, originally designed by Ericsson (the name refers both to A. K. Erlang, a Danish mathematician, and to "Ericsson Language"). It was designed to be distributed and fault-tolerant, for use in highly-available (non-stop) soft real-time telecom applications.

Erlang is a pure functional language that features single assignment and eager evaluation. It also has built-in language constructs for distribution and concurrency. Erlang has a dynamic type system, where the typing of expressions is checked at run-time [1]. Typing information is fully optional and only used by the static type-checker; the run-time environment even allows running applications with invalid typing specifications. Run-time type checking is always performed by looking at the types of the data itself. This usually allows applications to run correctly, even when encountering unexpected data types.

The listings below show a simple program, written in Erlang and its execution in the Erlang shell. This program performs a recursive calculation of the factorial function.

A simple program in Erlang

-module(factorial).
-export([fact/1]).
fact(0) -> 1;
fact(N) -> N * fact(N-1).

Using the Erlang program

$ erl
Erlang (BEAM) emulator version 5.6.3 [source] [async-threads:0]
                                     [hipe] [kernel-poll:false]

Eshell V5.6.3  (abort with ^G)
1> c(factorial).
{ok,factorial}
2> factorial:fact(8).
40320

To allow for non-stop applications, Erlang supports hot-swapping of code. This means that it is possible to replace code while a program is executing, making it possible to do upgrades and maintenance without interrupting the running system.


Scala, which stands for Scalable Language, is a programming language that aims to provide both object-oriented and functional programming styles, while staying compatible with the Java Virtual Machine (JVM). It was designed in 2001 by Martin Odersky and his group at EPFL in Lausanne, Switzerland, by combining the experiences gathered from designing multiple other languages.

The Scala language has been designed to be scalable [2], as it is supposed to scale along with the needs of its users. Offering functional constructs allows the developer to write short and concise code, whereas having object-oriented concepts allows the language to be used for large complex projects. Scala is fully interoperable with the Java language, which allows developers to use all Java libraries from within Scala. As such, it is not a separate language community, but rather allows you to take advantage of the enormous Java ecosystem. There is also an initiative to build a Scala version that runs on top of the .NET Common Language Runtime (CLR) [3], which would make Scala portable to other underlying platforms.

Scala tries to stay close to pure object-oriented programming and therefore does not have constructs like static fields and methods. Every value in Scala is an object and every operation is a method call [4]. Scala allows you to define your own operators and language constructs. This makes it possible to extend the Scala language according to your specific needs, which again helps it to grow (scale up) along with the project.

Scala uses static typing, yet allows most of the typing to be left unspecified. When type information is not specified, the compiler attempts to infer it from the code itself. This saves programming effort and allows for more generic code. Type information is only required when the compiler cannot prove that types are used correctly.

The listing below shows a simple program in Scala that combines functional and object-oriented features. An object with a method is defined, but the method body is written in a pure functional style. There is no need to write a return statement, as the value of the last evaluated expression is the value of the function.

A simple program in Scala

object TestFactorial extends Application {
    def fact(n: Int): Int = {
        if (n == 0) 1
        else n * fact(n-1)
    }

    println("The factorial of 8 is "+fact(8))
    // Output: The factorial of 8 is 40320
}


A different way of concurrency: The Actor Model

The problem with threads

The traditional way of offering concurrency in a programming language is by using threads. In this model, the execution of the program is split up into concurrently running tasks. It is as if the program is being executed multiple times, the difference being that each of these copies operates on shared memory.

This can lead to a series of hard-to-debug problems, two of which are illustrated in the listings below. The first is the lost-update problem. Suppose two processes try to increment the value of a shared object acc. They both retrieve the value of the object, increment the value and store it back into the shared object. As these operations are not atomic, it is possible that their execution gets interleaved, leading to an incorrectly updated value of acc, as shown in the example.

The solution to this problem is the use of locks. Locks provide mutual exclusion, meaning that only one process can hold the lock at a time. By using a locking protocol, making sure the right locks are acquired before using an object, lost-update problems are avoided. However, locks have their own share of problems. One of them is the deadlock problem, shown in the second listing. In this example, two processes try to acquire the same two locks A and B. When both do so, but in a different order, a deadlock occurs. Each waits for the other to release its lock, which will never happen.

These are just some of the problems that might occur when attempting to use threads and locks.

Lost Update Problem

Process 1          Process 2
a = acc.get()
a = a + 100        b = acc.get()
                   b = b + 50

Deadlock Problem

Process 1          Process 2
lock(A)            lock(B)
lock(B)            lock(A)

… Deadlock! …
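The lost-update interleaving can be replayed in a few lines of Python. This is a minimal sketch rather than code from the article: the Account class, its set method and the final store steps are our assumptions, following the description of each process storing its value back. The bad interleaving is written out by hand so that the result is reproducible, and the second function shows the lock-based fix with two real threads.

```python
import threading

class Account:
    """Shared object with non-atomic get/set (hypothetical helper)."""
    def __init__(self, value=0):
        self.value = value
        self.lock = threading.Lock()

    def get(self):
        return self.value

    def set(self, value):
        self.value = value

def lost_update():
    """Replay the bad interleaving by hand so the result is reproducible."""
    acc = Account(0)
    a = acc.get()        # process 1 reads 0
    a = a + 100
    b = acc.get()        # process 2 also reads 0
    b = b + 50
    acc.set(a)           # process 1 stores 100
    acc.set(b)           # process 2 stores 50, overwriting the 100
    return acc.get()

def locked_update():
    """Two real threads; the lock makes each read-modify-write atomic."""
    acc = Account(0)

    def add(amount):
        with acc.lock:
            acc.set(acc.get() + amount)

    threads = [threading.Thread(target=add, args=(n,)) for n in (100, 50)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return acc.get()

print(lost_update())     # 50: the +100 update is lost
print(locked_update())   # 150
```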


Both Erlang and Scala take a different approach to concurrency: the Actor Model. It is necessary to look at the concepts of the actor model first, before studying the peculiarities of the languages themselves.

The Actor Model

The Actor Model was first proposed by Carl Hewitt in 1973 [5] and was later improved by, among others, Gul Agha [6]. This model takes a different approach to concurrency, which should avoid the problems caused by threading and locking.

In the actor model, each object is an actor. This is an entity that has a mailbox and a behaviour. Messages exchanged between actors are buffered in the mailbox. Upon receiving a message, the behaviour of the actor is executed, upon which the actor can: send a number of messages to other actors, create a number of actors and assume new behaviour for the next message to be received.

Of importance in this model is that all communications are performed asynchronously. This implies that the sender does not wait for a message to be received after sending it; it immediately continues its execution. There are no guarantees about the order in which messages will be received by the recipient, but they will eventually be delivered.

A second important property is that all communications happen by means of messages: there is no shared state between actors. If an actor wishes to obtain information about the internal state of another actor, it will have to use messages to request this information. This allows actors to control access to their state, avoiding problems like the lost-update problem. Manipulation of the internal state also happens through messages.

Each actor runs concurrently with other actors: it can be seen as a small independently running process.
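The properties described in this section (a mailbox, private state, asynchronous sends) can be sketched with the Python standard library. This is an illustration under our own assumptions, not how Erlang or Scala implement actors: the CounterActor class and the message tuples are hypothetical, and queue.Queue delivers messages in FIFO order, which is a stronger guarantee than the model itself gives.

```python
import queue
import threading

class CounterActor:
    """Minimal actor: private state, a mailbox, one thread draining it."""
    def __init__(self):
        self._mailbox = queue.Queue()
        self._total = 0          # only ever touched by the actor's own thread
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, message):
        """Asynchronous send: enqueue the message and return immediately."""
        self._mailbox.put(message)

    def _run(self):
        while True:
            kind, payload = self._mailbox.get()
            if kind == "inc":
                self._total += payload
            elif kind == "value":
                payload.put(self._total)   # payload is the caller's reply queue
            elif kind == "stop":
                return

def count_to(n):
    """Send n increments, then ask the actor for its state via a message."""
    actor = CounterActor()
    for _ in range(n):
        actor.send(("inc", 1))
    reply = queue.Queue()
    actor.send(("value", reply))
    result = reply.get(timeout=5)
    actor.send(("stop", None))
    return result

print(count_to(1000))   # 1000
```

Note that the caller never reads the counter directly: even the final value is requested and returned through messages.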

Actors in Erlang

In Erlang, which is designed for concurrency, distribution and scalability, actors are part of the language itself. Due to its roots in the telecom industry, where very large numbers of concurrent processes are normal, it is almost impossible to think of Erlang without actors, which are also used to provide distribution. Actors in Erlang are called processes and are started using the built-in spawn function.

A simple application that uses an actor can be seen below. In this application, an actor is defined which acts as a basic counter. We send 100,000 increment messages to the actor and then request it to print its internal value.

Actors in Erlang

-module(counter).
-export([run/0, counter/1]).

run() ->
    S = spawn(counter, counter, [0]),
    send_msgs(S, 100000),
    S.

counter(Sum) ->
    receive
        value -> io:fwrite("Value is ~w~n", [Sum]);
        {inc, Amount} -> counter(Sum+Amount)
    end.

send_msgs(_, 0) -> true;
send_msgs(S, Count) ->
    S ! {inc, 1},
    send_msgs(S, Count-1).

% Usage:
%    1> c(counter).
%    2> S = counter:run().
%       ... Wait a bit until all children have run ...
%    3> S ! value.
%    Value is 100000

Lines 1 & 2 define the module and the exported functions. Lines 4 till 7 contain the run function, which starts a counter process and starts sending increment messages. Sending these messages happens in lines 15 till 18, using the message-passing operator (!). As Erlang is a purely functional language, it has no loop structures, so repetition has to be expressed using recursion. Recursion this deep would lead to stack overflows in Java, yet Erlang is optimized for this usage pattern: the recursive calls here are tail calls, which run in constant stack space. The increment message in this example also carries a parameter, to show that Erlang messages can carry data. The state of the counter is also maintained using recursion: upon receiving an inc message, the counter calls itself with the new value, which causes it to wait for the next message. If no messages are available yet, the counter will block until one arrives.

Actor scheduling in Erlang

Erlang uses a preemptive scheduler for the scheduling of processes [7]. When a process has executed for too long (usually measured in the number of functions invoked or the number of CPU cycles used), or when it enters a receive statement with no messages available, it is halted and placed on a scheduling queue.

This allows for a large number of processes to run, with a certain amount of fairness. Long running computations will not cause other processes to become unresponsive.
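The fairness argument can be illustrated with a toy round-robin scheduler in Python. This is only a simulation under our own assumptions, not the Erlang scheduler: each generator stands for a process, each yield stands for one unit of work, and the names worker, run_round_robin and budget are hypothetical.

```python
from collections import deque

def worker(name, steps, log):
    """A process that needs `steps` units of work; each yield is one unit."""
    for _ in range(steps):
        log.append(name)
        yield

def run_round_robin(processes, budget):
    """Run each process for at most `budget` units, then requeue it."""
    ready = deque(processes)
    while ready:
        proc = ready.popleft()
        for _ in range(budget):
            try:
                next(proc)
            except StopIteration:
                break                # process finished: drop it
        else:
            ready.append(proc)       # budget used up: preempt and requeue

def schedule_demo():
    log = []
    run_round_robin([worker("long", 6, log), worker("short", 2, log)], budget=2)
    return log

print(schedule_demo())
# ['long', 'long', 'short', 'short', 'long', 'long', 'long', 'long']
```

The short process completes after the long one has used only a third of its work, instead of waiting for the whole long computation to finish.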

Starting with release R11B, which appeared in May 2006, the Erlang run-time environment has support for symmetric multiprocessing (SMP) [8]. This means that it is able to schedule processes in parallel on multiple CPUs, allowing it to take advantage of multi-core processors. The functional nature of Erlang allows for easy parallelization. An Erlang lightweight process (actor) will never run in parallel on multiple processors, but using a multi-threaded run-time allows multiple processes to run at the same time. Big performance speedups have been observed using this technique.

Actors in Scala

Actors in Scala are available through the scala.actors library. Their implementation is a great testament to the expressiveness of Scala: all functionality, operators and other language constructs included, is implemented in pure Scala, as a library, without requiring changes to Scala itself.

The same sample application, this time written in Scala can be seen below:


Actors in Scala

import scala.actors.Actor
import scala.actors.Actor._

case class Inc(amount: Int)
case class Value

class Counter extends Actor {
    var counter: Int = 0;

    def act() = {
        while (true) {
            receive {
                case Inc(amount) =>
                    counter += amount
                case Value =>
                    println("Value is "+counter)
                    exit()
            }
        }
    }
}

object ActorTest extends Application {
    val counter = new Counter
    counter.start()

    for (i <- 0 until 100000) {
        counter ! Inc(1)
    }
    counter ! Value
    // Output: Value is 100000
}


The listing can be read as follows. Lines 1 & 2 import the abstract Actor class (note: this is actually a trait, a special composition mechanism used by Scala, but that is outside the scope of this article) and its members (we need the ! operator for sending messages). Lines 4 & 5 define the Inc and Value case classes, which will be used as message identifiers. The increment message has a parameter, as an example to demonstrate this ability.

Lines 7 till 21 define the Counter actor, as a subclass of Actor. The act() method is overridden, which provides the behavior of the actor (lines 10-20). This version of the counter actor is written using a more object-oriented style (though Scala fully supports the pure functional way, as shown in the Erlang example too). The state of the actor is maintained in an integer field counter. In the act() method, an endless receive loop is executed. This block processes any incoming messages, either by updating the internal state, or by printing its value and exiting.

Finally, on lines 23 till 32, we find the main application, which first constructs a counter, then sends 100,000 Inc messages and finally sends it the Value message. The ! operator is used to send a message to an actor, a notation that was borrowed from Erlang.

The output of this program shows that the counter has been incremented up to a value of 100,000. This means that in this case all our messages were delivered in order. This might not always be the case: recall that there are no guarantees on the order of message delivery in the actor model.

The example above shows the ease with which actors can be used in Scala, even though they are not part of the language itself.

It also shows the similarities between the actors library in Scala and the Erlang language constructs. This is no coincidence, as the Scala actors library was heavily inspired by Erlang. The Scala developers have, however, expanded upon Erlang's concepts and added a number of features, which will be highlighted in the following sections.


The authors of scala.actors noticed a recurring pattern in the usage of actors: a request/reply pattern [9]. This pattern is illustrated below. Often, a message is sent to an actor and in that message, the sender is passed along. This allows the receiving actor to reply to the message.

To facilitate this, a reply() construct was added. This removes the need to send the sender along in the message and provides easy syntax for replying.

Normal request/reply

receive {
    case Msg(sender, value) =>
        val r = process(value)
        sender ! Response(r)
}

Request/reply using reply()

receive {
    case Msg(value) =>
        val r = process(value)
        reply(Response(r))
}

The reply() construct is not present in Erlang, where you are forced to include the sender each time you want to be able to receive replies. This is not necessarily a bad thing, however: Scala messages always carry the identity of the sender with them to enable this functionality. This causes a tiny bit of extra overhead, which might be too much in performance-critical applications.

Synchronous messaging

Scala contains a construct which can be used to wait for message replies. This allows for synchronous invocation, which is more like method invocation. The syntax is shown in the second listing below, contrasted with the normal way of writing this in the first listing.

When entering the receive block, or upon using the !? operator, the actor waits until it receives a message matched by any of the case clauses. When the actor receives a message that is not matched, the message stays in the mailbox of the actor and is retried when a new receive block is entered.

Waiting for a reply

myService ! Msg(value)
receive {
    case Response(r) => // ...
}

Synchronous invocation using !?

myService !? Msg(value) match {
    case Response(r) => // ...
}


Messages, as used in the previous examples, are used in a somewhat loosely typed fashion. It is possible to send any kind of message to an actor. Messages that are not matched by any of the case clauses will remain in the mailbox, rather than causing an error.

Scala has a very rich type system and the Scala developers wanted to take advantage of this. Therefore they added the concept of channels [9]. These allow you to specify the type of messages that can be accepted using generics. This enables type-safe communication.

The mailbox of an actor is a channel that accepts any type of message.

Thread-based vs. Event-based actors

Scala makes the distinction between thread-based and event-based actors.

Thread-based actors are actors which each run in their own JVM thread. They are scheduled by the Java thread scheduler, which uses a preemptive priority-based scheduler. When the actor enters a receive block, the thread is blocked until messages arrive. Thread-based actors make it possible to do long-running computations, or blocking I/O operations inside actors, without hindering the execution of other actors.

There is an important drawback to this method: each thread is heavy-weight, using a certain amount of memory and imposing some scheduling overhead. When large numbers of actors are started, the virtual machine might run out of memory or it might perform suboptimally due to large scheduling overhead.

In situations where this is unacceptable, event-based actors can be used. These actors are not implemented by means of one thread per actor; instead, they run on the same thread. An actor that waits for a message to be received is not represented by a blocked thread, but by a closure. This closure captures the state of the actor, such that its computation can be continued upon receiving a message [10]. The execution of this closure happens on the thread of the sender.

Event-based actors provide a more light-weight alternative, allowing for very large numbers of concurrently running actors. They should however not be used for parallelism: since all actors execute on the same thread, there is no scheduling fairness.

A Scala programmer can use event-based actors by using a react block instead of a receive block. There is one big limitation to using event-based actors: upon entering a react block, the control flow can never return to the enclosing actor. In practice, this does not prove to be a severe limitation, as the code can usually be rearranged to fit this scheme. This property is enforced through the advanced Scala type system, which allows specifying that a method never returns normally (the Nothing type). The compiler can thus check if code meets this requirement.

Actor scheduling in Scala

Scala allows programmers to mix thread-based actors and event-based actors in the same program: this way programmers can choose whether they want scalable, lightweight event-based actors, or thread-based actors that allow for parallelism, depending on the situation in which they are needed.

Scala uses a thread pool to execute actors. This thread pool will be resized automatically whenever necessary. If only event-based actors are used, the size of this thread pool will remain constant. When blocking operations are used, like receive blocks, the scheduler (which runs in its own separate thread) will start new threads when needed. Periodically, the scheduler checks whether there are runnable tasks in the task queue; if so, it checks whether all worker threads are blocked and starts a new worker thread if needed.


The next section will evaluate some of the features in both languages. It aims to show that while some of these features facilitate the implementation, their power also comes with a risk. One should be aware of the possible drawbacks of the chosen technologies, to avoid potential pitfalls.

The dangers of synchronous message passing

The synchronous message passing style available in Scala (using !?) provides programmers with a convenient way of doing messaging round-trips. This allows for a familiar style of programming, similar to remote method invocation.

It should however be used with great care. Due to the very selective nature of the match clause that follows the use of !? (it usually matches only one type of message), the actor is effectively blocked until a suitable reply is received. This is implemented using a private return channel [9], which means that the progress of the actor is fully dependent on the actor from which it awaits a reply: it cannot handle any messages other than the expected reply, not even if they come from the actor from which it awaits reply.

This is a dangerous situation, as it might lead to deadlocks. To see this, consider the following example:

Actor deadlock: Actor A

actorB !? Msg1(value) match {
    case Response1(r) =>
      // ...
}

receive {
    case Msg2(value) =>
}

Actor deadlock: Actor B

actorA !? Msg2(value) match {
    case Response2(r) =>
      // ...
}

receive {
    case Msg1(value) =>
}

In this example, each actor sends a message to the other actor. Neither will ever receive an answer, as the other actor is itself blocked, waiting for a different message. If this were implemented using a message loop, as you would generally do in Erlang, no problem would arise. This is shown below. This does not mean that synchronous message passing should be avoided: in certain cases, it is necessary and in these cases the extra syntax makes this much easier to program. It is however important to be aware of the potential problems caused by this programming style.

Safe loop: Actor A

actorB ! Msg1(value)
while (true) {
    receive {
        case Msg2(value) =>
        case Response1(r) =>
          // ...
    }
}

Safe loop: Actor B

actorA ! Msg2(value)
while (true) {
    receive {
        case Msg1(value) =>
        case Response2(r) =>
          // ...
    }
}

The full source code for these examples can be found online, see the appendix for more information.
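The difference between the two styles can also be reproduced outside Scala. The Python sketch below is our own illustration, not part of the article's source code: plain threads and queues stand in for actors, a short timeout stands in for "waiting forever", and all names (blocking_actor, looping_actor, run_pair) are hypothetical.

```python
import queue
import threading

def blocking_actor(inbox, peer_inbox, results, name):
    """Synchronous style: send a request, then wait only for its reply."""
    reply_box = queue.Queue()                  # private return channel
    peer_inbox.put(("request", reply_box))
    try:
        reply_box.get(timeout=0.2)             # nothing else can be handled here
        results[name] = "reply"
    except queue.Empty:
        results[name] = "deadlock"             # the peer is stuck the same way

def looping_actor(inbox, peer_inbox, results, name):
    """Message-loop style: handle requests and replies as they arrive."""
    peer_inbox.put(("request", inbox))         # ask the peer to answer into our inbox
    got_reply = served = False
    while not (got_reply and served):
        kind, payload = inbox.get(timeout=2)
        if kind == "request":
            payload.put(("response", None))    # answer the peer's request
            served = True
        else:
            got_reply = True
    results[name] = "reply"

def run_pair(actor):
    """Start two actors of the same kind that talk to each other."""
    box_a, box_b, results = queue.Queue(), queue.Queue(), {}
    threads = [
        threading.Thread(target=actor, args=(box_a, box_b, results, "A")),
        threading.Thread(target=actor, args=(box_b, box_a, results, "B")),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results

print(run_pair(blocking_actor))   # both actors report "deadlock"
print(run_pair(looping_actor))    # both actors report "reply"
```

In the blocking version each request sits unread in the other actor's inbox, which is exactly the situation described for the two !? calls.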

Safety in Scala concurrency

Another potential pitfall in Scala comes from the fact that it mixes actors with object-oriented programming. It is possible to expose the internal state of an actor through publicly available methods for retrieving and modifying this state. When doing so, it is possible to modify an object by directly invoking its methods, that is, without using messages. Doing so means that you no longer enjoy the safety provided by the actor model.

Erlang on the other hand, due to its functional nature, strictly enforces the use of messages between processes: there is no other way to retrieve and update information in other processes.

This illustrates possibly the biggest trade-off between Erlang and Scala: having a pure functional language, like Erlang, is safe, but more difficult for programmers to use. The object-oriented, imperative style of Scala is more familiar and makes programming easier, yet requires more discipline and care to produce safe and correct programs.


This article described the actor model for the implementation of concurrency in applications, as an alternative to threading and locking. It highlighted Erlang and Scala, two languages with an implementation of the actor model and showed how these languages implement this model.

Erlang is a pure functional language, providing little more than the basic features of functional languages. This should certainly not be seen as a weakness though: this simplicity allows it to optimize specifically for the cases for which it was defined as well as implement more advanced features like hot-swapping of code.

Scala on the other hand uses a mix of object-oriented and functional styles. This makes it easier for a programmer to write code, especially given the extra constructs offered by Scala, but this flexibility comes with a warning: discipline should be used to avoid inconsistencies.

The differences between these languages should be seen and evaluated in their design context. Both however provide an easy to use implementation of the actor model, which greatly facilitates the implementation of concurrency in applications.



Appendix: Source code + PDF

The source code for the sample programs and a PDF version can be found right here (tar.gz download). It has been tested on Ubuntu Linux 8.10, with Erlang 5.6.3 and Scala 2.7.2 final (installed from Debian packages). It should work on any platform.

(Via rocketeer.be)

CloudI: Bringing Erlang’s Fault-Tolerance to Polyglot Development

Clouds must be efficient to provide useful fault-tolerance and scalability, but they also must be easy to use.

CloudI (pronounced “cloud-e” /klaʊdi/) is an open source cloud computing platform built in Erlang that is most closely related to the Platform as a Service (PaaS) clouds. CloudI differs in a few key ways, most importantly: software developers are not forced to use specific frameworks, slow hardware virtualization, or a particular operating system. By allowing cloud deployment to occur without virtualization, CloudI leaves development process and runtime performance unimpeded, while quality of service can be controlled with clear accountability.

What makes a cloud a cloud?

The word “cloud” has become ubiquitous over the past few years. And its true meaning has become somewhat lost. In the most basic technological sense, these are the properties that a cloud computing platform must have:

  • Fault-tolerance
  • Scalability

And these are the properties that we’d like a cloud to have:

  • Easy integration
  • Simple deployment

My goal in building CloudI was to bring together these four attributes.

I began by looking at the Erlang programming language (on top of which CloudI is built). The Erlang virtual machine provides fault-tolerance features and a highly scalable architecture while the Erlang programming language keeps the required source code small and easy to follow.

It’s important to understand that few programming languages can provide real fault-tolerance with scalability. In fact, I’d say Erlang is roughly alone in this regard. Let me take a detour to explain why and how.


What is fault-tolerance in cloud computing?

Fault-tolerance is robustness to error. That is, fault-tolerant systems are able to continue operating relatively normally even in the event of (hopefully isolated) errors.

Here, we see service C sending requests to services A and B. Although service B crashes temporarily, the rest of the system continues, relatively unimpeded.

Erlang tutorial for beginners

Erlang is known for achieving 9x9s of reliability (99.9999999% uptime, so less than 31.536 milliseconds of downtime per year) with real production systems (within the telecommunications industry). Normal web development techniques only achieve 5x9s reliability (99.999% uptime, so about 5.256 minutes of downtime per year), if they are lucky, due to slow update procedures and complete system failures. How does Erlang provide this advantage?
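The downtime figures quoted above follow directly from the availability percentages. The short sketch below reproduces the arithmetic; the function name is only illustrative, and a 365-day year is assumed.

```python
SECONDS_PER_YEAR = 365 * 24 * 60 * 60  # 31,536,000 s, ignoring leap years


def downtime_seconds_per_year(nines: int) -> float:
    """Allowed downtime per year for an availability of `nines` nines."""
    unavailability = 10.0 ** -nines  # e.g. 5 nines -> 0.00001
    return SECONDS_PER_YEAR * unavailability


if __name__ == "__main__":
    # Nine nines, reported in milliseconds per year.
    print(f"9 nines: {downtime_seconds_per_year(9) * 1000:.3f} ms/year")
    # Five nines, reported in minutes per year.
    print(f"5 nines: {downtime_seconds_per_year(5) / 60:.3f} min/year")
```

Each extra nine divides the allowed downtime by ten, which is why the gap between five and nine nines is four orders of magnitude.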

The Erlang virtual machine implements what is called an “Actor model”, a mathematical model for concurrent computation. Within the Actor model, Erlang’s lightweight processes are a concurrency primitive of the language itself. That is, within Erlang, we assume that everything is an actor. By definition, actors perform their actions concurrently; so if everything is an actor, we get inherent concurrency. (For more on Erlang’s Actor model, there’s a longer discussion here.)

As a result, Erlang software is built with many lightweight processes that keep process state isolated while providing extreme scalability. When an Erlang process needs external state, a message is normally sent to another process, so that the message queuing can provide the Erlang processes with efficient scheduling. To keep the Erlang process state isolated, the Erlang virtual machine does garbage collection for each process individually so that other Erlang processes can continue running concurrently without being interrupted.

The Erlang virtual machine garbage collection is an important difference when compared with Java virtual machine garbage collection because Java depends on a single heap, which lacks the isolated state provided by Erlang. The difference between Erlang garbage collection and Java garbage collection means that Java is unable to provide basic fault-tolerance guarantees simply due to the virtual machine garbage collection, even if libraries or language support were developed on top of the Java virtual machine. There have been attempts to develop fault-tolerance features in Java and other Java virtual machine based languages, but they continue to be failures due to the Java virtual machine garbage collection.


Basically, building real-time fault-tolerance support on top of the JVM is by definition impossible, because the JVM itself is not fault-tolerant.

Erlang processes

At a low level, what happens when we get an error in an Erlang process? The language itself uses message passing between processes to ensure that any errors have a scope limited by a concurrent process. This works by storing data types as immutable objects; these objects are copied to limit the scope of the process state (large binaries are a special exception because they are reference counted to conserve memory).

In basic terms, that means that if we want to send variable X to another process P, we have to copy over X as its own immutable variable X’. We can’t modify X’ from our current process, and so even if we trigger some error, our second process P will be isolated from its effects. The end result is low-level control over the scope of any errors due to the isolation of state within Erlang processes. If we wanted to get even more technical, we’d mention that Erlang’s lack of mutability gives it referential transparency unlike, say, Java.
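The copy-on-send idea can be imitated in Python to make it concrete. This is only a sketch: Python values are mutable and share one heap, so the isolation has to be simulated with an explicit deep copy, and the Mailbox class and demo function are hypothetical names, not part of Erlang or CloudI.

```python
import copy
import queue


class Mailbox:
    """Hypothetical stand-in for the mailbox of an Erlang process P."""

    def __init__(self):
        self._queue = queue.Queue()

    def send(self, message):
        # Copy on send: the receiver gets its own X', never a reference to X.
        self._queue.put(copy.deepcopy(message))

    def receive(self):
        # Return the oldest message; raises queue.Empty if there is none.
        return self._queue.get_nowait()


def demo():
    x = {"balance": 100, "history": [1, 2, 3]}
    p = Mailbox()
    p.send(x)
    # The sender corrupts its own state after sending.
    x["history"].clear()
    x["balance"] = None
    # The receiver still sees the value as it was when it was sent.
    return p.receive()
```

Calling demo() returns the original dictionary contents, because the damage done by the sender never reaches the copy held by the receiver.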

This type of fault-tolerance is deeper than just adding try-catch statements and exceptions. Here, fault-tolerance is about handling unexpected errors, and exceptions are expected. Here, you’re trying to keep your code running even when one of your variables unexpectedly explodes.

Erlang’s process scheduling provides extreme scalability for minimal source code, making the system simpler and easier to maintain. While it is true that other programming languages have been able to imitate the scalability found natively within Erlang by providing libraries with user-level threading (possibly combined with kernel-level threading) and data exchanging (similar to message passing) to implement their own Actor model for concurrent computation, the efforts have been unable to replicate the fault-tolerance provided within the Erlang virtual machine.

This leaves Erlang alone amongst programming languages as being both scalable and fault-tolerant, making it an ideal development platform for a cloud.

Taking advantage of Erlang

So, with all that said, I can make the claim that CloudI brings Erlang’s fault-tolerance and scalability to various other programming languages (currently C++/C, Erlang (of course), Java, Python, and Ruby), implementing services within a Service Oriented Architecture (SOA).

Every service executed within CloudI interacts with the CloudI API. All of the non-Erlang programming language services are handled using the same internal CloudI Erlang source code. Since the same minimal Erlang source code is used for all the non-Erlang programming languages, other programming language support can easily be added with an external programming language implementation of the CloudI API. Internally, the CloudI API is only doing basic serialization for requests and responses. This simplicity makes CloudI a flexible framework for polyglot software development, providing Erlang’s strengths without requiring the programmer to write or even understand a line of Erlang code.

The service configuration specifies startup parameters and fault-tolerance constraints so that service failures can occur in a controlled and isolated way. The startup parameters clearly define the executable and any arguments it needs, along with default timeouts used for service requests, the method to find a service (called the “destination refresh method”), both an allow and deny simple access control list (ACL) to block outgoing service requests and optional parameters to affect how service requests are handled. The fault-tolerance constraints are simply two integers (MaxR: maximum restarts, and MaxT: maximum time period in seconds) that control a service in the same way an Erlang supervisor behavior (an Erlang design pattern) controls Erlang processes. The service configuration provides explicit constraints for the lifetime of the service which helps to make the service execution easy to understand, even when errors occur.
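To illustrate how two integers can bound a service's lifetime, here is a minimal sketch of a restart limiter in the spirit of an Erlang supervisor: at most MaxR restarts are tolerated within any MaxT-second window. The class and method names are hypothetical, and this is not CloudI's actual implementation.

```python
from collections import deque


class RestartLimiter:
    """Allow at most max_r restarts within any max_t-second window."""

    def __init__(self, max_r: int, max_t: float):
        self.max_r = max_r
        self.max_t = max_t
        self._restarts = deque()  # timestamps of recent restarts

    def allow_restart(self, now: float) -> bool:
        # Forget restarts that have fallen out of the sliding window.
        while self._restarts and now - self._restarts[0] > self.max_t:
            self._restarts.popleft()
        if len(self._restarts) >= self.max_r:
            return False  # too many failures: stop restarting the service
        self._restarts.append(now)
        return True
```

With max_r=2 and max_t=10, two crashes in quick succession are restarted, a third within the same ten seconds is refused, and restarts are allowed again once the earlier ones age out of the window.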

To keep the service memory isolated during runtime, separate operating system processes are used for each non-Erlang service (referred to as “external” services) with an associated Erlang process (for each non-Erlang thread of execution) that is scheduled by the Erlang VM. The Erlang CloudI API creates “internal” services, which are also associated with an Erlang process, so both “external” services and “internal” services are processed in the same way within the Erlang VM.


In cloud computing, it’s also important that your fault-tolerance extends beyond a single computer (i.e., distributed system fault-tolerance). CloudI uses distributed Erlang communication to exchange service registration information, so that services can be utilized transparently on any instance of CloudI by specifying a single service name for a request made with the CloudI API. All service requests are load-balanced by the sending service and each service request is a distributed transaction, so the same service with separate instances on separate computers is able to provide system fault-tolerance within CloudI. If necessary, CloudI can be deployed within a virtualized operating system to provide the same system fault-tolerance while facilitating a stable development framework.

For example, if an HTTP request needs to store some account data in a database it could be made to the configured HTTP service (provided by CloudI) which would send the request to an account data service for processing (based on the HTTP URL which is used as the service name) and the account data would then be stored in a database. Every service request receives a Universally Unique IDentifier (UUID) upon creation, which can be used to track the completion of the service request, making each service request within CloudI a distributed transaction. So, in the account data service example, it is possible to make other service requests either synchronously or asynchronously and use the service request UUIDs to handle the response data before utilizing the database. Having the explicit tracking of each individual service request helps ensure that service requests are delivered within the timeout period of a request and also provides a way to uniquely identify the response data (the service request UUIDs are unique among all connected CloudI nodes).
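The role of the per-request UUID can be sketched as a small bookkeeping table. Everything here is an assumption made for illustration: the RequestTracker class, its method names, the millisecond clock passed in by the caller, and the use of random (version 4) UUIDs; the real CloudI API handles this internally.

```python
import uuid


class RequestTracker:
    """Hypothetical tracker for in-flight service requests, keyed by UUID."""

    def __init__(self):
        self._pending = {}  # trans_id -> (service_name, deadline_ms)

    def send_async(self, service_name, timeout_ms, now_ms):
        # Every request gets a unique ID at creation time.
        trans_id = uuid.uuid4()
        self._pending[trans_id] = (service_name, now_ms + timeout_ms)
        return trans_id

    def complete(self, trans_id, response, now_ms):
        """Match a response to its request; drop it if unknown or too late."""
        entry = self._pending.pop(trans_id, None)
        if entry is None:
            return None  # unknown or already handled request
        service_name, deadline_ms = entry
        if now_ms > deadline_ms:
            return None  # the reply missed the request's timeout
        return (service_name, response)
```

Because each response is matched by ID rather than by arrival order, several asynchronous requests can be outstanding at once and late replies can be discarded safely.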

With a typical deployment, each CloudI node could contain a configured instance of the account data service (which may utilize multiple operating system processes with multiple threads that each have a CloudI API object) and an instance of the HTTP service. An external load balancer would easily split the HTTP requests between the CloudI nodes and the HTTP service would route each request as a service request within CloudI, so that the account data service can easily scale within CloudI.

CloudI in action

CloudI lets you take unscalable legacy source code, wrap it with a thin CloudI service, and then execute the legacy source code with explicit fault-tolerance constraints. This particular development workflow is important for fully utilizing multicore machines while providing a distributed system that is fault-tolerant during the processing of real-time requests. Creating an “external” service in CloudI is simply instantiating the CloudI API object in all the threads that have been configured within the service configuration, so that each thread is able to handle CloudI requests concurrently. A simple service example can utilize the single main thread to create a single CloudI API object, like the following Python source code:

# Assumes the CloudI Python API directory is already on the module search path.
from cloudi_c import API

class Task(object):
    def __init__(self):
        self.__api = API(0) # first/only thread == 0

    def run(self):
        # Register the callback for requests sent to this service name.
        self.__api.subscribe('hello_world_python/get', self.__hello_world)
        # Block here, processing incoming service requests.
        self.__api.poll()

    def __hello_world(self, command, name, pattern, request_info, request,
                      timeout, priority, trans_id, pid):
        return 'Hello World!'

if __name__ == '__main__':
    assert API.thread_count() == 1 # simple example, without threads
    task = Task()
    task.run()

The example service simply returns “Hello World!” to an HTTP GET request by first subscribing with a service name and a callback function. When the service starts processing incoming CloudI service bus requests within the CloudI API poll function, incoming requests from the “internal” service which provides an HTTP server are routed to the example service based on the service name, because of the subscription. The service could also have returned no data as a response, if the request needed to be similar to a publish message in a typical distributed messaging API that provides publish/subscribe functionality. The example service wants to provide a response so that the HTTP server can provide the response to the HTTP client, so the request is a typical request/reply transaction. With both possible responses from a service, either data or no data, the service callback function, rather than the calling service, controls the messaging paradigm used, so the request is able to route through as many services as necessary to provide a response, within the timeout specified for the request. Timeouts are very important for real-time event processing, so each request specifies an integer timeout which follows the request as it routes through any number of services. The end result is that real-time constraints are enforced alongside fault-tolerance constraints, to provide a dependable service in the presence of any number of software errors.

The presence of source code bugs in any software should be understood as a clear fact that is only mitigated with fault-tolerance constraints. Software development can reduce the presence of bugs as software is maintained, but it can also add bugs as software features are added. CloudI provides cloud computing which is able to address these important fault-tolerance concerns within real-time distributed systems software development. As I have demonstrated in this CloudI and Erlang tutorial, the cloud computing that CloudI provides is minimal so that efficiency is not sacrificed for the benefits of cloud computing.

(Via toptal.com)

The WhatsApp Architecture

WhatsApp stats: What has hundreds of nodes, thousands of cores, hundreds of terabytes of RAM, and hopes to serve the billions of smartphones that will soon be a reality around the globe? The Erlang/FreeBSD-based server infrastructure at WhatsApp. We’ve faced many challenges in meeting the ever-growing demand for our messaging services as we continue to push the envelope on size (>8000 cores) and speed (>70M Erlang messages per second) of our serving system.

A warning here: we don’t know a lot about the overall WhatsApp architecture, just bits and pieces gathered from various sources. Rick Reed’s main talk is about the optimization process used to get to 2 million connections a server while using Erlang, which is interesting, but it’s not a complete architecture talk.


These stats are generally for the current system, not the system we have a talk on. The talk on the current system will include more on hacks for data storage, messaging, meta-clustering, and more BEAM/OTP patches.

  • 450 million active users, and reached that number faster than any other company in history.

  • 32 engineers, one developer supports 14 million active users

  • 50 billion messages every day across seven platforms (inbound + outbound)

  • 1+ million people sign up every day

  • $0 invested in advertising

  • $60 million investment from Sequoia Capital; $3.4 billion is the amount Sequoia will make

  • 35% is how much of Facebook’s cash is being used for the deal
  • Hundreds of nodes

  • >8000 cores

  • Hundreds of terabytes of RAM

  • >70M Erlang messages per second

  • In 2011 WhatsApp achieved 1 million established TCP sessions on a single machine with memory and CPU to spare. In 2012 that was pushed to over 2 million TCP connections. In 2013 WhatsApp tweeted out: On Dec 31st we had a new record day: 7B msgs inbound, 11B msgs outbound = 18 billion total messages processed in one day! Happy 2013!!!



  • Erlang

  • FreeBSD

  • Yaws, lighttpd

  • PHP

  • Custom patches to BEAM (BEAM is like Java’s JVM, but for Erlang)

  • Custom XMPP

  • Hosting may be in Softlayer


  • Seven client platforms: iPhone, Android, Blackberry, Nokia Symbian S60, Nokia S40, Windows Phone, ?

  • SQLite


  • Standard user facing server:

    • Dual Westmere Hex-core (24 logical CPUs);

    • 100GB RAM, SSD;

    • Dual NIC (public user-facing network, private back-end/distribution);


  • Focus is on messaging. Connecting people all over the world, regardless of where they are in the world, without having to pay a lot of money. Founder Jan Koum remembers how difficult it was in 1992 to connect to family all over the world.

  • Privacy. Shaped by Jan Koum’s experiences growing up in the Ukraine, where nothing was private. Messages are not stored on servers; chat history is not stored; goal is to know as little about users as possible; your name and your gender are not known; chat history is only on your phone.


  • WhatsApp server is almost completely implemented in Erlang.

    • Server systems that do the backend message routing are done in Erlang.

    • Great achievement is that the number of active users is managed with a really small server footprint. Team consensus is that it is largely because of Erlang.

    • Interesting to note Facebook Chat was written in Erlang in 2009, but they went away from it because it was hard to find qualified programmers.

  • WhatsApp server started from ejabberd

    • Ejabberd is a famous open source Jabber server written in Erlang.

    • Originally chosen because it was open, had great reviews by developers, was easy to start with, and held the promise of Erlang’s long-term suitability for large communication systems.

    • The next few years were spent re-writing and modifying quite a few parts of ejabberd, including switching from XMPP to an internally developed protocol, restructuring the code base and redesigning some core components, and making lots of important modifications to the Erlang VM to optimize server performance.

  • To handle 50 billion messages a day the focus is on making a reliable system that works. Monetization is something to look at later, it’s far far down the road.

  • A primary gauge of system health is message queue length. The message queue length of all the processes on a node is constantly monitored and an alert is sent out if they accumulate backlog beyond a preset threshold. If one or more processes falls behind that is alerted on, which gives a pointer to the next bottleneck to attack.

  • Multimedia messages are sent by uploading the image, audio or video to be sent to an HTTP server and then sending a link to the content along with its Base64 encoded thumbnail (if applicable).

  • Some code is usually pushed every day. Often, it’s multiple times a day, though in general peak traffic times are avoided. Erlang helps being aggressive in getting fixes and features into production. Hot-loading means updates can be pushed without restarts or traffic shifting. Mistakes can usually be undone very quickly, again by hot-loading. Systems tend to be much more loosely-coupled which makes it very easy to roll changes out incrementally.

  • What protocol is used in the WhatsApp app? An SSL socket to the WhatsApp server pools. All messages are queued on the server until the client reconnects to retrieve them. The successful retrieval of a message is reported back to the WhatsApp server, which forwards this status to the original sender (who sees it as a “checkmark” icon next to the message). Messages are wiped from server memory as soon as the client has accepted them.

  • How does the registration process work internally in WhatsApp? WhatsApp used to create a username/password based on the phone IMEI number. This was changed recently. WhatsApp now uses a general request from the app to send a unique 5 digit PIN. WhatsApp will then send an SMS to the indicated phone number (this means the WhatsApp client no longer needs to run on the same phone). Based on the PIN, the app then requests a unique key from WhatsApp. This key is used as the “password” for all future calls (this “permanent” key is stored on the device). This also means that registering a new device will invalidate the key on the old device.

  • Google’s push service is used on Android.

  • More users on Android. Android is more enjoyable to work with. Developers are able to prototype a feature and push it out to hundreds of millions of users overnight, if there’s an issue it can be fixed quickly. iOS, not so much.
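The message-queue health gauge described in the list above can be sketched in a few lines. The function name, the process names, and the threshold are all hypothetical; in the real system the queue lengths come from instrumentation inside BEAM rather than from a Python dictionary.

```python
def find_backlogged(queue_lengths, threshold):
    """Return (process, length) pairs over the threshold, longest first."""
    over = [(name, n) for name, n in queue_lengths.items() if n > threshold]
    # The longest queue points at the next bottleneck to attack.
    return sorted(over, key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    sample = {"router_1": 12, "offline_store": 600_000, "presence": 1_200}
    for name, length in find_backlogged(sample, threshold=1_000):
        print(f"ALERT: {name} has {length} queued messages")
```

Sorting by queue length matters as much as the alert itself, since the most backlogged process is the one that hints at where the system is falling behind.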

The Quest For 2+ Million Connections Per Server

  • Experienced lots of user growth, which is a good problem to have, but it also means having to spend money buying more hardware and increased operational complexity of managing all those machines.

  • Need to plan for bumps in traffic. Examples are soccer games and earthquakes in Spain or Mexico. These happen near peak traffic loads, so there needs to be enough spare capacity to handle peaks + bumps. A recent soccer match generated a 35% spike in outbound message rate right at the daily peak.

  • Initial server loading was 200K simultaneous connections per server.

    • Extrapolated out would mean a lot of servers with the hoped for growth pattern.

    • Servers were brittle in the face of burst loads. Network glitches and other problems would occur. Needed to decouple components so things weren’t so brittle at high capacity.

    • Goal was a million connections per server. An ambitious goal given that at the time they were running at 200K connections. Running servers with headroom to allow for world events, hardware failures, and other types of glitches would require enough resilience to handle the high usage levels and failures.

Tools And Techniques Used To Increase Scalability

  • Wrote system activity reporter tool (wsar):

    • Record system stats across the system, including OS stats, hardware stats, and BEAM stats. It was built so it was easy to plug in metrics from other systems, like virtual memory. Track CPU utilization, overall utilization, user time, system time, interrupt time, context switches, system calls, traps, packets sent/received, total count of messages in queues across all processes, busy port events, traffic rate, bytes in/out, scheduling stats, garbage collection stats, words collected, etc.

    • Initially ran once a minute. As the systems were driven harder, one-second polling resolution was required because events that happened in the space of a minute were invisible. Really fine-grained stats to see how everything is performing.

  • Hardware performance counters in CPU (pmcstat):

    • See where the CPU is at as a percentage of time. Can tell how much time is being spent executing the emulator loop. In their case it is 16%, which tells them that only 16% of the time is spent executing emulated code, so even if you were able to remove all the execution time of all the Erlang code it would only save 16% of the total runtime. This implies you should focus on other areas to improve the efficiency of the system.

  • dtrace, kernel lock-counting, fprof

    • Dtrace was mostly for debugging, not performance.

    • Patched BEAM on FreeBSD to include CPU time stamp.

    • Wrote scripts to create an aggregated view across all processes to see where routines are spending all the time.

    • Biggest win was compiling the emulator with lock counting turned on.

  • Some Issues:

    • Earlier on saw more time spent in the garbage collection routines; that was brought down.

    • Saw some issues with the networking stack that were tuned away.

    • Most issues were with lock contention in the emulator which shows strongly in the output of the lock counting.

  • Measurement:

    • Synthetic workloads, which means generating traffic from your own test scripts, is of little value for tuning user facing systems at extreme scale.

      • Worked well for simple interfaces like a user table, generating inserts and reads as quickly as possible.

      • If supporting a million connections on a server it would take 30 hosts to open enough IP ports to generate enough connections to test just one server. For two million connections that would take 60 hosts. Just difficult to generate that kind of scale.

      • The type of traffic that is seen during production is difficult to generate. Can guess at a normal workload, but in actuality there are networking events and world events, and, since the service is multi-platform, behaviour varies between clients and by country.

    • Tee’d workload:

      • Take normal production traffic and pipe it off to a separate system.

      • Very useful for systems for which side effects could be constrained. Don’t want to tee traffic and do things that would affect the permanent state of a user or result in multiple messages going to users.

      • Erlang supports hot loading, so could be under a full production load, have an idea, compile, load the change as the program is running and instantly see if that change is better or worse.

      • Added knobs to change production load dynamically and see how it would affect performance. Would be tailing the sar output looking at things like CPU usage, VM utilization, listen queue overflows, and turn knobs to see how the system reacted.

    • True production loads:

      • Ultimate test. Doing both input work and output work.

      • Put server in DNS a couple of times so it would get double or triple the normal traffic. Creates issues with TTLs because clients don’t respect DNS TTLs and there’s a delay, so can’t quickly react to getting more traffic than can be dealt with.

      • IPFW. Forward traffic from one server to another so could give a host exactly the number of desired client connections. A bug caused a kernel panic so that didn’t work very well.

  • Results:

    • Started at 200K simultaneous connections per server.

    • First bottleneck showed up at 425K. System ran into a lot of contention. Work stopped. Instrumented the scheduler to measure how much useful work is being done, or sleeping, or spinning. Under load it started to hit sleeping locks so 35-45% CPU was being used across the system but the schedulers are at 95% utilization.

    • The first round of fixes got to over a million connections.

      • VM usage is at 76%. CPU is at 73%. BEAM emulator running at 45% utilization, which matches closely to user percentage, which is good because the emulator runs as user.

      • Ordinarily CPU utilization isn’t a good measure of how busy a system is because the scheduler uses CPU.

    • After a month of tackling bottlenecks, 2 million connections per server was achieved.

      • BEAM utilization at 80%, close to where FreeBSD might start paging. CPU is about the same, with double the connections. Scheduler is hitting contention, but running pretty well.

    • Seemed like a good place to stop so started profiling Erlang code.

      • Originally had two Erlang processes per connection. Cut that to one.

      • Did some things with timers.

    • Peaked at 2.8M connections per server

      • 571k pkts/sec, >200k dist msgs/sec

      • Made some memory optimizations so VM load was down to 70%.

    • Tried 3 million connections, but failed.

      • See long message queues when the system is in trouble. Either a single message queue or a sum of message queues.

      • Added instrumentation to BEAM for per-process message queue stats. How many messages are being sent/received, how fast.

      • Sampling every 10 seconds, could see a process had 600K messages in its message queue with a dequeue rate of 40K with a delay of 15 seconds. Projected drain time was 41 seconds.

  • Findings:

    • Erlang + BEAM + their fixes – has awesome SMP scalability. Nearly linear scalability. Remarkable. On a 24-way box can run the system with 85% CPU utilization and it’s keeping up running a production load. It can run like this all day.

      • Testament to Erlang’s program model.

      • The longer a server has been up, the more long-running, mostly idle connections it accumulates, so it can handle more connections because they are less busy per connection.

    • Contention was biggest issue.

      • Some fixes were in their Erlang code to reduce BEAM’s contention issues.

      • Some were patches to BEAM.

      • Partitioning workload so work didn’t have to cross processors a lot.

      • Time-of-day lock. Every time a message is delivered from a port it looks to update the time-of-day which is a single lock across all schedulers which means all CPUs are hitting one lock.

      • Optimized use of timer wheels. Removed bif timer.

      • Check IO time table grew arithmetically. This created VM thrashing as the hash table would be reallocated at various points. Improved to use geometric allocation of the table.

      • Added write file that takes a port that you already have open to reduce port contention.

      • Mseg allocation was a single point of contention across all allocators. Made it per scheduler.

      • Lots of port transactions when accepting a connection. Set option to reduce expensive port interactions.

      • When message queue backlogs became large, garbage collection would destabilize the system. So GC was paused until the queues shrank.

    • Avoiding some common things that come at a price.

      • Backported a TSC time counter from FreeBSD 9 to 8. It’s a cheaper-to-read timer. Fast to get time of day, less expensive than going to a chip.

      • Backported the igb network driver from FreeBSD 9 because of issues with multi-queue NICs locking up.

      • Increase number of files and sockets.

      • Pmcstat showed a lot of time was spent looking up PCBs in the network stack. So bumped up the size of the hash table to make lookups faster.

    • BEAM Patches

      • Previously mentioned instrumentation patches. Instrument scheduler to get utilization information, statistics for message queues, number of sleeps, send rates, message counts, etc. Can be done in Erlang code with process_info, but with a million connections it’s very slow.

      • The stats are very cheap to gather, so collection can run in production.

      • Stats kept at 3 different decay intervals: 1, 10, 100 second intervals. Allows seeing issues over time.

      • Make lock counting work for larger async thread counts.

      • Added debug options to debug lock counters.

    • Tuning

      • Set the scheduler wake up threshold to low because schedulers would go to sleep and would never wake up.

      • Prefer mseg allocators over malloc.

      • Have an allocator per instance per scheduler.

      • Configure carrier sizes to start out big and get bigger. This causes FreeBSD to use super pages, which reduced the TLB thrash rate and improved throughput for the same CPU.

      • Run BEAM at real-time priority so that other things like cron jobs don’t interrupt scheduling. Prevents glitches that would cause backlogs of important user traffic.

      • Patch to dial down spin counts so the scheduler wouldn’t spin.

    • Mnesia

      • Prefer os:timestamp to erlang:now.

      • Using no transactions, but with remote replication ran into a backlog. Parallelized replication for each table to increase throughput.

    • There are actually lots more changes that were made.
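One of the contention fixes listed above, switching a table from arithmetic to geometric growth, is easy to quantify. The sketch below counts how many reallocations each policy needs to reach a given size; the starting capacity of 1, the step of 1024 entries, and the doubling factor are illustrative assumptions, not values taken from the BEAM patches.

```python
def count_reallocations(target_size, grow):
    """Count how many times a table is reallocated to reach target_size.

    `grow` maps the current capacity to the next capacity.
    """
    capacity, reallocations = 1, 0
    while capacity < target_size:
        capacity = grow(capacity)
        reallocations += 1
    return reallocations


def arithmetic(step):
    """Growth policy that adds a fixed number of entries each time."""
    return lambda capacity: capacity + step


def geometric(factor):
    """Growth policy that multiplies the capacity each time."""
    return lambda capacity: capacity * factor


if __name__ == "__main__":
    target = 1_000_000
    print("arithmetic:", count_reallocations(target, arithmetic(1024)))
    print("geometric: ", count_reallocations(target, geometric(2)))
```

Under these assumptions a million-entry table is reallocated 977 times with fixed-size steps but only 20 times when it doubles, which is the kind of difference that shows up as thrashing at a million or more connections.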


  • Optimization is dark grungy work suitable only for trolls and engineers. When Rick is going through all the changes that he made to get to 2 million connections a server it was mind numbing. Notice the immense amount of work that went into writing tools, running tests, backporting code, adding gobs of instrumentation to nearly every level of the stack, tuning the system, looking at traces, mucking with very low level details and just trying to understand everything. That’s what it takes to remove the bottlenecks in order to increase performance and scalability to extreme levels.

  • Get the data you need. Write tools. Patch tools. Add knobs. Ken was relentless in extending the system, constantly writing tools and scripts to get the data they needed to manage and optimize the system. Do whatever it takes.

  • Measure. Remove Bottlenecks. Test. Repeat. That’s how you do it.

  • Erlang rocks! Erlang continues to prove its capability as a versatile, reliable, high-performance platform. Though personally all the tuning and patching that was required casts some doubt on this claim.

  • Crack the virality code and profit. Virality is an elusive quality, but as WhatsApp shows, if you do figure it out, man, it’s worth a lot of money.

  • Value and employee count are now officially divorced. There are a lot of force-multipliers out in the world today. An advanced global telecom infrastructure makes apps like WhatsApp possible. If WhatsApp had to make the network or a phone etc it would never happen. Powerful cheap hardware and Open Source software availability is of course another multiplier. As is being in the right place at the right time with the right product in front of the right buyer.

  • There’s something to this brutal focus on the user idea. WhatsApp is focussed on being a simple messaging app, not being a gaming network, or an advertising network, or a disappearing photos network. That worked for them. It guided their no ads stance, their ability to keep the app simple while adding features, and the overall no-brainer, it-just-works philosophy on any phone.

  • Limits in the cause of simplicity are OK. Your identity is tied to the phone number, so if you change your phone number your identity is gone. This is very un-computer like. But it does make the entire system much simpler in design.

  • Age ain’t no thing. If it was age discrimination that prevented WhatsApp co-founder Brian Acton from getting a job at both Twitter and Facebook in 2009, then shame, shame, shame.

  • Start simply and then customize. When chat was launched initially the server side was based on ejabberd. It’s since been completely rewritten, but that was the initial step in the Erlang direction. The experience with the scalability, reliability, and operability of Erlang in that initial use case led to broader and broader use.

  • Keep server count low. Constantly work to keep server counts as low as possible while leaving enough headroom for events that create short-term spikes in usage. Analyze and optimize until the point of diminishing returns is hit on those efforts and then deploy more hardware.

  • Purposely overprovision hardware. This ensures that users have uninterrupted service during their festivities and employees are able to enjoy the holidays without spending the whole time fixing overload issues.

  • Growth stalls when you charge money. Growth was super fast when WhatsApp was free, 10,000 downloads a day in the early days. Then when switching over to paid that declined to 1,000 a day. At the end of the year, after adding picture messaging, they settled on charging a one-time download fee, later modified to an annual payment.

  • Inspiration comes from the strangest places. Experience with forgetting the username and passwords on Skype accounts drove the passion for making the app “just work.”

(via HighScalability.com)

An interview with Steve Vinoski

Today you can read my interview with Steve Vinoski, a famous Erlang developer/speaker and distributed systems expert. Steve will give the talk “Addressing Network Congestion in Riak Clusters” at Erlang User Conference 2013.

Some questions, some answers

Paolo – Hi Steve! It’s really good to have one of the most famous Erlangers here on my blog. Would you mind introducing yourself to our readers in a few words?

Steve – I’m Steve Vinoski, a member of the architecture group at Basho Technologies, the makers of Riak and RiakCS. I have a background in middleware and distributed systems, and have been an Erlang user since 2006.

Paolo – I know you are expert in several programming languages. How did you end up using Erlang? Did you have any previous experience with functional languages?

Steve – As far as functional languages go, I’ve played with them on and off for decades, but never used one in production until I found Erlang.

I worked in middleware from 1991 to 2007, and in 2004 at IONA Technologies I started looking into innovative ways of expanding our product line and reducing the cost of product development. IONA’s products were written in C++, which I’ve used since 1988 and so I am well aware of its complexity, and Java, which frankly I’ve never liked (I like the JVM but don’t like the Java language). Neither language lends itself to rapid development or easy maintenance. I built a prototype that layered Ruby over one of our C++ products that allowed for an order of magnitude decrease in the number of lines of code required to write client applications, and built another prototype that provided a JavaScript layer for writing server applications, but customers didn’t seem interested, and both approaches only increased development and maintenance costs.

Then I found Erlang/OTP. I grew more and more intrigued as I discovered that it already provided numerous features that we had spent years developing and maintaining in our middleware systems, things like internode messaging, node monitoring, naming and discovery, portability across multiple network stacks, logging, tracing, etc. Not only did it provide all the features we needed, but its features were much more powerful and elegant. I put together a proposal for the IONA executive team that suggested we rebuild all of our product servers in Erlang so we could reduce maintenance costs, but the proposal was rejected because, as I later learned, they were trying to sell the company so it didn’t make sense to make such large changes to the code. I left IONA and joined Verivue, where we built video delivery hardware, and there I trained seven or eight other developers in Erlang and we used it to great advantage. After Verivue, I wanted to continue working with Erlang, which is part of the reason I joined Basho.

Paolo – In your blog you state that Erlang is your favourite programming language. Why?

Steve – To me Erlang/OTP is the type of system my middleware colleagues and I spent years trying to create. It’s got so many things a distributed systems developer wants: easy access to networking, libraries and utilities to make interacting with distributed nodes straightforward, wonderful concurrency support, all the upgrading and reliability capabilities, and the Erlang language itself is sort of a “distributed systems DSL” where its elegance and small size make it easy to learn and easy to use to quickly become productive building distributed applications. And as if that’s not enough, the Erlang community is great, pleasantly supporting each other and newcomers while avoiding pointless arguments and rivalries you find in other communities. My use of other programming languages has actually decreased in recent years due primarily to my continued satisfaction with Erlang/OTP; it’s not great for every problem, but it’s fantastic for the types of problems I tend to work on.

Paolo – I know that in a previous job you had to deal with multimedia systems, a field where Erlang still has a minor impact compared to languages like C++. Do you think Erlang will be able to find its place in this field as well? Can you give reasons for your answer?

Steve – Erlang/OTP is excellent for server systems in general, including multimedia servers. The Verivue system I worked on a few years ago had special TCP offload hardware for video delivery, so we didn’t need Erlang for that. Rather, we used Erlang for the control plane, which for example handled incoming client requests, looked up subscriber details in databases, and interacted with the hardware to set up multimedia data flows. Multimedia systems also have to integrate with billing systems, monitoring systems, and hardware from other vendors, and Erlang shines there as well, especially when it comes to finding bugs in the other systems and hot-loading code to compensate for those bugs. Customers tend to love you when you can quickly turn around fixes like that.

Another Erlang developer, Max Lapshin, built and supports erlyvideo, which seems to work well. I’ve never met Max but I know he faced some challenges along the way, as we did at Verivue, but I think he’s generally happy with how erlyvideo has turned out.

Paolo – Currently you are working at Basho, a very important company in the Erlang world. Do you mind telling our readers something more about your job?

Steve – At Basho I work in CTO Justin Sheehy’s architecture group. It’s a broad role with a lot of freedom to speak at and attend conferences and meetups, and I also work on research projects and pick up development tasks and projects from our Engineering team and Professional Services team when they need my help.

Paolo – At Erlang User Conference 2013 you will give a talk about Riak, its behaviour under extreme loads and the issues we may face when we want to scale it. Can you tell us something more about the topic?

Steve – At Basho we’re fortunate to have customers who continually push the boundaries of Riak’s comfort zone. Network traffic in Riak all goes over TCP — client requests, intracluster messages, and distributed Erlang communication. When clusters are extremely busy with client requests and transfer of data and messages between nodes, under certain conditions network throughput can drop significantly and messages can be lost, including messages intended for client applications. I am currently investigating the use of alternative network protocols to see if they can help prioritize different kinds of network traffic. This work is not yet finished, so my talk will give an overview of the problems along with the current status of the solution I’m investigating.

Paolo – I heard that during the talk you will also introduce a new Erlang network driver that should tackle some of these issues. Is this correct? Can you give us an insight?

Steve – Yes, I have been working on a new network driver. It implements an alternative UDP-based protocol for data transfer that can utilize full bandwidth when available but can also watch for congestion on network switches and quickly back off when detected. It also yields to TCP traffic under congestion conditions, preventing background data transfer tasks from shutting out more important messages like client requests and responses.

Paolo – Who should be interested in this talk? What are the minimum requisites needed in order to fully understand the topics of the talk?

Steve – Attendees should have a high-level understanding of Erlang’s architecture, what drivers are, and how they fit into the system. Other than that, my talk will explain in detail the problems I’m trying to address as well as the solution I’ve been investigating, so neither deep networking expertise nor deep understanding of Erlang internals is required.

Paolo – I can say without a doubt that you are an expert in middleware and distributed computing systems. Can you suggest some books or internet resources to our readers interested in those topics?

Steve – The nice thing about distributed systems is that they never seem to get any easier, so there has been interesting research and development in this area for decades. The downside of that is that there are an enormous number of papers I could point to. In no particular order, here are some interesting papers and articles, most of which are currently sitting open in my browser tabs:

“Eventual Consistency Today: Limitations, Extensions, and Beyond”, Peter Bailis, Ali Ghodsi. This article provides an excellent description of eventual consistency and recent work on eventually consistent systems.

“A comprehensive study of Convergent and Commutative Replicated Data Types”, M. Shapiro, N. Preguiça, C. Baquero, M. Zawirski. This paper explores and details data types that work well for applications built on eventually consistent systems.

“Notes on Distributed Systems for Young Bloods”, J. Hodges. This excellent blog post succinctly summarizes the past few decades of distributed systems research and discoveries, and also explains some implementation concerns we’ve learned along the way to keep in mind when building distributed applications.

“Impossibility of Distributed Consensus with One Faulty Process”, M. Fischer, N. Lynch, M. Paterson. This paper is nearly 30 years old but is critical to understanding fundamental properties of distributed systems.

“Dynamo: Amazon’s Highly Available Key-value Store”, G. DeCandia, et al. A classic paper detailing trade-offs for high availability distributed systems.

Paolo – Day-by-day Erlang becomes more popular. In your opinion what can we expect from Erlang in the future? What are the next goals the Erlang community should try to reach?

Steve – Under the guidance of Ericsson’s OTP team and with valuable input from the open source community, Erlang/OTP continues to evolve gracefully to address production systems. I expect Erlang will continue to improve as a language and platform for building large-scale systems that perform well and are relatively easy to understand, reason about, and maintain without requiring an army of developers. In particular I’m looking forward to the OTP team’s continued work on optimizing multicore Erlang process scheduling. The Erlang community is very good at proving how good Erlang/OTP is through the results of the systems they build, so they need to keep doing that to broaden Erlang’s appeal. If you’re a developer building practical open source or commercial software, the presentations given by community members at events like the Erlang User Conference and the Erlang Factory conferences are amazing sources of knowledge and wisdom for what works well for Erlang/OTP applications and what can be problematic.

(Source: Paolo D’Incau’s Blog)

Dynamo Sure Works Hard

We tend to think of working hard as a good thing. We value a strong work ethic and determination in the face of adversity. But if you are working harder than you should to get the same results, then it’s not a virtue, it’s a waste of time and energy. If it’s your business systems that are working harder than they should, it’s a waste of your IT budget.

Dynamo based systems work too hard. SimpleDB/DynamoDB, Riak, Cassandra and Voldemort are all based, at least in part, on the design first described publicly in the Amazon Dynamo Paper. It has some very interesting concepts, but ultimately fails to provide a good balance of reliability, performance and cost. It’s pretty neat in that each transaction allows you to dial in the levels of redundancy and consistency to trade off performance and efficiency. It can be pretty fast and efficient if you don’t need any consistency, but ultimately the more consistency you want, the more you have to pay for it via a lot of extra work.

Network Partitions are Rare, Server Failures are Not

… it is well known that when dealing with the possibility of network failures, strong consistency and high data availability cannot be achieved simultaneously. As such systems and applications need to be aware which properties can be achieved under which conditions.

For systems prone to server and network failures, availability can be increased by using optimistic replication techniques, where changes are allowed to propagate to replicas in the background, and concurrent, disconnected work is tolerated. The challenge with this approach is that it can lead to conflicting changes which must be detected and resolved. This process of conflict resolution introduces two problems: when to resolve them and who resolves them. Dynamo is designed to be an eventually consistent data store; that is all updates reach all replicas eventually.

– Amazon Dynamo Paper

The Dynamo system is a design that treats a network switch failure as being just as likely as a machine failure, and pays the cost with every single read. This is madness. Expensive madness.

Within a datacenter, the Mean Time To Failure (MTTF) for a network switch is one to two orders of magnitude higher than for servers, depending on the quality of the switch. This is according to data from Google about datacenter server failures, and the published MTBF numbers for Cisco switches. (There is a subtle difference between MTBF and MTTF, but for our purposes we can treat them the same.)

It is claimed that when W + R > N you can get consistency. But it’s not true, because without distributed ACID transactions, it’s never possible to achieve W > 1 atomically.

Consider W=3, R=1 and N=3. If a network failure or, more likely, a client/app tier failure (hardware, OS or process crash) happens during the writing of data, it’s possible for only replica A to receive the write, with a lag until the cluster notices and syncs up. Then another client with R = 1 can do two consecutive reads, getting newer data first from node A, and older data next from node B for the same key. But you don’t even need a failure or crash: once the first write occurs there is always a lag before the next server(s) receive the write. It’s possible for a fast client to do the same read twice, getting a newer version from one server, then an older version from another.
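The W=3, R=1, N=3 scenario can be sketched in a few lines of Python. This is only an illustrative toy, not how any Dynamo-style store is implemented: the `Replica` class, the `cart` key and the version numbers are all made up for the example.

```python
class Replica:
    """One storage node holding a (version, value) pair per key."""

    def __init__(self, name):
        self.name = name
        self.data = {}

    def write(self, key, version, value):
        # Keep only the newest version seen for a key.
        current = self.data.get(key, (0, None))
        if version > current[0]:
            self.data[key] = (version, value)

    def read(self, key):
        return self.data.get(key, (0, None))


# N = 3 replicas, all starting with version 1 of the key.
replicas = [Replica(n) for n in "ABC"]
for r in replicas:
    r.write("cart", 1, "old")

# A W = 3 write of version 2 is interrupted (or simply still in flight):
# only replica A has applied it so far.
replicas[0].write("cart", 2, "new")

# With R = 1, each read is answered by a single replica. Here the first
# read happens to hit A and the second happens to hit B.
first = replicas[0].read("cart")
second = replicas[1].read("cart")

print(first)   # (2, 'new')
print(second)  # (1, 'old')
```

The second read returns an older version than the first, which is exactly the newer-then-older anomaly described above.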

What is true is that if R > N / 2, then you get consistency where it’s not possible to read a newer value and then have a subsequent read get an older value.

For the vast majority of applications, it’s okay for a failure to lead to temporary unavailability. Amazon believes its shopping cart is so important for capturing writes that it’s worth the cost of quorum reads, or inconsistency. Perhaps. But the problems and costs multiply. If you are doing extra reads to achieve high consistency, then you are putting extra load on each machine, requiring extra server hardware and extra networking infrastructure to provide the same baseline performance. All of this can increase the frequency of component failures and increases operational costs (hardware, power, rack space and the personnel to maintain it all).

A Better Way?

What if a document had 1 master and N replicas to write to, but only a single master to read from? Clients know based on the document key and topology map which machine serves as the master. That would make the reads far cheaper and faster. All reads and writes for a document go to the same master, with writes replicated to replicas (which also serve as masters for other documents, each machine is both a master and replica).

But, you might ask, how do I achieve strong consistency if the master goes down or becomes unresponsive?

When that happens, the cluster also notices the machine is unresponsive or too slow, removes it from the cluster and fails over to a new master. Then the client tries again and has a successful read.

But, you might ask, what if the client asks the wrong server for a read?

All machines in the cluster know their role, and only one machine in the cluster can be a document master at any time. The cluster manager (a regular server node elected by Paxos consensus) makes sure to remove the old master, then assign the new master, and then tell the client about the new topology. The client then updates its topology map and retries at the new master.

But, you might ask, what if the topology has changed again, and the client again asks to read from the wrong server?

Then this wrong server will let the client know. The client will reload the topology maps, and re-request from the right server. If the right master server isn’t really right any more because of another topology change, it will reload and retry again. It will do this as many times as necessary, but typically it happens only once.
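The reload-and-retry behaviour described above can be sketched as a small Python toy. Everything here is hypothetical and invented for illustration (the `Cluster`, `Node`, `Client` and `NotMaster` names, the retry limit, and the shortcut of having a node consult a shared cluster object to learn its role); it is not the Couchbase client API.

```python
class NotMaster(Exception):
    """Raised by a node that is asked for a key it does not master."""


class Node:
    def __init__(self, name, cluster):
        self.name, self.cluster, self.docs = name, cluster, {}

    def read(self, key):
        # A node only serves keys for which it is the current master.
        if self.cluster.master_for(key) is not self:
            raise NotMaster(self.name)
        return self.docs[key]


class Cluster:
    """Holds the authoritative key -> master mapping (the topology)."""

    def __init__(self, names):
        self.nodes = {n: Node(n, self) for n in names}
        self.masters = {}

    def master_for(self, key):
        return self.nodes[self.masters[key]]

    def topology(self):
        return dict(self.masters)   # snapshot handed to clients


class Client:
    def __init__(self, cluster):
        self.cluster = cluster
        self.topology = cluster.topology()

    def read(self, key, max_attempts=5):
        for _ in range(max_attempts):
            node = self.cluster.nodes[self.topology[key]]
            try:
                return node.read(key)
            except NotMaster:
                # Stale map: reload the topology and retry at the new master.
                self.topology = self.cluster.topology()
        raise RuntimeError("topology kept changing")


cluster = Cluster(["A", "B", "C"])
cluster.masters["doc1"] = "A"
cluster.nodes["A"].docs["doc1"] = "v1"
cluster.nodes["B"].docs["doc1"] = "v1"   # replica copy

client = Client(cluster)
cluster.masters["doc1"] = "B"            # failover: B is promoted
print(client.read("doc1"))               # v1, after one reload and retry
```

In this toy the client's first attempt goes to the old master A, which refuses; the client refreshes its map and succeeds at B on the second attempt.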

But, you might ask, what if there is a network partition, and the client is on the wrong (minor) side of the partition, and reads from a master server that doesn’t know it’s not a master server anymore?

Then it gets a stale read. But only for a little while, until the server itself realizes it’s no longer in heartbeat contact with the majority of the cluster. And partitions like this are among the rarest forms of cluster failure. It requires both a network failure and for the client to be on the wrong side of the partition.

But, you might ask, what if there is a network partition, and the client is on the wrong (smaller) side of the partition, and WRITES to a server that doesn’t know it’s not a master server anymore?

Then the write is lost. But if the client wanted true multi-node durability, then the write wouldn’t have succeeded (the client would time out waiting for the replica(s) to receive the update) and the client wouldn’t unknowingly lose data.

What I’m describing is the Couchbase clustering system.

Let’s Run Some Numbers

Given the MTTF of a server, how much hardware do we need, and how quickly must the cluster fail over to a new master, to still meet our SLA requirements compared with a Dynamo based system?

Let’s start with some assumptions:

We want to achieve 4000 transactions/sec with 3 node replication factor. Our load mix is 75% reads/25% writes.

We want to have some consistency, so that we don’t read newer values, then older values, so for Dynamo:

R = 2, W = 2, N = 3

But for Couchbase:

R = 1, W = 1, N = 3

This means for a Dynamo style cluster, the load will be:
Read transactions/sec: 9000 reads (reads spread over 3 nodes)
Write transactions/sec: 3000 writes (writes spread over 3 nodes)

This means for a Couchbase style cluster, the load will be:
Read transactions/sec: 3000 reads (each document read only on 1 master node, but all document masters evenly spread across 3 nodes)
Write transactions/sec: 3000 writes (writes spread over 3 nodes)
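The node-level load figures above follow from simple arithmetic, sketched here in Python. The function and parameter names are made up for the example. It assumes, as the analysis does, that a Dynamo-style read is sent to all 3 replicas even though only the first 2 answers are used, and that every write reaches all 3 replicas in both designs.

```python
def node_level_load(total_tps, read_fraction, n_replicas, replicas_read):
    """Return (reads, writes) per second summed over all nodes."""
    client_reads = total_tps * read_fraction
    client_writes = total_tps * (1 - read_fraction)
    reads = client_reads * replicas_read    # each read touches this many nodes
    writes = client_writes * n_replicas     # each write lands on every replica
    return round(reads), round(writes)


dynamo = node_level_load(4000, 0.75, n_replicas=3, replicas_read=3)
couchbase = node_level_load(4000, 0.75, n_replicas=3, replicas_read=1)

print(dynamo)     # (9000, 3000)
print(couchbase)  # (3000, 3000)
```

Summing reads and writes gives 12000 node-level operations per second for the Dynamo-style cluster against 6000 for the single-master design, a factor of 2.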

Let’s assume both systems are equally reliable at the machine level. Google’s research indicates that in their datacenter each server has an MTTF of 3141 hrs, or 2.7 failures per year. Google also reports a rack failure (usually power supply) MTTF of 10.2 years, roughly 30x as reliable as a server, so we’ll ignore that to make the analysis simpler. (This is from Google’s paper studying server failures here)

The MTBF of a Cisco network switch is published at 54,229 hrs on the low end, to 1,023,027 hrs on the high end. For our purposes, we’ll ignore switch failures, since they affect the availability and consistency of both systems about the same, and they are 1 to 2 orders of magnitude rarer than a server failure. (This is from a Cisco product spreadsheet here)

Assume we want to meet a latency SLA 99.9% of the time (the actual latency SLA threshold number doesn’t matter here).

On Dynamo, that means each node can fail its SLA 1.837% of the time. Because it queries 3 nodes but only uses the values from the first 2 to respond, and the chances of SLA failure are the same across nodes, a request misses the SLA only when at least 2 of the 3 nodes miss it (only two must meet the SLA). A 99.9% SLA leaves a failure budget of 0.001, so:

0.001 = (3 − 2 * P) * P ^ 2

P = 0.01837
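As a check on the 1.837% figure, the cubic can be solved numerically. The sketch below uses plain bisection; the function names are made up for the example, and it assumes nodes miss the SLA independently with the same probability. A request fails when at least 2 of the 3 nodes miss, which is 3P²(1 − P) + P³ = (3 − 2P)P², and a 99.9% SLA leaves a failure budget of 0.001.

```python
def request_failure(p):
    """Probability that at least 2 of 3 nodes miss the SLA,
    when each node misses it independently with probability p."""
    return 3 * p**2 * (1 - p) + p**3   # equals (3 - 2p) * p^2


def solve_node_budget(target, lo=0.0, hi=1.0, iterations=100):
    """Bisection: largest p in [lo, hi] with request_failure(p) below target.

    Works because request_failure is increasing on [0, 1].
    """
    for _ in range(iterations):
        mid = (lo + hi) / 2
        if request_failure(mid) < target:
            lo = mid
        else:
            hi = mid
    return lo


p = solve_node_budget(0.001)   # 99.9% SLA leaves a 0.1% failure budget
print(f"{p:.5f}")              # 0.01837
```

So each node may miss the latency threshold on roughly 1.837% of requests while the cluster as a whole still meets the 99.9% target.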

On Couchbase, if a master node fails, the cluster must recognize the failure and fail the node out. Assume it can fail out a node in 30 secs, and let’s say it takes another 4.5 minutes to warm up the RAM cache. Given Google’s figure of 2.7 failures/year and 5 minutes of downtime for each before a failover completes, that is 13.5 minutes of downtime per year, so queries will fail about 0.0026% of the time due to node failure.

For Couchbase to meet the same SLA:

0.001 = P(SlaFail) + P(NodeFail) - (P(SlaFail) * P(NodeFail))

0.001 = P(SlaFail) + 0.000026 - (P(SlaFail) * 0.000026)

0.001 ~= 0.000974 + 0.000026 - (0.000974 * 0.000026)

Note: Some things I’m omitting from the analysis are the tighter per-node budget when a Dynamo node fails and both of the 2 remaining nodes must meet the SLA (it would drop from 1.837% to ~0.05%), and also the increased work on the remaining servers when a Couchbase server fails. Both are only temporary and go away when a new server is added back and initialized in the cluster, and shouldn’t change the numbers significantly. Also there is the time to add in a new node and rebalance load on it. At Couchbase we work very hard to make that as fast and efficient as possible. I’ll assume Dynamo systems do the same, that the cost is the same and omit it, though I think we are the leaders in rebalance performance.

With this analysis, a Couchbase node can only fail its SLA on about 0.1% of requests, and a Dynamo node can fail it on 1.837%. Sounds good for Dynamo, but it must do so at 2x the throughput per node on 3x the data, and with 2x the total network traffic. And for very low latency response times (our customers often want sub-millisecond latency), meeting the SLA typically means a DBMS must keep a large amount of relevant data and metadata in RAM, because random disk fetches carry a huge latency cost. With disk fetches 2 orders of magnitude slower on SSDs (100x) and 4 orders of magnitude slower on HDDs (10000x), without much more RAM the disk accesses pile up faster, and so do the latencies.

So the fact that each Dynamo node can fail its SLA at a higher rate is a very small win when it will still need to keep nearly 3x the working set ready in memory, because each node will be serving 3x the data at all times for read requests (it can fail its SLA slightly more often, so it’s actually about 2.97x the necessary RAM), and will use 2x the network capacity.

Damn Dynamo, you sure do work hard!

Now Couchbase isn’t perfect either, far from it. Follow me on twitter @damienkatz. I’ll be posting more about the Couchbase shortcomings and capabilities, and technical roadmap soon.

(via planeterlang.org)