Concurrency in Erlang & Scala: The Actor Model

Applications are becoming increasingly concurrent, yet the traditional tools for building them, threads and locks, are notoriously troublesome. This article highlights Erlang and Scala, two programming languages that use a different approach to concurrency: the actor model.

The continuous increase of computer processor clock rate has recently slowed down, in part due to problems with heat dissipation. As the clock rate of a processor is increased, the heat generated increases too. Increasing current clock rates even further is troublesome, as the heat generated would push computer chips towards the limits of what is physically possible. Instead, processor manufacturers have turned towards multi-core processors, which are processors capable of doing multiple calculations in parallel. There has been an increased interest in software engineering techniques to fully utilize the capabilities offered by these processors.

At the same time, applications are more frequently built in a distributed way. To perform efficiently, without wasting most of their time waiting for I/O operations, such applications are forced to carry out more and more operations concurrently.

However, reasoning about concurrent systems can be far from trivial, as there can be a large number of interacting processes with no predefined execution order. This article discusses the concurrency model in Erlang and Scala, two languages that have recently gained in popularity, in part due to their support for scalable concurrency. This first section introduces both languages by describing their key features.

Erlang

Erlang is a concurrent programming language, originally designed at Ericsson (Erlang is named after A. K. Erlang, a Danish mathematician, and can also be read as "Ericsson Language"). It was designed to be distributed and fault-tolerant, for use in highly-available (non-stop) soft real-time telecom applications.

Erlang is a pure functional language featuring single assignment and eager evaluation. It also has built-in language constructs for distribution and concurrency. Erlang has a dynamic type system, in which the types of expressions are checked at run-time [1]. Type specifications are fully optional and are only used by the static type-checker; the run-time environment even allows running applications with invalid type specifications, because run-time type checking is always performed on the data itself. This usually allows applications to run correctly even when they encounter unexpected data types.

The listings below show a simple program, written in Erlang, and its execution in the Erlang shell. This program performs a recursive calculation of the factorial function.

A simple program in Erlang
-module(factorial).
-export([fact/1]).

fact(0) -> 1;
fact(N) -> N * fact(N-1).
Using the Erlang program
$ erl
Erlang (BEAM) emulator version 5.6.3 [source] [async-threads:0]
                                     [hipe] [kernel-poll:false]

Eshell V5.6.3  (abort with ^G)
1> c(factorial).
{ok,factorial}
2> factorial:fact(8).
40320
3>

To allow for non-stop applications, Erlang supports hot-swapping of code. This means that code can be replaced while the program is executing, making it possible to perform upgrades and maintenance without interrupting the running system.

Scala

Scala, which stands for Scalable Language, is a programming language that aims to provide both object-oriented and functional programming styles, while staying compatible with the Java Virtual Machine (JVM). It was designed in 2001 by Martin Odersky and his group at EPFL in Lausanne, Switzerland, drawing on the experience gathered from the design of several other languages.

The Scala language has been designed to be scalable [2]: it is supposed to scale along with the needs of its users. Its functional constructs allow the developer to write short and concise code, whereas its object-oriented concepts allow the language to be used for large, complex projects. Scala is fully interoperable with the Java language, which allows developers to use all Java libraries from within Scala. As such, Scala is not an isolated language community, but rather lets you take advantage of the enormous Java ecosystem. There is also an initiative to build a Scala version that runs on top of the .NET Common Language Runtime (CLR) [3], which should ensure the portability of Scala to other underlying platforms.

Scala tries to stay close to pure object-oriented programming and therefore does not have constructs like static fields and methods. Every value in Scala is an object and every operation is a method call [4]. Scala allows you to define your own operators and language constructs. This makes it possible to extend the Scala language according to your specific needs, which again helps it grow (scale up) along with the project.
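As a small illustration (this snippet is not from the original article), the + below is an ordinary method on the class, so adding two vectors is simply a method call:

Defining an operator in Scala

class Vec(val x: Double, val y: Double) {
    def +(that: Vec) = new Vec(x + that.x, y + that.y) // '+' is a plain method
}

val v = new Vec(1, 2) + new Vec(3, 4) // same as new Vec(1, 2).+(new Vec(3, 4))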

Scala uses strict, static typing, yet allows most type annotations to be omitted. When type information is not specified, the compiler performs type inference and attempts to deduce this information from the code itself. This saves programming effort and allows for more generic code. Type annotations are only required when the compiler cannot prove that correct type usage will occur.
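For example (an illustrative snippet, not from the original article), none of the following declarations carries a type annotation, yet all of them are fully checked at compile time:

Type inference in Scala

val numbers = List(1, 2, 3)      // inferred: List[Int]
val doubled = numbers.map(_ * 2) // inferred: List[Int]
// val s: String = doubled      // rejected at compile time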

The listing below shows a simple program in Scala, which demonstrates some of its functional features as well as some object-oriented ones. As can be seen, an object with a method is defined. The definition of this method, however, is done in a pure functional style: there is no need to write a return statement, as the value of the method body is its return value.

A simple program in Scala
object TestFactorial extends Application {
    def fact(n: Int): Int = {
        if (n == 0) 1
        else n * fact(n-1)
    }

    println("The factorial of 8 is "+fact(8))
    // Output: The factorial of 8 is 40320
}


A different way of concurrency: The Actor Model

The problem with threads

The traditional way of offering concurrency in a programming language is by using threads. In this model, the execution of the program is split up into concurrently running tasks. It is as if the program is being executed multiple times, the difference being that each of these copies operates on shared memory.

This can lead to a series of hard-to-debug problems, as illustrated below. The first is the lost-update problem. Suppose two processes try to increment the value of a shared object acc. They both retrieve the value of the object, increment the value and store it back into the shared object. As these operations are not atomic, it is possible for their execution to be interleaved, leading to an incorrectly updated value of acc, as shown in the first example.

The solution to this problem is the use of locks. Locks provide mutual exclusion, meaning that only one process can acquire the lock at any given time. By using a locking protocol and making sure the right locks are acquired before using an object, lost updates are avoided. However, locks have their own share of problems. One of them is the deadlock problem, shown in the second example. Here, two processes try to acquire the same two locks A and B. When both do so, but in a different order, a deadlock occurs: each waits for the other to release a lock, which will never happen.

These are just some of the problems that might occur when attempting to use threads and locks.

Lost Update Problem

Process 1          Process 2
a = acc.get()
a = a + 100        b = acc.get()
                   b = b + 50
                   acc.set(b)
acc.set(a)

Deadlock Problem

Process 1          Process 2
lock(A)            lock(B)
lock(B)            lock(A)

… Deadlock! …
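The lost-update problem is easy to reproduce on the JVM. In the following sketch (invented for this article), two threads increment an unsynchronized shared counter; because the read and the write are separate steps, updates are routinely lost and the printed total is almost always less than 200,000.

The lost-update problem with threads (illustrative sketch)

object LostUpdate extends Application {
    var acc = 0 // shared state, accessed without any lock

    def worker() = new Thread(new Runnable {
        def run() {
            for (i <- 0 until 100000) {
                val a = acc // read
                acc = a + 1 // write: another thread may have written in between
            }
        }
    })

    val t1 = worker(); val t2 = worker()
    t1.start(); t2.start()
    t1.join(); t2.join()
    println("Expected 200000, got " + acc)
}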


Both Erlang and Scala take a different approach to concurrency: the Actor Model. It is necessary to look at the concepts of the actor model first, before studying the peculiarities of the languages themselves.

The Actor Model

The Actor Model was first proposed by Carl Hewitt in 1973 [5] and was improved upon by, among others, Gul Agha [6]. This model takes a different approach to concurrency, one that is designed to avoid the problems caused by threading and locking.

In the actor model, each object is an actor. This is an entity that has a mailbox and a behaviour. Messages can be exchanged between actors and are buffered in the mailbox. Upon receiving a message, the behaviour of the actor is executed, whereupon the actor can: send a number of messages to other actors, create a number of new actors, and assume new behaviour for the next message to be received.

An important property of this model is that all communication is performed asynchronously. This implies that the sender does not wait for a message to be received upon sending it; it immediately continues its execution. There are no guarantees about the order in which messages will be received by the recipient, but they will eventually be delivered.

A second important property is that all communications happen by means of messages: there is no shared state between actors. If an actor wishes to obtain information about the internal state of another actor, it will have to use messages to request this information. This allows actors to control access to their state, avoiding problems like the lost-update problem. Manipulation of the internal state also happens through messages.

Each actor runs concurrently with other actors: it can be seen as a small independently running process.
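To make these capabilities concrete, the sketch below (written with the Scala actor library that is introduced later in this article; all names and messages are invented) shows an actor that, upon receiving a message, does all three things: it sends a reply, creates a helper actor, and assumes a new behaviour for the next message.

The three capabilities of an actor (illustrative sketch)

import scala.actors.Actor._

val greeter = actor {
    def friendly: PartialFunction[Any, Unit] = {
        case name: String =>
            reply("Hello, " + name)              // 1. send messages
            val logger = actor {                 // 2. create new actors
                react { case msg => println(msg) }
            }
            logger ! ("greeted " + name)
            react(bored)                         // 3. new behaviour
    }
    def bored: PartialFunction[Any, Unit] = {
        case _ => println("No longer interested.")
    }
    react(friendly)
}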

Actors in Erlang

In Erlang, which was designed for concurrency, distribution and scalability, actors are part of the language itself. Due to its roots in the telecom industry, where very large numbers of concurrent processes are normal, it is almost impossible to think of Erlang without actors, which are also used to provide distribution. Actors in Erlang are called processes and are started using the built-in spawn function.

A simple application that uses an actor can be seen below. In this application, an actor is defined which acts as a basic counter. We send 100,000 increment messages to the actor and then ask it to print its internal value.

Actors in Erlang
-module(counter).
-export([run/0, counter/1]).

run() ->
    S = spawn(counter, counter, [0]),
    send_msgs(S, 100000),
    S.

counter(Sum) ->
    receive
        value -> io:fwrite("Value is ~w~n", [Sum]);
        {inc, Amount} -> counter(Sum+Amount)
    end.

send_msgs(_, 0) -> true;
send_msgs(S, Count) ->
    S ! {inc, 1},
    send_msgs(S, Count-1).

% Usage:
%    1> c(counter).
%    2> S = counter:run().
%       ... Wait a bit until all children have run ...
%    3> S ! value.
%    Value is 100000

The first two lines define the module and the exported functions. The run function starts a counter process and starts sending increment messages; the sending itself happens in send_msgs, using the message-passing operator (!). As Erlang is a purely functional language, it has no loop constructs, so the iteration has to be expressed using recursion. Such deep recursion would lead to stack overflows in a language like Java, but Erlang is optimized for this usage pattern: tail-recursive calls do not grow the stack. The increment message in this example also carries a parameter, to show Erlang's message-parameter capabilities. The state of the counter is likewise maintained using recursion: upon receiving an inc message, the counter calls itself with the new value, which causes it to receive the next message. If no messages are available yet, the counter blocks and waits for the next message.

Actor scheduling in Erlang

Erlang uses a preemptive scheduler for the scheduling of processes [7]. When a process has executed for too long a period of time (usually measured in the number of function calls made or the number of CPU cycles used), or when it enters a receive statement with no messages available, the process is halted and placed on a scheduling queue.

This allows for a large number of processes to run, with a certain amount of fairness. Long running computations will not cause other processes to become unresponsive.

Starting with release R11B, which appeared in May 2006, the Erlang run-time environment has support for symmetric multiprocessing (SMP) [8]. This means that it is able to schedule processes in parallel on multiple CPUs, allowing it to take advantage of multi-core processors. The functional nature of Erlang allows for easy parallelization. An Erlang lightweight process (actor) will never run in parallel on multiple processors, but using a multi-threaded run-time allows multiple processes to run at the same time. Significant performance speedups have been observed using this technique.

Actors in Scala

Actors in Scala are available through the scala.actors library. Their implementation is a great testament to the expressiveness of Scala: all functionality, operators and other language constructs included, is implemented in pure Scala, as a library, without requiring changes to the language itself.

The same sample application, this time written in Scala, can be seen below:


Actors in Scala
import scala.actors.Actor
import scala.actors.Actor._

case class Inc(amount: Int)
case class Value

class Counter extends Actor {
    var counter: Int = 0;

    def act() = {
        while (true) {
            receive {
                case Inc(amount) =>
                    counter += amount
                case Value =>
                    println("Value is "+counter)
                    exit()
            }
        }
    }
}

object ActorTest extends Application {
    val counter = new Counter
    counter.start()

    for (i <- 0 until 100000) {
        counter ! Inc(1)
    }
    counter ! Value
    // Output: Value is 100000
}


The first two lines import the Actor class (note: this is actually a trait, a special composition mechanism used by Scala, but that is outside the scope of this article) and its members (we need the ! operator for sending messages). The Inc and Value case classes are defined next; they are used as message identifiers. The increment message carries a parameter, as an example to demonstrate this ability.

The Counter actor is defined as a subclass of Actor. Its act() method is overridden to provide the behaviour of the actor. This version of the counter actor is written in a more object-oriented style (though Scala fully supports the pure functional style used in the Erlang example). The state of the actor is maintained in an integer field, counter. In the act() method, an endless receive loop is executed. This loop processes any incoming messages, either by updating the internal state or by printing the value and exiting.

Finally, we find the main application, which first constructs a counter, then sends it 100,000 Inc messages and finally sends it the Value message. The ! operator is used to send a message to an actor, a notation that was borrowed from Erlang.

The output of this program shows that the counter has been incremented up to a value of 100,000. This means that in this case all our messages were delivered in order. This might not always be the case: recall that there are no guarantees on the order of message delivery in the actor model.

The example above shows the ease with which actors can be used in Scala, even though they are not part of the language itself.

It also shows the similarities between the actors library in Scala and the Erlang language constructs. This is no coincidence, as the Scala actors library was heavily inspired by Erlang. The Scala developers have, however, expanded upon Erlang's concepts and added a number of features, which are highlighted in the following sections.

Replies

The authors of scala.actors noticed a recurring pattern in the usage of actors: a request/reply pattern [9]. This pattern is illustrated below. Often, a message is sent to an actor and in that message, the sender is passed along. This allows the receiving actor to reply to the message.

To facilitate this, a reply() construct was added. This removes the need to send the sender along in the message and provides easy syntax for replying.

Normal request/reply
receive {
    case Msg(sender, value) =>
        val r = process(value)
        sender ! Response(r)
}
Request/reply using reply()
receive {
    case Msg(value) =>
        val r = process(value)
        reply(Response(r))
}

The reply() construct is not present in Erlang, where you are forced to explicitly include the sender in every message to which you want a reply. Erlang's approach is not necessarily inferior, however: to enable reply(), Scala messages always carry the identity of the sender with them. This causes a small amount of extra overhead, which might be too much in performance-critical applications.

Synchronous messaging

Scala contains a construct which can be used to wait for message replies. This allows for synchronous invocation, much like method invocation. The syntax to do this is shown in the second listing below, contrasted with the normal way of writing it in the first listing.

When entering a receive block, or upon using the !? operator, the actor waits until it receives a message matched by one of the case clauses. When the actor receives a message that is not matched, the message stays in the mailbox of the actor and is retried when a new receive block is entered.

Waiting for a reply
myService ! Msg(value)
receive {
    case Response(r) => // ...
}
Synchronous invocation using !?
myService !? Msg(value) match {
    case Response(r) => // ...
}

Channels

Messages, as seen in the previous examples, are used in a somewhat loosely typed fashion. It is possible to send any kind of message to an actor. Messages that are not matched by any of the case clauses remain in the mailbox, rather than causing an error.

Scala has a very rich type system and the Scala developers wanted to take advantage of this. Therefore they added the concept of channels [9]. These allow you to specify the type of messages that can be accepted using generics. This enables type-safe communication.

The mailbox of an actor is a channel that accepts any type of message.
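The sketch below illustrates the idea (the names and messages are invented for this example, and it assumes the Channel class from scala.actors, where a channel created without arguments belongs to the creating actor): the Channel[Int] can only carry Int values, so sending anything else through it is rejected at compile time.

Type-safe communication using a channel (illustrative sketch)

import scala.actors.{Actor, Channel}
import scala.actors.Actor._

case class Compute(input: Int, results: Channel[Int])

val squarer = actor {
    loop {
        react {
            case Compute(n, results) => results ! n * n // only Ints can be sent
        }
    }
}

actor {
    val results = new Channel[Int] // a channel owned by this actor
    squarer ! Compute(7, results)
    results.receive {
        case n => println("7 squared is " + n) // n is statically an Int
    }
}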

Thread-based vs. Event-based actors

Scala makes the distinction between thread-based and event-based actors.

Thread-based actors are actors which each run in their own JVM thread. They are scheduled by the Java thread scheduler, which uses a preemptive priority-based scheduler. When the actor enters a receive block, the thread is blocked until messages arrive. Thread-based actors make it possible to do long-running computations, or blocking I/O operations inside actors, without hindering the execution of other actors.

There is an important drawback to this method: each thread is relatively heavy-weight, using a certain amount of memory and imposing some scheduling overhead. When large numbers of actors are started, the virtual machine might run out of memory, or it might perform suboptimally due to the scheduling overhead.

In situations where this is unacceptable, event-based actors can be used. These actors are not implemented by means of one thread per actor; instead, they all run on the same thread. An actor that waits for a message to be received is not represented by a blocked thread, but by a closure. This closure captures the state of the actor, such that its computation can be continued upon receiving a message [10]. The execution of this closure happens on the thread of the sender.

Event-based actors provide a more light-weight alternative, allowing for very large numbers of concurrently running actors. They should however not be used for parallelism: since all actors execute on the same thread, there is no scheduling fairness.

A Scala programmer can use event-based actors by using a react block instead of a receive block. There is one big limitation to using event-based actors: upon entering a react block, the control flow can never return to the enclosing actor, because react never returns normally. In practice, this does not prove to be a severe limitation, as the code can usually be rearranged to fit this scheme. This property is enforced through the advanced Scala type system, which allows specifying that a method never returns normally (the Nothing type). The compiler can thus check whether code meets this requirement.
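As a sketch of what this looks like in practice, the counter from the earlier example can be turned into an event-based actor by replacing the while/receive combination with loop and react (this reuses the Inc and Value messages defined above):

An event-based counter using react (illustrative sketch)

import scala.actors.Actor
import scala.actors.Actor._

class LightCounter extends Actor {
    var counter: Int = 0

    def act() = {
        loop { // instead of while (true) { receive { ... } }
            react {
                case Inc(amount) =>
                    counter += amount
                case Value =>
                    println("Value is " + counter)
                    exit()
            }
        }
    }
}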

Actor scheduling in Scala

Scala allows programmers to mix thread-based actors and event-based actors in the same program: this way programmers can choose whether they want scalable, lightweight event-based actors, or thread-based actors that allow for parallelism, depending on the situation in which they are needed.

Scala uses a thread pool to execute actors. This thread pool is resized automatically whenever necessary. If only event-based actors are used, the size of the thread pool remains constant. When blocking operations are used, like receive blocks, the scheduler (which runs in its own separate thread) starts new threads when needed. Periodically, the scheduler checks whether there are runnable tasks in the task queue; if all worker threads are blocked, it starts a new worker thread.

Evaluation

This section evaluates some of the features of both languages. It aims to show that while some of these features facilitate implementation, their power also comes with a risk. One should be aware of the possible drawbacks of the chosen technologies, to avoid potential pitfalls.

The dangers of synchronous message passing

The synchronous message passing style available in Scala (using !?) provides programmers with a convenient way of doing messaging round-trips. This allows for a familiar style of programming, similar to remote method invocation.

It should, however, be used with great care. Due to the very selective nature of the match clause that follows the use of !? (it usually matches only one type of message), the actor is effectively blocked until a suitable reply is received. This is implemented using a private return channel [9], which means that the progress of the actor is fully dependent on the actor from which it awaits a reply: it cannot handle any messages other than the expected reply, not even those from the very actor from which it awaits that reply.

This is a dangerous situation, as it might lead to deadlocks. To see this, consider the following example:

Actor deadlock: Actor A
actorB !? Msg1(value) match {
    case Response1(r) =>
      // ...
}

receive {
    case Msg2(value) =>
      reply(Response2(value))
}
Actor deadlock: Actor B
actorA !? Msg2(value) match {
    case Response2(r) =>
      // ...
}

receive {
    case Msg1(value) =>
      reply(Response1(value))
}

In this example, each actor sends a message to the other actor and then waits for a specific reply. Neither will ever receive an answer, as each actor is first awaiting a different message. If this were implemented using a message loop, as you would generally do in Erlang, no problem would arise; this is shown below. This does not mean that synchronous message passing should be avoided: in certain cases it is necessary, and in these cases the extra syntax makes it much easier to program. It is, however, important to be aware of the potential problems caused by this programming style.

Safe loop: Actor A
actorB ! Msg1(value)
while (true) {
    receive {
        case Msg2(value) =>
          reply(Response2(value))
        case Response1(r) =>
          // ...
    }      
}
Safe loop: Actor B
actorA ! Msg2(value)
while (true) {
    receive {
        case Msg1(value) =>  
          reply(Response1(value))
        case Response2(r) =>
          // ...
    }      
}

The full source code for these examples can be found online, see the appendix for more information.

Safety in Scala concurrency

Another potential pitfall in Scala comes from the fact that it mixes actors with object-oriented programming. It is possible to expose the internal state of an actor through publicly available methods for retrieving and modifying this state. When this is done, an object can be modified by directly invoking its methods, that is, without using messages. Doing so means that you no longer enjoy the safety provided by the actor model.
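A short sketch of this pitfall (invented for this article, reusing the Inc message from the counter example): nothing prevents other code from bypassing the mailbox and touching the actor's state directly.

Bypassing the mailbox (illustrative sketch)

import scala.actors.Actor
import scala.actors.Actor._

class ExposedCounter extends Actor {
    var counter = 0 // internal state, but publicly accessible by default
    def act() = {
        loop { react { case Inc(amount) => counter += amount } }
    }
}

val c = new ExposedCounter
c.start()
c ! Inc(1)     // safe: goes through the mailbox
c.counter += 1 // unsafe: a direct, unsynchronized update from another thread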

Erlang on the other hand, due to its functional nature, strictly enforces the use of messages between processes: there is no other way to retrieve and update information in other processes.

This illustrates possibly the biggest trade-off between Erlang and Scala: having a pure functional language, like Erlang, is safe, but more difficult for programmers to use. The object-oriented, imperative style of Scala is more familiar and makes programming easier, yet requires more discipline and care to produce safe and correct programs.

Summary

This article described the actor model for the implementation of concurrency in applications, as an alternative to threading and locking. It highlighted Erlang and Scala, two languages with an implementation of the actor model and showed how these languages implement this model.

Erlang is a pure functional language, providing little more than the basic features of functional languages. This should certainly not be seen as a weakness: this simplicity allows it to be optimized specifically for the cases for which it was designed, as well as to implement more advanced features such as hot-swapping of code.

Scala on the other hand uses a mix of object-oriented and functional styles. This makes it easier for a programmer to write code, especially given the extra constructs offered by Scala, but this flexibility comes with a warning: discipline should be used to avoid inconsistencies.

The differences between these languages should be seen and evaluated in their design context. Both however provide an easy to use implementation of the actor model, which greatly facilitates the implementation of concurrency in applications.


References

Appendix: Source code + PDF

The source code for the sample programs and a PDF version can be found right here (tar.gz download). It has been tested on Ubuntu Linux 8.10, with Erlang 5.6.3 and Scala 2.7.2 final (installed from Debian packages). It should work on any platform.

(Via rocketeer.be)

CloudI: Bringing Erlang’s Fault-Tolerance to Polyglot Development

Clouds must be efficient to provide useful fault-tolerance and scalability, but they also must be easy to use.

CloudI (pronounced “cloud-e” /klaʊdi/) is an open source cloud computing platform built in Erlang that is most closely related to the Platform as a Service (PaaS) clouds. CloudI differs in a few key ways, most importantly: software developers are not forced to use specific frameworks, slow hardware virtualization, or a particular operating system. By allowing cloud deployment to occur without virtualization, CloudI leaves development process and runtime performance unimpeded, while quality of service can be controlled with clear accountability.

What makes a cloud a cloud?

The word “cloud” has become ubiquitous over the past few years, and its true meaning has become somewhat lost. In the most basic technological sense, these are the properties that a cloud computing platform must have:

  • Fault-tolerance
  • Scalability

And these are the properties that we’d like a cloud to have:

  • Easy integration
  • Simple deployment

My goal in building CloudI was to bring together these four attributes.


I began by looking at the Erlang programming language (on top of which CloudI is built). The Erlang virtual machine provides fault-tolerance features and a highly scalable architecture while the Erlang programming language keeps the required source code small and easy to follow.

It’s important to understand that few programming languages can provide real fault-tolerance with scalability. In fact, I’d say Erlang is roughly alone in this regard. Let me take a detour to explain why and how.


What is fault-tolerance in cloud computing?

Fault-tolerance is robustness to error. That is, fault-tolerant systems are able to continue operating relatively normally even in the event of (hopefully isolated) errors.

Consider, for example, service C sending requests to services A and B. Although service B crashes temporarily, the rest of the system continues, relatively unimpeded.


Erlang is known for achieving 9x9s of reliability (99.9999999% uptime, so less than 31.536 milliseconds of downtime per year) with real production systems (within the telecommunications industry). Normal web development techniques only achieve 5x9s reliability (99.999% uptime, so about 5.256 minutes of downtime per year), if they are lucky, due to slow update procedures and complete system failures. How does Erlang provide this advantage?

The Erlang virtual machine implements what is called an “Actor model”, a mathematical model for concurrent computation. Within the Actor model, Erlang’s lightweight processes are a concurrency primitive of the language itself. That is, within Erlang, we assume that everything is an actor. By definition, actors perform their actions concurrently; so if everything is an actor, we get inherent concurrency. (For more on Erlang’s Actor model, there’s a longer discussion here.)

As a result, Erlang software is built with many lightweight processes that keep process state isolated while providing extreme scalability. When an Erlang process needs external state, a message is normally sent to another process, so that the message queuing can provide the Erlang processes with efficient scheduling. To keep the Erlang process state isolated, the Erlang virtual machine does garbage collection for each process individually so that other Erlang processes can continue running concurrently without being interrupted.

The Erlang virtual machine's garbage collection is an important difference when compared with the Java virtual machine's, because Java depends on a single heap, which lacks the isolated state provided by Erlang. This difference means that Java is unable to provide basic fault-tolerance guarantees, simply due to its garbage collection, even if libraries or language support were developed on top of the Java virtual machine. There have been attempts to develop fault-tolerance features in Java and other JVM-based languages, but they continue to be failures due to the JVM's garbage collection.


Basically, building real-time fault-tolerance support on top of the JVM is by definition impossible, because the JVM itself is not fault-tolerant.

Erlang processes

At a low level, what happens when we get an error in an Erlang process? The language itself uses message passing between processes to ensure that any errors have a scope limited by a concurrent process. This works by storing data types as immutable objects; these objects are copied to limit the scope of the process state (large binaries are a special exception because they are reference counted to conserve memory).

In basic terms, that means that if we want to send variable X to another process P, we have to copy over X as its own immutable variable X’. We can’t modify X’ from our current process, and so even if we trigger some error, our second process P will be isolated from its effects. The end result is low-level control over the scope of any errors due to the isolation of state within Erlang processes. If we wanted to get even more technical, we’d mention that Erlang’s lack of mutability gives it referential transparency unlike, say, Java.

This type of fault-tolerance is deeper than just adding try-catch statements: exceptions cover errors that are expected, whereas fault-tolerance is about handling unexpected errors. Here, you're trying to keep your code running even when one of your variables unexpectedly explodes.

Erlang’s process scheduling provides extreme scalability for minimal source code, making the system simpler and easier to maintain. While it is true that other programming languages have been able to imitate the scalability found natively within Erlang, by providing libraries with user-level threading (possibly combined with kernel-level threading) and data exchange (similar to message passing) to implement their own Actor model for concurrent computation, these efforts have been unable to replicate the fault-tolerance provided within the Erlang virtual machine.

This leaves Erlang alone amongst programming languages as being both scalable and fault-tolerant, making it an ideal development platform for a cloud.

Taking advantage of Erlang

So, with all that said, I can make the claim that CloudI brings Erlang’s fault-tolerance and scalability to various other programming languages (currently C++/C, Erlang (of course), Java, Python, and Ruby), implementing services within a Service Oriented Architecture (SOA).


Every service executed within CloudI interacts with the CloudI API. All of the non-Erlang programming language services are handled using the same internal CloudI Erlang source code. Since the same minimal Erlang source code is used for all the non-Erlang programming languages, other programming language support can easily be added with an external programming language implementation of the CloudI API. Internally, the CloudI API is only doing basic serialization for requests and responses. This simplicity makes CloudI a flexible framework for polyglot software development, providing Erlang’s strengths without requiring the programmer to write or even understand a line of Erlang code.

The service configuration specifies startup parameters and fault-tolerance constraints so that service failures can occur in a controlled and isolated way. The startup parameters clearly define the executable and any arguments it needs, along with default timeouts used for service requests, the method to find a service (called the “destination refresh method”), both an allow and deny simple access control list (ACL) to block outgoing service requests and optional parameters to affect how service requests are handled. The fault-tolerance constraints are simply two integers (MaxR: maximum restarts, and MaxT: maximum time period in seconds) that control a service in the same way an Erlang supervisor behavior (an Erlang design pattern) controls Erlang processes. The service configuration provides explicit constraints for the lifetime of the service which helps to make the service execution easy to understand, even when errors occur.

To keep the service memory isolated during runtime, separate operating system processes are used for each non-Erlang service (referred to as “external” services) with an associated Erlang process (for each non-Erlang thread of execution) that is scheduled by the Erlang VM. The Erlang CloudI API creates “internal” services, which are also associated with an Erlang process, so both “external” services and “internal” services are processed in the same way within the Erlang VM.


In cloud computing, it’s also important that your fault-tolerance extends beyond a single computer (i.e., distributed system fault-tolerance). CloudI uses distributed Erlang communication to exchange service registration information, so that services can be utilized transparently on any instance of CloudI by specifying a single service name for a request made with the CloudI API. All service requests are load-balanced by the sending service and each service request is a distributed transaction, so the same service with separate instances on separate computers is able to provide system fault-tolerance within CloudI. If necessary, CloudI can be deployed within a virtualized operating system to provide the same system fault-tolerance while facilitating a stable development framework.

For example, if an HTTP request needs to store some account data in a database it could be made to the configured HTTP service (provided by CloudI) which would send the request to an account data service for processing (based on the HTTP URL which is used as the service name) and the account data would then be stored in a database. Every service request receives a Universally Unique IDentifier (UUID) upon creation, which can be used to track the completion of the service request, making each service request within CloudI a distributed transaction. So, in the account data service example, it is possible to make other service requests either synchronously or asynchronously and use the service request UUIDs to handle the response data before utilizing the database. Having the explicit tracking of each individual service request helps ensure that service requests are delivered within the timeout period of a request and also provides a way to uniquely identify the response data (the service request UUIDs are unique among all connected CloudI nodes).

With a typical deployment, each CloudI node could contain a configured instance of the account data service (which may utilize multiple operating system processes with multiple threads that each have a CloudI API object) and an instance of the HTTP service. An external load balancer would easily split the HTTP requests between the CloudI nodes and the HTTP service would route each request as a service request within CloudI, so that the account data service can easily scale within CloudI.

CloudI in action

CloudI lets you take unscalable legacy source code, wrap it with a thin CloudI service, and then execute the legacy source code with explicit fault-tolerance constraints. This particular development workflow is important for fully utilizing multicore machines while providing a distributed system that is fault-tolerant during the processing of real-time requests. Creating an “external” service in CloudI is simply instantiating the CloudI API object in all the threads that have been configured within the service configuration, so that each thread is able to handle CloudI requests concurrently. A simple service example can utilize the single main thread to create a single CloudI API object, like the following Python source code:

import sys
sys.path.append('/usr/local/lib/cloudi-1.2.3/api/python/')
from cloudi_c import API

class Task(object):
    def __init__(self):
        self.__api = API(0) # first/only thread == 0

    def run(self):
        # register a callback for a service name pattern, then process
        # incoming service requests until the service terminates
        self.__api.subscribe('hello_world_python/get', self.__hello_world)
        self.__api.poll()

    def __hello_world(self, command, name, pattern, request_info, request,
                      timeout, priority, trans_id, pid):
        # the value returned here becomes the service request's response
        return 'Hello World!'

if __name__ == '__main__':
    assert API.thread_count() == 1 # simple example, without threads
    task = Task()
    task.run()

The example service simply returns “Hello World!” to an HTTP GET request, by first subscribing with a service name and a callback function. When the service starts processing incoming CloudI service bus requests within the CloudI API poll function, incoming requests from the “internal” service that provides an HTTP server are routed to the example service based on the service name, because of the subscription. The service could also have returned no data as a response, if the request needed to behave like a publish message in a typical distributed messaging API that provides publish/subscribe functionality. Here, the example service wants to provide a response, so that the HTTP server can pass the response on to the HTTP client; the request is therefore a typical request/reply transaction. Since a service may respond with either data or no data, the service callback function, rather than the calling service, controls the messaging paradigm used, and a request is able to route through as many services as necessary to provide a response. Timeouts are very important for real-time event processing, so each request specifies an integer timeout which follows the request as it routes through any number of services. The end result is that real-time constraints are enforced alongside fault-tolerance constraints, to provide a dependable service in the presence of any number of software errors.

The presence of bugs in any software's source code should be understood as a fact of life, one that can only be mitigated with fault-tolerance constraints. Software development can reduce the number of bugs as software is maintained, but it can also add bugs as features are added. CloudI provides cloud computing that addresses these important fault-tolerance concerns within real-time distributed systems software development. As I have demonstrated in this CloudI and Erlang tutorial, the cloud computing that CloudI provides is minimal, so that efficiency is not sacrificed for the benefits of cloud computing.

(Via toptal.com)

RebornDB: The Next Generation Distributed Key-Value Store

There are many key-value stores in the world, and they are widely used in many systems. For example, we can use Memcached to cache a MySQL query result for subsequent identical queries, or MongoDB to store documents for better searching.

Different scenarios call for different key-value stores; there is no silver-bullet key-value store for all situations. But if you just want a simple, easy-to-use and very fast key-value store that supports many powerful data structures, Redis may be a good choice to start with.

Redis is an advanced key-value cache and store, released under the BSD license. It is very fast, has many data types (String, Hash, List, Set, Sorted Set, …), uses RDB or AOF persistence and replication to guarantee data safety, and supplies client libraries for many languages.

Most of all, the market has chosen Redis: many companies use it, and it has proved its worth.

Although Redis is great, it still has some disadvantages, the biggest being its memory limitation: Redis keeps all data in memory, which limits the size of the whole dataset and makes it impossible to store more data than fits in RAM.

The official Redis Cluster solves this by splitting the data across many Redis servers, but it has not yet been proven in many practical environments. It also requires us to change our client libraries to support “MOVED” redirection and other special commands, which is equally unacceptable for a running production system. So Redis Cluster is not a good solution for now.

QDB

We like Redis and want to go beyond its limitation, so we built a service named QDB, which is compatible with Redis, saves data on disk to get past the memory limitation, and keeps hot data in memory for performance.

Introduction

QDB is a fast, Redis-like key-value store. It has the following features:

  • Compatible with Redis. If you are familiar with Redis, you can use QDB easily. QDB supports most Redis commands and data structures (String, Hash, List, Set, Sorted Set).

  • Saves data on disk (so the dataset can exceed the memory limitation) and keeps hot data in memory, thanks to the backend storage.

  • Multiple backend storage engines: you can choose RocksDB, LevelDB or GoLevelDB (below, we will use RocksDB as the example).

  • Bidirectional synchronization with Redis: QDB can sync data from Redis as a slave and replicate data into Redis as a master.

Backend Storage

QDB uses LevelDB/RocksDB/GoLevelDB as the backend storage. These storage engines are all based on the log-structured merge tree (LSM tree), which offers very fast writes and good read performance; they also all use Bloom filters and an LRU cache to further improve read performance.

LevelDB is the original implementation, developed by Google; RocksDB is an optimized version maintained by Facebook; GoLevelDB is a pure Go implementation of LevelDB. If you only want a quick trial and don't want to build and install RocksDB or LevelDB, you can use GoLevelDB directly, but we don't recommend using it in production because of its lower performance.

LevelDB and RocksDB are both great for production, but we prefer RocksDB because of its superior performance. Later, we may support only RocksDB and GoLevelDB: one for production, and one for trials and testing.

RebornDB

QDB is great: we can save a huge amount of data on one machine with high write/read performance. But as the dataset grows, we still hit the problem that we cannot keep all the data on one machine. At the same time, a single QDB server becomes both a bottleneck and a single point of failure.

We must now consider a cluster solution.

Introduction

RebornDB is a proxy-based distributed Redis cluster solution. It is similar to twemproxy, which is among the earliest and most famous proxy-based Redis cluster solutions.

But twemproxy has its own problems. It only supports a static cluster topology, so we cannot add or remove Redis servers for dynamic re-sharding. If we run many twemproxy instances and want to add a backend Redis server, safely updating the configuration of all the proxies is another problem that increases operational complexity. Moreover, Twitter, the company that developed twemproxy, has given up on it and no longer uses it in production.

Unlike twemproxy, RebornDB has a killer feature: re-sharding the dataset dynamically. This is useful when your dataset grows quickly and you have to add more storage nodes for scalability. Above all, RebornDB does re-sharding transparently, without affecting currently running services.

Architecture

We can treat RebornDB as a black box and use any existing Redis client to communicate with it, as if it were a single Redis server. The figure below shows the RebornDB architecture.

[Figure: RebornDB architecture]

RebornDB has the following components: reborn-proxy, backend store, coordinator, reborn-config, and reborn-agent.

Reborn-proxy

Reborn-proxy is the single outward-facing service for clients. Any Redis client can connect to any reborn-proxy and run commands.

Reborn-proxy parses the commands sent by the client using RESP (the Redis serialization protocol), dispatches them to the corresponding backend store, receives the replies and returns them to the client.

Reborn-proxy is stateless, which means that you can easily scale reborn-proxy horizontally to serve more requests.

We may have many reborn-proxies; how clients discover them is another topic in distributed system design, which we will not dive into here. Some practical approaches use DNS, LVS, HAProxy, etc.

Backend store

A backend store is either reborn-server (a modified Redis version) or QDB. We introduce a concept named group to manage one or more backend stores. A group must have one master and zero or more slaves, forming a replication topology.

We split the whole dataset into 1024 slots (using hash(key) % 1024 to determine which slot a key belongs to) and save different slots in different groups. If you want to do re-sharding, you can add a new group and let RebornDB migrate all the data in one slot from another group to it.
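As an illustration of this routing rule, computing the slot of a key might look like the sketch below. The concrete hash function is an assumption here; the article only specifies hash(key) % 1024.

import java.util.zip.CRC32

// Illustrative only: RebornDB's actual hash function may differ.
def slotOf(key: Array[Byte]): Int = {
    val crc = new CRC32
    crc.update(key)
    (crc.getValue % 1024).toInt // one of the 1024 slots
}

The same key always hashes to the same slot, so a proxy can route any command by hashing its key and looking up which group currently serves that slot.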

We can also use different backend stores in different groups. For example, if we want group1 to hold hot data and group2 to hold large amounts of cold data, we can use reborn-server in group1 and QDB in group2. Reborn-server is much faster than QDB, so we can guarantee read/write performance for the hot data.

Coordinator

We use ZooKeeper or etcd as the coordinator, which coordinates all services whenever we need to perform write operations such as re-sharding and failover.

All RebornDB metadata is saved in the coordinator, e.g. the key routing rules, with which reborn-proxy can dispatch commands to the correct backend store.

Reborn-config

Reborn-config is the management tool. We can use it to add or remove a group, add or remove a store within a group, migrate data from one group to another, etc.

We must use reborn-config whenever we want to change RebornDB cluster information. For example, we cannot run the “SLAVEOF NO ONE” command directly on a backend store to promote a new master; we must use “reborn-config server promote groupid server” instead. A promotion must not only change the replication topology within the group, but also update the information in the coordinator, and only reborn-config can do both.

Reborn-config also supplies a web interface so that you can manage RebornDB easily, and you can use its HTTP RESTful API for finer control.

Reborn-agent

Reborn-agent is an HA tool. You can use it to start and stop applications (reborn-config, qdb-server, reborn-server, reborn-proxy). We will discuss it in detail in the High Availability section below.

Resharding

RebornDB supports re-sharding dynamically. How do we support this?

As we said above, we split the whole dataset into 1024 slots and let different groups store different slots. When we add a new group, we migrate some slots from the old groups into the new group. We call this process migration, and the minimum unit of migration in RebornDB is a slot.

Let’s start with a simple example below:

[Figure: two groups, group1 holding slots 1 and 2, group2 holding slots 3, 4 and 5]

We have two groups: group1 holds slots 1 and 2, and group2 holds slots 3, 4 and 5. Now group2 is under a high workload, so we will add a group3 and migrate slot 5 into it.

We can use the following command to migrate slot 5 from group2 to group3:

reborn-config slot migrate 5 5 3

This command looks simple, but more work is needed internally to guarantee migration safety. We must use two-phase commit (2PC) to tell every reborn-proxy that we will migrate slot 5 from group2 to group3; only after all reborn-proxies confirm and respond do we begin the migration.

[Figure: migrating slot 5 from group2 to group3]

The migration flow is simple: take a key in slot 5, migrate its data from group2 to group3, delete the key in group2, then repeat with the next key. In the end, group2 has no slot 5 data and all the slot 5 data is in group3.

While slot 5 is being migrated, reborn-proxy may receive a command whose key belongs to slot 5, but it cannot know whether that key is in group2 or in group3 at that moment. In that case, reborn-proxy first sends a migration command for that key to group2, then dispatches the original command to group3.

The migration of a key is atomic, so after the migration command we can be sure the key is in group3, whether it was in group2 or group3 before.

When no slot 5 data remains in group2, we stop the migration, and the topology looks like the figure below:

[Figure: final topology, with slot 5 served by group3]

High Availability

RebornDB uses reborn-agent to provide its HA solution.

Every second, reborn-agent checks whether the applications it started are still alive. If reborn-agent finds that an application is down, it restarts it.

Reborn-agent is similar to supervisor, but has more powerful features.

Reborn-agent supplies an HTTP RESTful API so that we can dynamically add or remove applications that need to be monitored. For example, we can use the HTTP “/api/start_redis” API to start a new reborn-server, or the “/api/start_proxy” API to start a new reborn-proxy; we can also use “/api/stop” to stop a running application and remove it from the monitoring list.

Reborn-agent is not only for local application monitoring, but also for backend store HA. Multiple reborn-agents first elect a leader through the coordinator; the leader then checks every second whether the backend stores are alive. If a backend store that is down is a slave, reborn-agent only marks it offline in the coordinator; but if it is a master, reborn-agent selects a new master from the slaves and performs a failover.

To do

Although RebornDB has many great features, it still needs more work. We may do the following things later:

  • Make it more user-friendly. Running RebornDB is currently not easy: we have to do work like initializing slots, adding servers to groups, and assigning slots to a group. Lowering this barrier to entry must be considered in future work.

  • Replication-based migration. We currently migrate a slot key by key, which is not fast when a slot contains a lot of data. Using replication for migration may be better: in the example above, group2 would first generate a snapshot from which group3 can get all the slot 5 data as of that point in time, and group3 would then sync subsequent changes from group2 incrementally. When group3 has caught up with all the slot 5 changes in group2, we do the switchover and delete slot 5 from group2.

  • A pretty dashboard. We want to control and monitor everything through a dashboard, in order to provide a better user experience.

  • A P2P-based cluster. RebornDB is currently a proxy-based cluster solution; we may later redesign the whole architecture and use a P2P design like the official Redis Cluster.

Conclusion

Building a distributed key-value store is not an easy thing. The road ahead is long, and we have only taken a small step so far.

If you want a Redis-like key-value store that can save more data and supports dynamic re-sharding in a distributed system, RebornDB is a good choice for you.

You can try it here, and any advice or feedback is very welcome. :-)
(via HighScalability.com)

Linux Kernel 4.1 Released

Version 4.1 of the Linux kernel was released this week, and it includes a number of new features in the following areas.

Power Management

Several power-saving features have been integrated into the kernel, and the majority of them target high-end Intel chips. The improvements consist of tweaks to improve performance and reduce power consumption, and early reports claim they can help extend battery life by up to an hour.

EXT4 Encryption

It’s now possible to encrypt EXT4 filesystems, thanks to code contributed by Google. The code originally was developed for Android and has been pushed up-line to the mainstream kernel. This will be interesting news to government contractors and others who work in secure environments, where filesystem-level encryption is mandated.

Graphics

Version 4.1 includes improvements that affect several GPUs, including the GeForce GTX series. NVIDIA enjoys a troubled relationship with Linux, as many users and developers will attest, and the company's unwillingness to release open-source driver code has created a lot of problems for the kernel team. Linus has been quite vocal on the subject. Here's a brief summary of his views (contains cursing): https://www.youtube.com/watch?v=iYWzMvlj2RQ.

The new version includes improvements that make it easier to configure Nouveau for the GeForce GTX 750. In the past, the process involved the manual extraction of firmware blobs, which is exactly as painful as it sounds.

Intel GPU support in virtual machines had been a bit troublesome in the past, but Linux 4.1 now supports XenGT vGPU, Intel's GPU virtualization for the Xen hypervisor. A version for KVM is in the works too.

Version 4.1 also supports DisplayPort MST for Radeon cards.

Better Laptop Support

Updates to the Atmel MXT driver mean that the touchscreen and touchpad of Google’s Pixel 2 Chromebook are now fully supported.

Linux 4.1 also now can control the keyboard backlights on Dell laptops, which is great news for midnight typists!

The updated Toshiba ACPI driver now supports a host of fancy features, including adaptive keyboard buttons, USB sleep and charge, hotkeys, keyboard backlighting and more.

Game Support

Xbox One controllers gain force feedback (rumble) support in kernel version 4.1.

These changes are just the highlights; there are tons of other entries in the change logs.

Currently, 4.1 has not yet made it into the repos of the major distributions. That’s understandable, as it’s only just been released, but it’s only a matter of time until it is included. For instance, Ubuntu will start supporting it in October 2015. If you want to start using it before then, you can install it from a custom repository (as provided by the community), or compile it from source.
(via Linuxjournal.com)

POWER8 packs more than twice the big data punch

Last week, in London, the top minds in financial technology came face to face at the STAC Summit to discuss the technology challenges facing the financial industry. The impact big data is having on the financial industry was a hot topic, and many discussions revolved around having the best technology on hand for handling data faster to gain a competitive advantage.

It was in this setting that IBM shared recent benchmark results revealing that an IBM POWER8-based server can deliver more than twice the performance of the best x86 server when running standard financial industry workloads.

[Figure: IBM POWER8 vs. x86 STAC-A2 benchmark results]

In fact, IBM’s POWER8-based systems set four new performance records for financial workloads. According to the most recent published and certified STAC-A2 benchmarks, IBM POWER8-based systems deliver:

  • 3x the performance of the best 2-socket solution using x86 CPUs
  • 1x the performance for path scaling of the best 4-socket solution using x86 CPUs (NEW PUBLIC RECORD)
  • 7x the performance of the best solution using two x86 CPUs and one Xeon Phi co-processor
  • a 16 percent increase in asset capacity over the best 4-socket solution (NEW PUBLIC RECORD)

Why the financial industry created the STAC benchmark

STAC-A2 is a set of standard benchmarks that help estimate the relative performance of full systems running complete financial applications. This enables clients in the financial industry to evaluate how IBM POWER8-based systems will perform on real applications.

STAC-A2 gives a much more accurate view of expected performance than micro-benchmarks or simple code loops do. STAC recently performed STAC-A2 Benchmark tests on a stack consisting of the STAC-A2 Pack for Linux on an IBM Power System S824 server using two IBM POWER8 processor cards at 3.52 GHz and 1 TB of DRAM, with Red Hat Enterprise Linux version 7.

And, as reported above, according to audited STAC results, the IBM Power System S824 delivered more than twice the performance of the best x86-based server measured. Those are the kinds of results that matter: real results for real client challenges.

POWER8 processors are built on high-performance, multi-threaded cores, with each core of the Power System S824 server running up to eight simultaneous threads at 3.5 GHz. The Power System S824 also has a very high-bandwidth memory interface that runs at 192 GB/s per socket, almost three times the speed of a typical x86 processor. These factors, along with a balanced system structure that includes a large internal L3 cache (8 MB per core), are the primary reasons why financial computing workloads run significantly faster on POWER8-based systems than on alternatives.

The STAC-A2 financial industry benchmarks add to the performance data that Cabot Partners published recently. Cabot evaluated the performance of POWER8-based systems versus x86-based systems, assessing functionality, performance, and price/performance across several industries, including life sciences, financial services, oil and gas, and analytics, and referencing standard benchmarks as well as application-oriented benchmark data.

The findings in the STAC-A2 benchmarking report position POWER8 as the ideal platform for the financial industry. This data, combined with the recently published Cabot Partners report, represents overwhelming proof that IBM POWER8-based systems take the performance lead in the financial services space (and beyond)—clearly packing a stronger punch when compared to the competition.


(via Smartercomputingblog.com)

Socket Sharding in NGINX Release 1.9.1

NGINX release 1.9.1 introduces a new feature that enables use of the SO_REUSEPORT socket option, which is available in newer versions of many operating systems, including DragonFly BSD and Linux (kernel version 3.9 and later). This option allows multiple sockets to listen on the same IP address and port combination. The kernel then load balances incoming connections across the sockets, effectively sharding the socket.
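
To make the mechanism concrete, here is a minimal Python sketch (the port and worker count are arbitrary choices for illustration): each of four forked processes opens its own listening socket on the same port with SO_REUSEPORT set, and the kernel decides which process’s socket receives each incoming connection.

import os
import socket

PORT = 8080  # arbitrary port for this sketch

def worker():
    # Each worker creates its OWN listening socket on the same address/port.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(("0.0.0.0", PORT))
    sock.listen(128)
    while True:
        # The kernel load balances connections across the workers' sockets.
        conn, _ = sock.accept()
        conn.sendall(b"HTTP/1.1 200 OK\r\nContent-Length: 2\r\n\r\nOK")
        conn.close()

for _ in range(4):
    if os.fork() == 0:
        worker()  # child processes serve forever
for _ in range(4):
    os.wait()  # parent waits (interrupt with Ctrl-C)

Running this and hitting the port repeatedly shows connections spread across the four processes, which is exactly the sharding behavior NGINX takes advantage of.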

(For NGINX Plus customers, this feature will be available in Release 7, which is scheduled for later this year.)

The SO_REUSEPORT socket option has many real-world applications, such as the potential for easy rolling upgrades of services. For NGINX, it improves performance by more evenly distributing connections across workers.

As depicted in the figure, when the SO_REUSEPORT option is not enabled, a single listening socket by default notifies workers about incoming connections as the workers become available. If you include the accept_mutex off directive in the events context, the single listener instead notifies all workers about a new connection at the same time, putting them in competition to grab it. This is known as the thundering herd problem.

[Figure: without SO_REUSEPORT, NGINX workers accept connections from a single shared listening socket]

With the SO_REUSEPORT option enabled, there is a separate listening socket for each worker. The kernel determines which available socket (and by implication, which worker) gets the connection. While this does decrease latency and improve performance as workers accept connections, it can also mean that workers are given new connections before they are ready to handle them.

[Figure: with SO_REUSEPORT, each NGINX worker has its own listening socket and the kernel distributes connections among them]

Configuring Socket Sharding

To enable the SO_REUSEPORT socket option, add the new reuseport parameter to the listen directive, as in this example:


http {
     server {
          listen       80 reuseport;
          server_name  localhost;
          …
     }
}

Including the reuseport parameter disables accept_mutex for the socket, because the lock is redundant with reuseport. It can still be worth setting accept_mutex if there are ports on which you don’t set reuseport.

Benchmarking the Performance Improvement

I ran a wrk benchmark with 4 NGINX workers on a 36-core AWS instance. To eliminate network effects, I ran both client and NGINX on localhost, and also had NGINX return the string OK instead of a file. I compared three NGINX configurations: the default (equivalent to accept_mutex on), with accept_mutex off, and with reuseport. As shown in the figure, reuseport increases requests per second by 2 to 3 times, and reduces both latency and the standard deviation for latency.
[Figure: requests per second for the default, accept_mutex off, and reuseport configurations]

I also ran a more real-world benchmark with the client and NGINX on separate hosts and with NGINX returning an HTML file. As shown in the chart, with reuseport the decrease in latency was similar to the previous benchmark, and the standard deviation decreased even more dramatically (almost ten-fold). Other results (not shown in the chart) were also encouraging. With reuseport, the load was spread evenly across the worker processes. In the default condition (equivalent to accept_mutex on), some workers got a higher percentage of the load, and with accept_mutex off all workers experienced high load.

Configuration      Latency (ms)   Latency stdev (ms)   CPU Load
Default            15.65          26.59                0.3
accept_mutex off   15.59          26.48                10
reuseport          12.35          3.15                 0.3

(via the NGINX blog)

Web Crawling & Analytics Case Study – Database vs. Self-Hosted Message Queuing vs. Cloud Message Queuing


The Business Problem:

To build a repository of used car prices and to identify trends based on data available from used car dealers. Solving this problem necessarily involved building large-scale crawlers to crawl and parse thousands of used car dealer websites every day.

1st Solution: Database Driven Solution to Crawling & Parsing

Our initial infrastructure consisted of crawling, parsing, and database-insertion web services, all written in Python. When the crawling web service finishes with a website, it pushes the output data to the database; the parsing web service picks it up from there and, after parsing, pushes the structured data back into the database.


[Figure: database-driven crawling & parsing architecture]
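
As a rough illustration, a minimal Python sketch of this flow is below; the table layout, connection details, and the fetch and parse_listing helpers are all hypothetical stand-ins:

import psycopg2

conn = psycopg2.connect("dbname=usedcars")  # hypothetical connection string

def crawling_service(url):
    html = fetch(url)  # hypothetical HTTP fetch helper
    # Every crawled page takes a full round trip through the database ...
    with conn, conn.cursor() as cur:
        cur.execute("INSERT INTO raw_pages (url, html) VALUES (%s, %s)", (url, html))

def parsing_service():
    # ... and is read back out again before it can be parsed, so the
    # database sits in the middle of every hand-off between the services.
    with conn, conn.cursor() as cur:
        cur.execute("SELECT id, html FROM raw_pages WHERE parsed = false LIMIT 100")
        for page_id, html in cur.fetchall():
            listing = parse_listing(html)  # hypothetical parser
            cur.execute("INSERT INTO listings (page_id, price) VALUES (%s, %s)",
                        (page_id, listing["price"]))
            cur.execute("UPDATE raw_pages SET parsed = true WHERE id = %s", (page_id,))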

Problems with the database-driven approach:

  • Bottlenecks: Writing the data into the database and reading it back out proved to be a huge bottleneck; it slowed down the entire process and led to capacity mismatches between the crawling and parsing functions.

  • High processing cost: Due to the slow response time of many websites, the parsing service would remain mostly idle, which led to very high server and processing costs.

We tried to speed up the process by posting data directly from the crawling service to the parsing service, but this resulted in data loss whenever the parsing service was busy. The database’s read and write bottlenecks also presented a massive scaling challenge.

2nd Solution: Self Hosted / Custom Deployment Using RabbitMQ

[Figure: crawling & parsing architecture with RabbitMQ]

To overcome the problems described above and gain the ability to scale, we moved to a new architecture built around RabbitMQ. In the new architecture, crawlers and parsers ran on Amazon EC2 micro instances, and we used Fabric to push commands to the scripts running on them. A crawling instance would pull a used car dealer website from the websites queue, crawl the relevant pages, and push the output to a crawled-pages queue. A parsing instance would pull data from the crawled-pages queue, parse it, and push the structured data into a parsed-data queue, from which a database-insertion script transferred it into Postgres.
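
A minimal sketch of the parsing hop in this pipeline, using the pika client for RabbitMQ, is below; the host, queue names, and the parse_listing helper are hypothetical stand-ins:

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("rabbitmq-host"))
channel = connection.channel()
for name in ("websites", "crawled_pages", "parsed_data"):
    channel.queue_declare(queue=name, durable=True)

# Pull raw pages off one queue, parse them, push structured data onto the next.
for method, properties, body in channel.consume("crawled_pages"):
    listing = parse_listing(body)  # hypothetical parser; returns serialized data
    channel.basic_publish(
        exchange="",
        routing_key="parsed_data",
        body=listing,
        # delivery_mode=2 marks the message persistent; relaxing this is the
        # speed-vs-durability tradeoff mentioned in the problems below.
        properties=pika.BasicProperties(delivery_mode=2),
    )
    channel.basic_ack(method.delivery_tag)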

This approach sped up the crawling and parsing cycle, and scaling became just a matter of adding more instances created from specialized AMIs.

Problems with the RabbitMQ approach:

  • Setting up, deploying, and maintaining this infrastructure across hundreds of servers was a nightmare for a small team.

  • We suffered data loss whenever there were deployment or maintenance issues. Because of the tradeoff we were forced to make between speed and persistence of data in RabbitMQ, valuable data could be lost if the server hosting RabbitMQ crashed.

3rd Solution: Cloud Messaging Deployment Using IronMQ & IronWorker

The concept of multiple queues, with multiple crawlers and parsers pushing and pulling data from them, gave us a chance to scale the infrastructure massively. We were looking for a solution that kept a similar architecture but removed the headaches of deployment and maintenance management.

The architecture, business logic, and processing methods of using Iron.io and IronWorker were similar to the RabbitMQ setup, but without the deployment and maintenance effort. All our code is written in Python, and since Iron.io supports Python we could set up the crawling and parsing workers and queues within 24 hours with minimal effort. Reading and writing data into IronMQ is fast, all messages in IronMQ are persistent, and the chance of losing data is very low.


[Figure: crawling & parsing architecture with IronMQ and IronWorker]
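
For comparison, the same hop on IronMQ looks roughly like the sketch below, assuming the iron_mq Python client’s basic post/get/delete calls; the credentials, queue names, and parse_listing helper are hypothetical stand-ins:

from iron_mq import IronMQ

mq = IronMQ(project_id="...", token="...")  # hosted service: nothing to deploy
crawled = mq.queue("crawled_pages")
parsed = mq.queue("parsed_data")

while True:
    resp = crawled.get()  # messages are persistent by default
    for msg in resp.get("messages", []):
        parsed.post(parse_listing(msg["body"]))  # hypothetical parser
        crawled.delete(msg["id"])  # acknowledge by deleting the message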

COMPARISON MATRIX BETWEEN DIFFERENT APPROACHES

Key Variables                                        Database Driven Batch Processing   Self-Hosted – RabbitMQ   Cloud-Based – IronMQ
Speed of processing a batch                          Slow                               Fast                     Fast
Data loss from server crashes & production issues    Low Risk                           Medium Risk              Low Risk
Custom programming for queue management              High Effort                        Low Effort               Low Effort
Setup for queue management                           N/A                                Medium Effort            Low Effort
Deployment & maintenance of queues                   N/A                                High Effort              Low Effort

(via DataScienceCentral.com)