Facebook has built its own switch. And it looks a lot like a server

SUMMARY: Facebook has built its own networking switch and developed a Linux-based operating system to run it. The goal is to create networking infrastructure that mimics a server in terms of how it’s managed and configured.

Not content to remake the server, Facebook’s engineers have taken on the humble switch, building their own version of the networking box and the software to go with it. The resulting switch, dubbed Wedge, and the software called FBOSS will be provided to the Open Compute Foundation as an open source design for others to emulate. Facebook is already testing it with production traffic in its data centers.

Jay Parikh, the VP of infrastructure engineering at Facebook, shared the news of the switch onstage at the Gigaom Structure event Wednesday, explaining that Facebook’s goal in creating this project was to eliminate the network engineer and run its networking operations in the same easily swapped-out and dynamic fashion as its servers. In many ways Facebook’s efforts with designing its own infrastructure have stemmed from the need to build hardware that is as flexible as the software running on top of it. It makes no sense to be innovating all the time with your code if you can’t adjust the infrastructure to run that code efficiently.

And networking has long been a frustrating aspect of IT infrastructure because it has been a black box that both delivered packets and did the computing to figure out the path those packets should take. But as networks scaled out, that combination (along with the domination of the market by giants Cisco and Juniper) was becoming untenable. Thus came efforts to split the physical delivery of packets and the routing of those packets into two jobs, allowing networks to become software-defined and allowing other companies to start innovating.

The creation of a custom-designed switch that allows Facebook to control its networking like it currently manages its servers has been a long time coming. It began the Open Compute effort with a redesigned server in 2011 and focused on servers and a bit of storage for the next two years. In May 2013 it called for vendors to submit designs for an open source switch, and at last year’s Structure event Parikh detailed Facebook’s new networking fabric that allowed the social networking giant to move large amounts of traffic more efficiently.

But the combination of the modular hardware approach of the Wedge switch and the Linux-based FBOSS operating system blows the switch apart in the same way Facebook blew the server apart. The switch will use the Group Hug microprocessor boards so any type of chip could slot into the box to control configuration and run the OS. The switch will still rely on a networking processor for routing and delivery of the packets and has a throughput of 640 Gbps, but eventually Facebook could separate the transport and decision-making process.

The whole goal here is to turn the monolithic switch into something that is modular and controlled by the FBOSS software that can be updated as needed without having to learn proprietary networking languages required by other providers’ gear. The question with Facebook’s efforts here is how it will affect the larger market for networking products.

Facebook’s infrastructure is relatively unique in that it wholly controls it and has the engineering talent to build software and new hardware to meet its computing needs. Google is another company that has built its own networking switch, but it didn’t open source those designs and keeps them close. But many enterprise customers don’t have the technical expertise of a web giant, so the tweaks that others contribute to the Open Compute Foundation to make the gear and the software will likely influence adoption.

(Via GigaOm.com)

Linux Containers and the Future Cloud

Linux-based container infrastructure is an emerging cloud technology based on fast and lightweight process virtualization. It provides its users an environment as close as possible to a standard Linux distribution. As opposed to para-virtualization solutions (Xen) and hardware virtualization solutions (KVM), which provide virtual machines (VMs), containers do not create other instances of the operating system kernel. Due to the fact that containers are more lightweight than VMs, you can achieve higher densities with containers than with VMs on the same host (practically speaking, you can deploy more instances of containers than of VMs on the same host).

Another advantage of containers over VMs is that starting and shutting down a container is much faster than starting and shutting down a VM. All containers under a host are running under the same kernel, as opposed to virtualization solutions like Xen or KVM where each VM runs its own kernel. Sometimes the constraint of running under the same kernel in all containers under a given host can be considered a drawback. Moreover, you cannot run BSD, Solaris, OS X or Windows in a Linux-based container, and sometimes this fact also can be considered a drawback.

The idea of process-level virtualization in itself is not new, and it already was implemented by Solaris Zones as well as BSD jails quite a few years ago. Other open-source projects implementing process-level virtualization have existed for several years. However, they required custom kernels, which was often a major setback. Full and stable support for Linux-based containers on mainstream kernels by the LXC project is relatively recent, as you will see in this article. This makes containers more attractive for the cloud infrastructure. More and more hosting and cloud services companies are adopting Linux-based container solutions. In this article, I describe some open-source Linux-based container projects and the kernel features they use, and show some usage examples. I also describe the Docker tool for creating LXC containers.

The underlying infrastructure of modern Linux-based containers consists mainly of two kernel features: namespaces and cgroups. There are six types of namespaces, which provide per-process isolation of the following operating system resources: filesystems (MNT), UTS, IPC, PID, network and user namespaces (user namespaces allow mapping of UIDs and GIDs between a user namespace and the global namespace of the host). By using network namespaces, for example, each process can have its own instance of the network stack (network interfaces, sockets, routing tables and routing rules, netfilter rules and so on).

Creating a network namespace is very simple and can be done with the following iproute command: ip netns add myns1. With the ip netns command, it also is easy to move one network interface from one network namespace to another, to monitor the creation and deletion of network namespaces, to find out to which network namespace a specified process belongs and so on. Quite similarly, when using the MNT namespace, when mounting a filesystem, other processes will not see this mount, and when working with PID namespaces, you will see by running the ps command from that PID namespace only processes that were created from that PID namespace.
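To see which namespaces a process belongs to without any extra tooling, you can read the symbolic links the kernel exposes under /proc/<pid>/ns. The following is a minimal Python sketch, assuming a Linux host with /proc mounted and a kernel recent enough to expose these entries as symlinks; the function name current_namespaces is made up for this illustration:

```python
import os

NAMESPACE_DIR = "/proc/self/ns"


def current_namespaces(ns_dir=NAMESPACE_DIR):
    """Map each namespace type (for example 'net') to its identifier string."""
    result = {}
    if not os.path.isdir(ns_dir):
        return result  # not Linux, or /proc is not mounted
    for name in sorted(os.listdir(ns_dir)):
        try:
            # Each entry is a symlink whose target identifies the namespace.
            result[name] = os.readlink(os.path.join(ns_dir, name))
        except OSError:
            continue  # entry is not readable as a symlink on this kernel
    return result


if __name__ == "__main__":
    for name, ident in current_namespaces().items():
        print(name, ident)
```

Two processes that print the same identifier for a given type share that namespace; a process started inside a new network namespace would show a different identifier for net.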

The cgroups subsystem provides resource management and accounting. It lets you define easily, for example, the maximum memory that a process may use. This is done by using cgroups VFS operations. The cgroups project was started by two Google developers, Paul Menage and Rohit Seth, back in 2006, and it initially was called “process containers”. Neither namespaces nor cgroups intervene in critical paths of the kernel, and thus they do not incur a high performance penalty, except for the memory cgroup, which can incur significant overhead under some workloads.
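Because cgroups are exposed through the filesystem, the groups a process belongs to can be read from /proc/<pid>/cgroup, where each line has the form hierarchy-id:controller-list:path. The sketch below parses text in that format; the sample paths (/lxc/fedoraCT and so on) are invented for illustration and will differ on a real host:

```python
def parse_cgroup_lines(text):
    """Return {controller: cgroup_path} from /proc/<pid>/cgroup-style text."""
    membership = {}
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the first two colons only; the path may contain ':' itself.
        _hierarchy_id, controllers, path = line.split(":", 2)
        for controller in controllers.split(","):
            membership[controller] = path
    return membership


# Hypothetical sample; on a real system read open("/proc/self/cgroup").read().
SAMPLE = """\
4:memory:/lxc/fedoraCT
3:cpuacct,cpu:/lxc/fedoraCT
2:name=systemd:/user.slice
"""

if __name__ == "__main__":
    for controller, path in parse_cgroup_lines(SAMPLE).items():
        print(controller, path)
```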

Linux-Based Containers

Basically, a container is a Linux process (or several processes) that has special features and that runs in an isolated environment, configured on the host. You might sometimes encounter terms like Virtual Environment (VE) and Virtual Private Server (VPS) for a container.

The features of this container depend on how the container is configured and on which Linux-based container is used, as Linux-based containers are implemented differently in several projects. I mention the most important ones in this article:

  • OpenVZ: the origins of the OpenVZ project are in a proprietary server virtualization solution called Virtuozzo, which originally was started by a company called SWsoft, founded in 1997. In 2005, a part of the Virtuozzo product was released as an open-source project, and it was called OpenVZ. Later, in 2008, SWsoft merged with a company called Parallels. OpenVZ is used for providing hosting and cloud services, and it is the basis of the Parallels Cloud Server. Like Virtuozzo, OpenVZ also is based on a modified Linux kernel. In addition, it has command-line tools (primarily vzctl) for management of containers, and it makes use of templates to create containers for various Linux distributions. OpenVZ also can run on some unmodified kernels, but with a reduced feature set. The OpenVZ project is intended to be fully mainlined in the future, but that could take quite a long time.
  • Google containers: in 2013, Google released the open-source version of its container stack, lmctfy (which stands for Let Me Contain That For You). Right now, it’s still in the beta stage. The lmctfy project is based on using cgroups. Currently, Google containers do not use the kernel namespaces feature, which is used by other Linux-based container projects, but using this feature is on the Google container project roadmap.
  • Linux-VServer: an open-source project that was first publicly released in 2001, it provides a way to partition resources securely on a host. The host should run a modified kernel.
  • LXC: the LXC (LinuX Containers) project provides a set of userspace tools and utilities to manage Linux containers. Many LXC contributors are from the OpenVZ team. As opposed to OpenVZ, it runs on an unmodified kernel. LXC is fully written in userspace and supports bindings in other programming languages like Python, Lua and Go. It is available in most popular distributions, such as Fedora, Ubuntu, Debian and more. Red Hat Enterprise Linux 6 (RHEL 6) introduced Linux containers as a technical preview. You can run Linux containers on architectures other than x86, such as ARM (there are several how-tos on the Web for running containers on Raspberry Pi, for example).

I also should mention the libvirt-lxc driver, with which you can manage containers. This is done by defining an XML configuration file and then running virsh start, virsh console and virsh destroy to run, access and destroy the container, respectively. Note that there is no common code between libvirt-lxc and the userspace LXC project.

LXC Container Management

First, you should verify that your host supports LXC by running lxc-checkconfig. If everything is okay, you can create a container by using one of several ready-made templates for creating containers. In lxc-0.9, there are 11 such templates, mostly for popular Linux distributions. You easily can tailor these templates according to your requirements, if needed. So, for example, you can create a Fedora container called fedoraCT with:


lxc-create -t fedora -n fedoraCT

 

The container will be created by default under /var/lib/lxc/fedoraCT. You can set a different path for the generated container by adding the --lxcpath PATH option.

The -t option specifies the name of the template to be used (fedora in this case), and the -n option specifies the name of the container (fedoraCT in this case). Note that you also can create containers of other distributions on Fedora, for example of Ubuntu (you need the debootstrap package for it). Not all combinations are guaranteed to work.

You can pass parameters to lxc-create after adding --. For example, you can create an older release of several distributions with the -R or -r option, depending on the distribution template. To create an older Fedora container on a host running Fedora 20, you can run:


lxc-create -t fedora -n fedora19 -- -R 19

 

You can remove the installation of an LXC container from the filesystem with:


lxc-destroy -n fedoraCT

 

For most templates, when a template is used for the first time, several required package files are downloaded and cached on disk under /var/cache/lxc. These files are used when creating a new container with that same template, and as a result, creating a container that uses the same template will be faster next time.

You can start the container you created with:


lxc-start -n fedoraCT

 

And stop it with:


lxc-stop -n fedoraCT

 

The signal used by lxc-stop is SIGPWR by default. In order to use SIGKILL in the earlier example, you should add -k to lxc-stop:


lxc-stop -n fedoraCT -k

 

You also can start a container as a dæmon by adding -d, and then log in to it with lxc-console, like this:


lxc-start -d -n fedoraCT
lxc-console -n fedoraCT

 

The first lxc-console that you run for a given container will connect you to tty1. If tty1 already is in use (because that’s the second lxc-console that you run for that container), you will be connected to tty2 and so on. Keep in mind that the maximum number of ttys is configured by the lxc.tty entry in the container configuration file.

You can make a snapshot of a non-running container with:


lxc-snapshot -n fedoraCT

 

This will create a snapshot under /var/lib/lxcsnaps/fedoraCT. The first snapshot you create will be called snap0; the second one will be called snap1 and so on. You can restore a snapshot at a later time with the -r option, for example:


lxc-snapshot -n fedoraCT -r snap0 restoredFedoraCT

 

You can list the snapshots with:


lxc-snapshot -L -n fedoraCT

 

You can display the running containers by running:


lxc-ls --active

 

Managing containers also can be done via scripts, using scripting languages. For example, this short Python script starts the fedoraCT container:


#!/usr/bin/python3

import lxc

container = lxc.Container("fedoraCT")
container.start()

 

Container Configuration

A default config file is generated for every newly created container. This config file is created, by default, in /var/lib/lxc/<containerName>/config, but you can alter that using the --lxcpath PATH option. You can configure various container parameters, such as network parameters, cgroups parameters, device parameters and more. Here are some examples of popular configuration items for the container config file:

  • You can set various cgroups parameters by setting values to the lxc.cgroup.[subsystem name] entries in the config file. The subsystem name is the name of the cgroup controller. For example, limiting the memory a container can use to 256MB is done by setting lxc.cgroup.memory.limit_in_bytes to 256M.
  • You can configure the container hostname by setting lxc.utsname.
  • There are five types of network interfaces that you can set with the lxc.network.type parameter: empty, veth, vlan, macvlan and phys. Using veth is very common in order to be able to connect a container to the outside world. By using phys, you can move network interfaces from the host network namespace to the container network namespace.
  • There are features that can be used for hardening the security of LXC containers. You can prevent specified system calls from being called from within a container by setting a secure computing mode, or seccomp, policy with the lxc.seccomp entry in the configuration file. You also can remove capabilities from a container with the lxc.cap.drop entry. For example, setting lxc.cap.drop = sys_module will create a container without the CAP_SYS_MODULE capability. Trying to run insmod from inside this container will fail. You also can define AppArmor and SELinux profiles for your container. You can find examples in the LXC README and in man 5 lxc.conf.
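To make the configuration items above concrete, here is a small Python sketch that renders a handful of them as "key = value" lines, the form used in the container config file. The helper render_lxc_config is hypothetical and only formats text; the keys are the ones mentioned in the list, and the 256M value assumes the kernel's usual K/M/G size suffixes:

```python
def render_lxc_config(settings):
    """Render (key, value) pairs as 'key = value' lines for an LXC config file."""
    return "\n".join(f"{key} = {value}" for key, value in settings) + "\n"


# Illustrative values only; adjust them to your own container.
EXAMPLE_SETTINGS = [
    ("lxc.utsname", "fedoraCT"),                    # container hostname
    ("lxc.network.type", "veth"),                   # virtual Ethernet pair
    ("lxc.cgroup.memory.limit_in_bytes", "256M"),   # memory cgroup limit
    ("lxc.cap.drop", "sys_module"),                 # drop CAP_SYS_MODULE
]

if __name__ == "__main__":
    print(render_lxc_config(EXAMPLE_SETTINGS), end="")
```

The output could be merged by hand into /var/lib/lxc/<containerName>/config; a real veth setup normally needs further network entries that are not shown here.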

Docker

Docker is an open-source project that automates the creation and deployment of containers. Docker first was released in March 2013 with Apache License Version 2.0. It started as an internal project by a Platform-as-a-Service (PaaS) company called dotCloud at the time, and now called Docker Inc. The initial prototype was written in Python; later the whole project was rewritten in Go, a programming language that was developed first at Google. In September 2013, Red Hat announced that it will collaborate with Docker Inc. for Red Hat Enterprise Linux and for the Red Hat OpenShift platform. Docker requires Linux kernel 3.8 (or above). On RHEL systems, Docker runs on the 2.6.32 kernel, as necessary patches have been backported.

Docker utilizes the LXC toolkit and as such is currently available only for Linux. It runs on distributions like Ubuntu 12.04, 13.04; Fedora 19 and 20; RHEL 6.5 and above; and on cloud platforms like Amazon EC2, Google Compute Engine and Rackspace.

Docker images can be stored on a public repository and can be downloaded with the docker pull command—for example, docker pull ubuntu or docker pull busybox.

To display the images available on your host, you can use the docker images command. You can narrow the command for a specific type of images (fedora, for example) with docker images fedora.

On Fedora, running a Fedora docker container is simple; after installing the docker-io package, you simply start the docker dæmon with systemctl start docker, and then you can start a Fedora docker container with docker run -i -t fedora /bin/bash.

Docker has git-like capabilities for handling containers. Changes you make in a container are lost if you destroy the container, unless you commit your changes (much like you do in git) with docker commit <containerId> <containerName/containerTag>. These images can be uploaded to a public registry, and they are available for downloading by anyone who wants to download them. Alternatively, you can set up a private Docker repository.

Docker is able to create a snapshot using the kernel device mapper feature. In earlier versions, before Docker version 0.7, it was done using AUFS (union filesystem). Docker 0.7 adds “storage plugins”, so people can switch between device mapper and AUFS (if their kernel supports it), so that Docker can run on RHEL releases that do not support AUFS.

You can create images by running commands manually and committing the resulting container, but you also can describe them with a Dockerfile. Just like a Makefile will compile code into a binary executable, a Dockerfile will build a ready-to-run container image from simple instructions. The command to build an image from a Dockerfile is docker build. There is a tutorial about Dockerfiles and their command syntax on the Docker Web site. For example, the following short Dockerfile is for installing the iperf package for a Fedora image:


FROM fedora
MAINTAINER Rami Rosen
RUN yum install -y iperf

 

You can upload and store your images for free on the Docker public index. Just like with GitHub, storing public images is free and just requires you to register an account.

The Checkpoint/Restore Feature

The CRIU (Checkpoint/Restore in userspace) project is implemented mostly in userspace, and there are more than 100 little patches scattered in the kernel for supporting it. There were several attempts to implement Checkpoint/Restore in kernel space solely, some of them by the OpenVZ project. The kernel community rejected all of them though, as they were too complex.

The Checkpoint/Restore feature enables saving a process state in several image files and restoring this process from the point at which it was frozen, on the same host or on a different host at a later time. This process also can be an LXC container. The image files are created using Google’s protocol buffer (PB) format. The Checkpoint/Restore feature enables performing maintenance tasks, such as upgrading a kernel or hardware maintenance on that host after checkpointing its applications to persistent storage. Later on, the applications are restored on that host.

Another feature that is very important in HPC is load balancing using live migration. The Checkpoint/Restore feature also can be used for creating incremental snapshots, which can be used after a crash occurs. As mentioned earlier, some kernel patches were needed for supporting CRIU; here are some of them:

  • A new system call named kcmp() was added; it compares two processes to determine if they share a kernel resource.
  • A socket monitoring interface called sock_diag was added to UNIX sockets in order to be able to find the peer of a UNIX domain socket. Before this change, the ss tool, which relied on parsing of /proc entries, did not show this information.
  • A TCP connection repair mode was added.
  • A procfs entry was added (/proc/PID/map_files).

Let’s look at a simple example of using the criu tool. First, you should check whether your kernel supports Checkpoint/Restore, by running criu check --ms. Look for a response that says "Looks good."

Basically, checkpointing is done by:


criu dump -t <pid>

 

You can specify a folder where the process state files will be saved by adding -D folderName.

You can restore with criu restore <pid>.

Summary

In this article, I’ve described what Linux-based containers are, and I briefly explained the underlying cgroups and namespaces kernel features. I have discussed some Linux-based container projects, focusing on the promising and popular LXC project. I also looked at the LXC-based Docker engine, which provides an easy and convenient way to create and deploy LXC containers. Several hands-on examples showed how simple it is to configure, manage and deploy LXC containers with the userspace LXC tools and the Docker tools.

Due to the advantages of the LXC and the Docker open-source projects, and due to the convenient and simple tools to create, deploy and configure LXC containers, as described in this article, we presumably will see more and more cloud infrastructures that will integrate LXC containers instead of using virtual machines in the near future. However, as explained in this article, solutions like Xen or KVM have several advantages over Linux-based containers and still are needed, so they probably will not disappear from the cloud infrastructure in the next few years.

Acknowledgements

Thanks to Jérôme Petazzoni from Docker Inc. and to Michael H. Warfield for reviewing this article.

Resources

Google Containers: https://github.com/google/lmctfy

OpenVZ: http://openvz.org/Main_Page

Linux-VServer: http://linux-vserver.org

LXC: http://linuxcontainers.org

libvirt-lxc: http://libvirt.org/drvlxc.html

Docker: https://www.docker.io

Docker Public Registry: https://index.docker.io

(Via LinuxJournal.com)

More About Apache Kafka: Next Generation Distributed Messaging System

Introduction

Apache Kafka is a distributed publish-subscribe messaging system. It was originally developed at LinkedIn and later became an Apache project. Kafka is a fast, scalable, partitioned and replicated commit log service that is distributed by design.

Apache Kafka differs from traditional messaging systems in the following ways:

  • It is designed as a distributed system which is very easy to scale out.
  • It offers high throughput for both publishing and subscribing.
  • It supports multi-subscribers and automatically balances the consumers during failure.
  • It persists messages on disk and thus can be used for batched consumption such as ETL, in addition to real-time applications.

In this article, I will highlight the architecture points, features and characteristics of Apache Kafka that help explain how Kafka is better than a traditional message server.

I will compare the characteristics of the traditional message servers RabbitMQ and Apache ActiveMQ with Kafka and discuss certain scenarios where Kafka is a better solution than a traditional message server. In the last section, we will explore a working sample application to showcase Kafka usage as a message server. Complete source code of the sample application is available on GitHub, and the application is discussed in detail in that section.

Architecture

Firstly, I want to introduce the basic concepts of Kafka. Its architecture consists of the following components:

  • A stream of messages of a particular type is defined as a topic. A Message is defined as a payload of bytes, and a Topic is a category or feed name to which messages are published.
  • A Producer can be anyone who can publish messages to a Topic.
  • The published messages are then stored at a set of servers called Brokers, or the Kafka Cluster.
  • A Consumer can subscribe to one or more Topics and consume the published Messages by pulling data from the Brokers.

Figure 1: Kafka Producer, Consumer and Broker environment

Producers can choose their favorite serialization method to encode the message content. For efficiency, a producer can send a set of messages in a single publish request. The following code example shows how to create a Producer to send messages.

Sample producer code:

// Pseudocode: wrap the payload bytes in a message, batch it in a set, publish it to a topic
producer = new Producer(…);
message = new Message("test message str".getBytes());
set = new MessageSet(message);
producer.send("topic1", set);

To subscribe to a topic, a consumer first creates one or more message streams for the topic. The messages published to that topic will be evenly distributed into these streams. Each message stream provides an iterator interface over the continual stream of messages being produced. The consumer then iterates over every message in the stream and processes the payload of the message. Unlike traditional iterators, the message stream iterator never terminates. If there is currently no message to consume, the iterator blocks until new messages are published to the topic. Kafka supports both the point-to-point delivery model, in which multiple consumers jointly consume a single copy of each message in a queue, and the publish-subscribe model, in which multiple consumers each retrieve their own copy of the message. The following code example shows a Consumer consuming messages.

Sample consumer code:

streams[] = Consumer.createMessageStreams("topic1", 1);
for (message : streams[0]) {
    bytes = message.payload();
    // do something with the bytes
}
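The never-terminating, blocking iterator described above can be imitated in a few lines of Python. This is only an in-memory sketch built on the standard queue module, not the Kafka client API; message_stream and publish are names made up for the illustration:

```python
import itertools
import queue
import threading


def message_stream(source):
    """Yield messages forever, blocking while no message is available."""
    while True:
        yield source.get()  # blocks until a producer puts a message


def publish(source, payloads):
    """Stand-in producer: append payloads to the shared queue."""
    for payload in payloads:
        source.put(payload)


topic_queue = queue.Queue()
producer = threading.Thread(
    target=publish, args=(topic_queue, [b"m1", b"m2", b"m3"])
)
producer.start()

# The stream itself never ends, so the demo takes a bounded slice of it.
received = list(itertools.islice(message_stream(topic_queue), 3))
producer.join()
print(received)
```

Asking the stream for a fourth message here would block indefinitely, which is the behavior the text describes for a topic with no new messages.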

The overall architecture of Kafka is shown in Figure 2. Since Kafka is distributed in nature, a Kafka cluster typically consists of multiple brokers. To balance load, a topic is divided into multiple partitions and each broker stores one or more of those partitions. Multiple producers and consumers can publish and retrieve messages at the same time.

Figure 2: Kafka Architecture
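As a rough illustration of how partitions spread load, the sketch below assigns the partitions of one topic to brokers in round-robin order. This is an assumption made for the example, not Kafka's actual placement logic, and the broker names are invented:

```python
def assign_partitions(num_partitions, brokers):
    """Spread partition ids over brokers in round-robin order."""
    assignment = {broker: [] for broker in brokers}
    for partition in range(num_partitions):
        broker = brokers[partition % len(brokers)]
        assignment[broker].append(partition)
    return assignment


layout = assign_partitions(5, ["broker-0", "broker-1"])
print(layout)  # {'broker-0': [0, 2, 4], 'broker-1': [1, 3]}
```

Each broker ends up storing one or more partitions, so producers and consumers working on different partitions talk to different machines.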

Kafka Storage

Kafka has a very simple storage layout. Each partition of a topic corresponds to a logical log. Physically, a log is implemented as a set of segment files of equal size. Every time a producer publishes a message to a partition, the broker simply appends the message to the last segment file. A segment file is flushed to disk after a configurable number of messages have been published or after a certain amount of time has elapsed. Messages are exposed to consumers only after they have been flushed.

Unlike in traditional message systems, a message stored in Kafka doesn’t have an explicit message id. Instead, each message is addressed by its logical offset in the log. This avoids the overhead of maintaining auxiliary, seek-intensive random-access index structures that map message ids to actual message locations. Message ids are increasing but not consecutive: to compute the id of the next message, add the length of the current message to its id.
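The id arithmetic can be checked with a short worked example. The sketch below assumes the first message in the log has id 0 and that lengths are given in bytes; the function name message_ids is made up:

```python
def message_ids(lengths, first_id=0):
    """Return the id of each message and the id the next append would get."""
    ids = []
    next_id = first_id
    for length in lengths:
        ids.append(next_id)
        next_id += length  # next id = current id + current message length
    return ids, next_id


ids, next_id = message_ids([10, 25, 7])
print(ids, next_id)  # [0, 10, 35] 42
```

The ids grow with every message but skip values, which is why they are increasing without being consecutive.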

A consumer always consumes messages from a particular partition sequentially, and if the consumer acknowledges a particular message offset, it implies that the consumer has consumed all prior messages. The consumer issues asynchronous pull requests to the broker to have a buffer of bytes ready to consume. Each asynchronous pull request contains the offset of the message to consume. Kafka exploits the sendfile API to efficiently deliver bytes in a log segment file from a broker to a consumer.

Figure 3: Kafka Storage Architecture

Kafka Broker

Unlike other message systems, Kafka brokers are stateless. This means that the consumer has to keep track of how much it has consumed; the broker does not do it. This design is both tricky and innovative:

  • It is tricky to delete a message from the broker, as the broker doesn’t know whether the consumer has consumed the message or not. Kafka solves this problem by using a simple time-based SLA for the retention policy. A message is automatically deleted if it has been retained in the broker longer than a certain period.
  • The design has a big benefit, as a consumer can deliberately rewind back to an old offset and re-consume data. This violates the common contract of a queue, but proves to be an essential feature for many consumers.
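Both points can be seen in a toy model. The sketch below is an in-memory stand-in, not Kafka code: the class RetainedLog is hypothetical, timestamps are plain numbers in seconds, and offsets are simple sequence numbers rather than byte offsets:

```python
class RetainedLog:
    """Toy broker log: stores (timestamp, payload), knows nothing of consumers."""

    def __init__(self, retention_seconds):
        self.retention_seconds = retention_seconds
        self.base_offset = 0   # offset of the oldest retained message
        self.entries = []      # list of (timestamp, payload)

    def append(self, timestamp, payload):
        self.entries.append((timestamp, payload))

    def expire(self, now):
        """Drop messages older than the retention period, consumed or not."""
        while self.entries and now - self.entries[0][0] > self.retention_seconds:
            self.entries.pop(0)
            self.base_offset += 1

    def read(self, offset):
        """Return payloads from offset onward, clamped to what is retained."""
        start = max(offset, self.base_offset) - self.base_offset
        return [payload for _, payload in self.entries[start:]]


log = RetainedLog(retention_seconds=60)
for timestamp, payload in [(0, "a"), (30, "b"), (90, "c")]:
    log.append(timestamp, payload)

replayed = log.read(1)        # consumer rewinds to offset 1 and re-consumes
log.expire(now=100)           # "a" (age 100) and "b" (age 70) exceed 60 s
after_expiry = log.read(0)    # the old offsets are gone from the broker
print(replayed, after_expiry)  # ['b', 'c'] ['c']
```

The log never asks who has read what: the consumer supplies the offset on every read, and deletion depends only on message age.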

Zookeeper and Kafka

Consider a distributed system with multiple servers, each of which is responsible for holding data and performing operations on that data. Some potential examples are a distributed search engine, a distributed build system or a well-known system like Apache Hadoop. One common problem with all these distributed systems is how to determine which servers are alive and operating at any given point in time. Most importantly, how would you do these things reliably in the face of the difficulties of distributed computing such as network failures, bandwidth limitations, variable latency connections, security concerns, and anything else that can go wrong in a networked environment, perhaps even across multiple data centers? These types of questions are the focus of Apache ZooKeeper, which is a fast, highly available, fault tolerant, distributed coordination service. Using ZooKeeper you can build reliable, distributed data structures for group membership, leader election, coordinated workflow, and configuration services, as well as generalized distributed data structures like locks, queues, barriers, and latches. Many well-known and successful projects already rely on ZooKeeper. Just a few of them include HBase, Hadoop 2.0, Solr Cloud, Neo4J, Apache Blur (incubating), and Accumulo.

ZooKeeper is a distributed, hierarchical file system that facilitates loose coupling between clients and provides an eventually consistent view of its znodes, which are like files and directories in a traditional file system. It provides basic operations such as creating, deleting, and checking existence of znodes. It provides an event-driven model in which clients can watch for changes to specific znodes, for example if a new child is added to an existing znode. ZooKeeper achieves high availability by running multiple ZooKeeper servers, called an ensemble, with each server holding an in-memory copy of the distributed file system to service client read requests.

Figure 4: ZooKeeper Ensemble Architecture
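The znode operations and the event-driven watch model can be pictured with a tiny in-memory stand-in. This sketch is not the ZooKeeper client API; the class ZNodeTree and the /brokers paths are invented for the example, and watches are modeled as one-shot callbacks:

```python
class ZNodeTree:
    """In-memory stand-in for a znode hierarchy with one-shot child watches."""

    def __init__(self):
        self.nodes = {"/": b""}
        self.child_watches = {}  # parent path -> list of callbacks

    def exists(self, path):
        return path in self.nodes

    def create(self, path, data=b""):
        parent = path.rsplit("/", 1)[0] or "/"
        if parent not in self.nodes:
            raise KeyError(f"parent {parent} does not exist")
        if path in self.nodes:
            raise KeyError(f"{path} already exists")
        self.nodes[path] = data
        self._fire(parent)

    def delete(self, path):
        del self.nodes[path]
        self._fire(path.rsplit("/", 1)[0] or "/")

    def watch_children(self, path, callback):
        self.child_watches.setdefault(path, []).append(callback)

    def _fire(self, parent):
        # Each callback is removed before it runs, so a watch fires only once.
        for callback in self.child_watches.pop(parent, []):
            callback(parent)


tree = ZNodeTree()
tree.create("/brokers")
events = []
tree.watch_children("/brokers", events.append)
tree.create("/brokers/broker-1")  # triggers the watch
tree.create("/brokers/broker-2")  # watch already consumed, no event
print(events)  # ['/brokers']
```

A client that wants to keep following changes would register a new watch each time one fires.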

Figure 4 above shows a typical ZooKeeper ensemble in which one server acts as a leader while the rest are followers. When the ensemble starts, a leader is elected first and all followers replicate their state with the leader. All write requests are routed through the leader and changes are broadcast to all followers. This change broadcast is termed atomic broadcast.

Usage of ZooKeeper in Kafka: Kafka uses ZooKeeper for the same reason other distributed systems do, namely coordination. ZooKeeper manages and coordinates the Kafka brokers, and each broker coordinates with the other brokers through it. The ZooKeeper service notifies producers and consumers when a new broker joins the Kafka system or when a broker fails. Based on these notifications, producers and consumers decide how to proceed and start coordinating their work with another broker. The overall Kafka system architecture is shown in Figure 5 below.

Figure 5: Overall Kafka architecture as distributed system

Apache Kafka v. other message servers

We’ll look at two projects that use Apache Kafka to see how it differs from other message servers: a study by LinkedIn and my own project.

LinkedIn Study

The LinkedIn team conducted an experimental study comparing the performance of Kafka with Apache ActiveMQ version 5.4 and RabbitMQ version 2.4. They used ActiveMQ’s default persistent message store, KahaDB. LinkedIn ran the experiments on two Linux machines, each with eight 2GHz cores, 16GB of memory, and 6 disks in RAID 10. The two machines were connected with a 1Gb network link. One machine was used as the broker and the other as the producer or the consumer.

Producer Test

LinkedIn configured the broker in all systems to asynchronously flush messages to its persistence store. For each system, they ran a single producer to publish a total of 10 million messages, each of 200 bytes. The Kafka producer sent messages in batches of size 1 and 50. ActiveMQ and RabbitMQ don’t seem to have an easy way to batch messages, so LinkedIn assumed a batch size of 1 for them. The results are shown in Figure 6 below:

Figure 6: Producer performance result of LinkedIn experiment

A few reasons why Kafka’s results are much better:

  • The Kafka producer doesn’t wait for acknowledgements from the broker and sends messages as fast as the broker can handle.
  • Kafka has a more efficient storage format. On average, each message had an overhead of 9 bytes in Kafka, versus 144 bytes in ActiveMQ. The difference comes from the heavy message header required by JMS and from the cost of maintaining various indexing structures. LinkedIn observed that one of the busiest threads in ActiveMQ spent most of its time accessing a B-Tree to maintain message metadata and state.
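To put the per-message overhead in perspective, the arithmetic can be worked through for the producer test’s workload of 10 million 200-byte messages. The sketch below assumes that every message carries exactly the average overhead quoted above (9 bytes for Kafka, 144 bytes for ActiveMQ). The class and method names are made up for this illustration, and the totals are back-of-the-envelope figures, not measurements from the study.

```java
import java.util.Locale;

// Back-of-the-envelope arithmetic; names are illustrative, not from any library.
class StorageOverhead {
    // Total bytes stored when every message carries the same fixed overhead.
    static long totalBytes(long messages, int payloadBytes, int overheadBytes) {
        return messages * (payloadBytes + overheadBytes);
    }

    // Fraction of the stored bytes that is overhead rather than payload.
    static double overheadFraction(int payloadBytes, int overheadBytes) {
        return (double) overheadBytes / (payloadBytes + overheadBytes);
    }

    public static void main(String[] args) {
        long messages = 10_000_000L; // workload of the producer test
        int payload = 200;           // bytes per message in the producer test
        int[] overheads = {9, 144};  // average overhead: Kafka, ActiveMQ
        String[] systems = {"Kafka", "ActiveMQ"};
        for (int i = 0; i < systems.length; i++) {
            System.out.printf(Locale.ROOT, "%s: %d bytes in total, %.1f%% overhead%n",
                    systems[i],
                    totalBytes(messages, payload, overheads[i]),
                    100 * overheadFraction(payload, overheads[i]));
        }
    }
}
```

Under these assumptions Kafka stores 2,090,000,000 bytes with about 4.3% overhead, while ActiveMQ stores 3,440,000,000 bytes with about 41.9% overhead. That is roughly 1.6 times as many bytes for the same payload.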

Consumer Test

For the consumer test, LinkedIn used a single consumer to retrieve a total of 10 million messages. LinkedIn configured all systems so that each pull request prefetched approximately the same amount of data: up to 1,000 messages or about 200KB. For both ActiveMQ and RabbitMQ, LinkedIn set the consumer acknowledgement mode to automatic. The results are presented in Figure 7.

Figure 7: Consumer performance result of LinkedIn experiment

A few reasons why Kafka’s results are much better:

  • Kafka has a more efficient storage format; fewer bytes were transferred from the broker to the consumer in Kafka.
  • The brokers in both ActiveMQ and RabbitMQ had to maintain the delivery state of every message. The LinkedIn team observed that one of the ActiveMQ threads was busy writing KahaDB pages to disk during this test. In contrast, there was no disk write activity on the Kafka broker. Finally, by using the sendfile API, Kafka reduces the transmission overhead.

I am currently working on a project that provides a real-time service that quickly and accurately extracts OTC (over-the-counter) pricing content from messages. The project is critical in nature, as it deals with financial information for nearly 25 asset classes, including bonds, loans, and ABS (asset-backed securities). The project’s raw information sources cover major financial market areas of Europe, North America, Canada and Latin America. Below are some statistics about the project that show how important it is to have an efficient distributed message server as part of the solution:

  • 1,300,000+ messages daily
  • 12,000,000+ OTC prices parsed daily
  • 25+ supported asset classes
  • 70,000+ unique instruments parsed daily

Messages contain attachments such as PDFs, Word documents, Excel files and certain other formats. OTC prices are also extracted from the attachments.

Traditional message servers have performance limitations when the message queue grows large, as it does while processing large attachment files. Our project was facing serious problems because of this, and the JMS queue needed to be restarted two to three times a day. Restarting a JMS queue can lose all the messages in the queue. The project needed a framework that can hold messages irrespective of the behavior of the parser (consumer). Kafka’s features are well suited to these requirements.

Features of the project in its current setup:

  1. The Fetchmail utility retrieves messages from remote mail servers, and Procmail filters then process them further, for example by distributing attachment-based messages separately.
  2. Each message is retrieved into a separate file, which is processed (read and deleted) for insertion as a message into the message server.
  3. Message content is retrieved from the message server queue for parsing and information extraction.

Sample Application

The sample application is a modified version of the original application that I use in my project. I have tried to keep the artifacts of the sample application simple by removing logging and multi-threading features. The intent of the sample application is to show how to use the Kafka producer and consumer APIs. The application contains a sample producer (simple producer code that demonstrates the Kafka producer API and publishes messages on a particular topic), a sample consumer (simple consumer code that demonstrates the Kafka consumer API), and a message content generation API (an API that generates message content in a file at a particular file path). The figure below shows the components and their relationships with other components in the system.

Figure 8: Sample application architecture components

The sample application has a structure similar to that of the example application in the Kafka source code. The source code of the application contains the ‘src’ Java source folder and a ‘config’ folder containing several configuration files and shell scripts for running the sample application. To run the sample application, please follow the instructions in the ReadMe.md file or on the Wiki page on the GitHub website.

The application code is Apache Maven enabled and is easy to set up for customization. Several Kafka build scripts have also been modified for rebuilding the sample application code, should anyone want to modify or customize it. A detailed description of how to customize the sample application is documented on the project’s Wiki page on GitHub.

Now let’s have a look at the core artifacts of the sample application.

Kafka Producer code example

/** 
 * Instantiates a new Kafka producer. 
 * 
 * @param topic the topic 
 * @param directoryPath the directory path 
 */ 
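public KafkaMailProducer(String topic, String directoryPath) {
       // props is assumed to be a java.util.Properties field declared elsewhere in this class.
       // Serialize message payloads as strings.
       props.put("serializer.class", "kafka.serializer.StringEncoder");
       // Broker the producer contacts first to discover the cluster.
       props.put("metadata.broker.list", "localhost:9092");
       producer = new kafka.javaapi.producer.Producer<Integer, String>(new ProducerConfig(props));
       this.topic = topic;
       this.directoryPath = directoryPath;
}

public void run() {
      Path dir = Paths.get(directoryPath);
      try {
           // WatchDir and ReadDir are helper threads of the sample application (not shown here)
           // that watch the mail directory and read the message files in it.
           new WatchDir(dir).start();
           new ReadDir(dir).start();
      } catch (IOException e) {
           e.printStackTrace();
      }
}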

The code snippet above shows basic Kafka producer API usage, such as setting the producer’s properties: the topic on which messages are published, the serializer class to use, and information about the broker. The basic function of the class is to read email message files from the email directory and publish each one as a message on the Kafka broker. The directory is watched using the java.nio.file.WatchService class, and as soon as an email message is dumped into the directory it is read and published on the Kafka broker as a message.
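The directory-watching step can be illustrated with the JDK alone. The following is a minimal sketch, not the sample application’s WatchDir or ReadDir code: `DirectoryWatchSketch` and `readNewFiles` are names invented for this example, and the Kafka publish call is replaced by a print statement. It also assumes that a file is completely written by the time its creation event is handled, which a production mail pipeline would need to guarantee separately.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Illustrative only: watches a directory and returns the text of newly created files.
class DirectoryWatchSketch {
    // Waits up to timeoutMillis for creation events on a watcher that dir is
    // registered with, then returns the contents of each newly created regular file.
    static List<String> readNewFiles(WatchService watcher, Path dir, long timeoutMillis)
            throws IOException, InterruptedException {
        List<String> contents = new ArrayList<>();
        WatchKey key = watcher.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        if (key == null) {
            return contents; // nothing happened within the timeout
        }
        for (WatchEvent<?> event : key.pollEvents()) {
            if (event.kind() != StandardWatchEventKinds.ENTRY_CREATE) {
                continue; // for example OVERFLOW, which carries no file name
            }
            // The event context is the new entry's name relative to the watched directory.
            Path file = dir.resolve((Path) event.context());
            if (Files.isRegularFile(file)) {
                contents.add(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
            }
        }
        key.reset(); // re-arm the key so later events are delivered
        return contents;
    }

    public static void main(String[] args) throws Exception {
        Path dir = Paths.get(args.length > 0 ? args[0] : ".");
        try (WatchService watcher = dir.getFileSystem().newWatchService()) {
            dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
            while (true) {
                for (String body : readNewFiles(watcher, dir, 1000)) {
                    // A real producer would publish the body to a Kafka topic here.
                    System.out.println("would publish: " + body);
                }
            }
        }
    }
}
```

Polling with a timeout, rather than blocking indefinitely, keeps the loop responsive to shutdown. How quickly creation events arrive depends on the platform’s WatchService implementation.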

Kafka Consumer code example

public KafkaMailConsumer(String topic) {
       // The package name is lowercase: kafka.consumer.Consumer.
       consumer = kafka.consumer.Consumer.createJavaConsumerConnector(createConsumerConfig());
       this.topic = topic;
}

/** 
 * Creates the consumer config. 
 * 
 * @return the consumer config 
 */ 
private static ConsumerConfig createConsumerConfig() { 
      Properties props = new Properties(); 
      props.put("zookeeper.connect", KafkaMailProperties.zkConnect); 
      props.put("group.id", KafkaMailProperties.groupId); 
      props.put("zookeeper.session.timeout.ms", "400"); 
      props.put("zookeeper.sync.time.ms", "200"); 
      props.put("auto.commit.interval.ms", "1000"); 
      return new ConsumerConfig(props); 
} 

public void run() {
      // Request one stream (one consuming thread) for the topic.
      Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
      topicCountMap.put(topic, 1);
      Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
      KafkaStream<byte[], byte[]> stream = consumerMap.get(topic).get(0);
      ConsumerIterator<byte[], byte[]> it = stream.iterator();
      // With the default settings, hasNext() blocks until the next message arrives.
      while (it.hasNext()) {
            System.out.println(new String(it.next().message()));
      }
}

The code above shows the basic consumer API. As mentioned above, the consumer needs to set up a stream of messages for consumption. The run method does this and then prints each consumed message to the console. In my project, we feed the consumed messages into the parser system to extract OTC prices.

We are currently using Kafka as the message server in our QA system as a proof-of-concept (POC) project, and the overall performance looks better than that of the JMS message server. One feature we are excited about is the re-consumption of messages, which enables our parsing system to re-parse certain messages as business needs require. Based on this positive experience with Kafka, we are now planning to use it as a log aggregator and to analyze logs, instead of using the Nagios system.
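Re-consumption is possible because, in a pull-based design, the consumer tracks its own read position and the broker does not delete a message once it has been read. The toy model below illustrates that idea under simplifying assumptions: `ToyLog` and `ToyConsumer` are hypothetical in-memory classes written for this example, positions are plain list indexes, and none of this is the Kafka consumer API.

```java
import java.util.ArrayList;
import java.util.List;

// Toy append-only log: reading never removes messages.
class ToyLog {
    private final List<String> messages = new ArrayList<>();

    // Appends a message and returns its position in the log.
    long append(String message) {
        messages.add(message);
        return messages.size() - 1;
    }

    // Returns up to maxMessages starting at the given position.
    List<String> fetch(long position, int maxMessages) {
        if (position < 0) throw new IllegalArgumentException("negative position");
        int from = (int) Math.min(position, messages.size());
        int to = Math.min(from + maxMessages, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

// Toy pull-based consumer: it owns its position and advances it only when it polls.
class ToyConsumer {
    private final ToyLog log;
    private long position = 0;

    ToyConsumer(ToyLog log) {
        this.log = log;
    }

    // Pulls the next batch at the consumer's own pace.
    List<String> poll(int maxMessages) {
        List<String> batch = log.fetch(position, maxMessages);
        position += batch.size();
        return batch;
    }

    // Rewinds (or skips ahead) so that messages can be consumed again.
    void seek(long newPosition) {
        position = newPosition;
    }

    long position() {
        return position;
    }
}
```

A parser that fails on a batch could call `seek` with the position where the batch started and then `poll` again, which is the re-parse scenario described above. With a queue that deletes messages on delivery, the same recovery would require the sender to resend them.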

Conclusion

Kafka is a novel system for processing large chunks of data. Kafka’s pull-based consumption model allows a consumer to consume messages at its own speed. If an exception occurs while consuming messages, the consumer always has the option to re-consume them.

(Via InfoQ.com)