Auth0 Architecture – Running In Multiple Cloud Providers And Regions


Auth0 provides authentication, authorization and single sign-on services for apps of any type (mobile, web, native) on any stack.

Authentication is critical for the vast majority of apps. We designed Auth0 from the beginning with multiple levels of redundancy. One of these levels is hosting. Auth0 can run anywhere: our cloud, your cloud, or even your own servers. And when we run Auth0, we run it on multiple cloud providers and in multiple regions simultaneously.

This article is a brief introduction to the infrastructure behind app.auth0.com and the strategies we use to keep it up and running with high availability.

Core Service Architecture

The core service is relatively simple:

  • Front-end servers: these consist of several x-large VMs, running Ubuntu on Microsoft Azure.

  • Store: MongoDB, running on dedicated memory-optimized X-large VMs.

  • Intra-node service routing: nginx

All components of Auth0 (e.g. Dashboard, transaction server, docs) run on all nodes, all of them identical.

Multi-Cloud / High Availability


Multi Cloud Architecture

Last week, Azure suffered a global outage that lasted for hours. During that time our HA plan was activated and we switched over to AWS.

  • The service runs primarily on Microsoft Azure (IaaS), with secondary nodes on stand-by on AWS, always ready.

  • We use Route53 with a failover routing policy, with the TTL at 60 seconds. The Route53 health check probes the primary DC; if it fails (3 times at a 10-second interval), the DNS entry is changed to point to the secondary DC. So the maximum downtime in case of a primary failure is ~2 minutes (a configuration sketch follows this list).

  • We use Puppet to deploy on every “push to master”. Using Puppet allows us to be cloud-independent in the configuration/deployment process. The Puppet master runs on our build server (currently TeamCity).

  • MongoDB is replicated frequently to the secondary DC, and the secondary DC is configured as read-only.

  • While running on the secondary DC, only runtime logins are allowed and the dashboard is set to “read-only mode”.

  • We replicate all the configuration needed for a login to succeed (application info, secrets, connections, users, etc). We don’t replicate transactional data (tokens, logs).

  • In case of failover, there might be some logging records that are lost. We are planning to improve that by having a real-time replica across Azure and AWS.

  • We use our own version of Chaos Monkey to test the resiliency of our infrastructure: https://github.com/auth0/chaos-mona
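For illustration, a failover configuration along these lines could be set up with boto3. This is a sketch, not necessarily how Auth0 configures it; the hosted zone ID, domain names and health-check path are placeholders.

# Sketch of a Route53 DNS failover setup similar to the one described above.
# The hosted zone ID, domain names and health-check path are placeholders.
import boto3

route53 = boto3.client("route53")

# Health check probing the primary DC: 3 failures at a 10-second interval.
check = route53.create_health_check(
    CallerReference="primary-dc-check-1",
    HealthCheckConfig={
        "Type": "HTTPS",
        "FullyQualifiedDomainName": "primary.example.com",
        "ResourcePath": "/health",
        "RequestInterval": 10,
        "FailureThreshold": 3,
    },
)

def failover_record(role, target, health_check_id=None):
    """Build a PRIMARY or SECONDARY failover CNAME with a 60-second TTL."""
    record = {
        "Name": "app.example.com.",
        "Type": "CNAME",
        "SetIdentifier": role.lower(),
        "Failover": role,  # "PRIMARY" or "SECONDARY"
        "TTL": 60,
        "ResourceRecords": [{"Value": target}],
    }
    if health_check_id:
        record["HealthCheckId"] = health_check_id
    return record

route53.change_resource_record_sets(
    HostedZoneId="ZEXAMPLE123",  # placeholder hosted zone
    ChangeBatch={"Changes": [
        {"Action": "UPSERT", "ResourceRecordSet": failover_record(
            "PRIMARY", "primary.example.com",
            health_check_id=check["HealthCheck"]["Id"])},
        {"Action": "UPSERT", "ResourceRecordSet": failover_record(
            "SECONDARY", "secondary.example.com")},
    ]},
)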


Automated Testing

  • We have 1000+ unit and integration tests.

  • We use saucelabs to run cross-browser (desktop/mobile) integration tests for Lock, our JavaScript login widget.

  • We use phantomjs/casper for integration tests. We test, for instance, that a full flow login with Google and other providers works fine.

  • All these run before every push to production.

CDN

Our use case is simple: we need to serve our JS library and its configuration (which providers are enabled, etc.). Assets and configuration data are uploaded to S3. It has to support TLS on our own custom domain (https://cdn.auth0.com). We ended up building our own CDN.

  • We tried 3 reputable CDN providers, but ran into a whole variety of issues. We tried the first one before we had our own domain for the CDN. At some point we decided we needed our own domain over SSL/TLS, and that CDN was too expensive if you wanted SSL and a custom domain at that point (600/mo). We also had issues configuring it to work with gzip and S3. Since S3 cannot serve both versions (zipped and unzipped) of the same file and this CDN doesn’t have content negotiation, some browsers (cough, IE) don’t play well with this. So we moved to another CDN which was much cheaper.

  • With the second CDN we had a handful of issues and couldn’t understand their root cause. Their support was over chat and it took time to get answers. Sometimes it seemed to be S3 issues, sometimes they had routing issues, etc.

  • We decided to spend more money and moved to a third CDN. Given that this CDN is used by high-load services like GitHub, we thought it was going to be fine. However, our requirements were different from GitHub’s. If the CDN doesn’t work for GitHub, you won’t see an image on the README.md. In our case, our customers depend on the CDN to serve the Login Widget, which means that if it doesn’t work, then their customers can’t log in.

  • We ended up building our own CDN using nginx, varnish and S3. It’s hosted in every AWS region and so far it has been working great (no downtime). We use Route53 latency-based routing.

Sandbox (Used To Run Untrusted Code)

One of the features we provide is the ability to run custom code as part of the login transaction. Customers can write these rules and we have a public repository for commonly used rules.

  • The sandbox is built on CoreOS, Docker and etcd.

  • There is a pool of Docker instances that gets assigned to a tenant on-demand.

  • Each tenant gets its own docker instance and there is a recycling policy based on idle time.

  • There is a controller that applies the recycling policy and a proxy that routes each request to the right container (a rough sketch of this logic follows the list).
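The post doesn’t show the controller code, but the on-demand assignment and idle-time recycling might look roughly like this. It is a conceptual sketch: start_container and stop_container are hypothetical stand-ins for the real Docker/CoreOS/etcd plumbing.

# Conceptual sketch of the tenant -> container pool with idle-based recycling.
# start_container/stop_container are hypothetical stand-ins for the real
# Docker/etcd plumbing; this is not Auth0's actual implementation.
import time

IDLE_LIMIT_SECONDS = 15 * 60   # assumed recycling threshold

containers = {}   # tenant_id -> {"container_id": ..., "last_used": ...}

def start_container(tenant_id):
    """Placeholder: launch (or take from a warm pool) a Docker container."""
    return "container-for-%s" % tenant_id

def stop_container(container_id):
    """Placeholder: stop and remove the container."""
    print("recycling %s" % container_id)

def get_container(tenant_id):
    """Assign a container to a tenant on demand, reusing it if still warm."""
    entry = containers.get(tenant_id)
    if entry is None:
        entry = {"container_id": start_container(tenant_id)}
        containers[tenant_id] = entry
    entry["last_used"] = time.time()
    return entry["container_id"]

def recycle_idle():
    """Recycling policy: tear down containers idle for too long."""
    now = time.time()
    for tenant_id, entry in list(containers.items()):
        if now - entry["last_used"] > IDLE_LIMIT_SECONDS:
            stop_container(entry["container_id"])
            del containers[tenant_id]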



More information about the sandbox is in this JSConf presentation from November 2014: https://www.youtube.com/watch?feature=player_detailpage&v=I4VkZ5H9PE8#t=7015 and slides: http://tjanczuk.github.io/about/sandbox.html

Monitoring

Initially we used pingdom (we still use it), but we decided to develop our own health check system that can run arbitrary health checks based on node.js scripts. These run from all AWS regions.

  • It uses the same sandbox we developed for our service. We call the sandbox via an http API and send the node.js script to run as an HTTP POST.

  • We monitor all the components and we also do synthetic transactions against the service (e.g. a login transaction).



If a health check fails we get notified through Slack. We have two Slack channels #p1 and #p2. If the failure happens 1 time, it gets posted to #p2. If it happens 2 times in a row it gets posted to #p1 and all members of devops get an SMS (via Twilio).
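A rough sketch of that check-and-escalate loop is below. The sandbox endpoint, its response format, the Slack webhooks and the embedded node.js check are all placeholders; the real checks run from every AWS region.

# Sketch of the check-and-escalate loop: post a check to the sandbox HTTP API,
# alert #p2 on one failure and #p1 on two consecutive failures.
# All URLs, the response format and the check script are placeholders.
import requests

SANDBOX_URL = "https://sandbox.example.com/run"        # hypothetical API
SLACK_P2 = "https://hooks.slack.com/services/XXX/p2"   # placeholder webhooks
SLACK_P1 = "https://hooks.slack.com/services/XXX/p1"

CHECK_SCRIPT = """
// node.js health check executed inside the sandbox (illustrative only)
require('request')('https://app.example.com/health', function (err, res) {
  process.exit(err || res.statusCode !== 200 ? 1 : 0);
});
"""

consecutive_failures = 0

def run_check():
    """POST the node.js script to the sandbox and report pass/fail.
    The response shape ({"exit_code": 0}) is an assumption."""
    resp = requests.post(SANDBOX_URL, data=CHECK_SCRIPT, timeout=30)
    return resp.status_code == 200 and resp.json().get("exit_code") == 0

def notify(webhook, text):
    requests.post(webhook, json={"text": text})

def tick():
    global consecutive_failures
    if run_check():
        consecutive_failures = 0
        return
    consecutive_failures += 1
    if consecutive_failures == 1:
        notify(SLACK_P2, "health check failed once")
    else:
        notify(SLACK_P1, "health check failed twice in a row")
        # here the real system also sends an SMS to devops via Twilio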

For detailed performance counters and response times we use statsd, and we send all the metrics to Librato, which we use to build charts.
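A minimal example of emitting such counters and timings with the Python statsd client follows; the metric names are made up, and a local statsd daemon forwarding to Librato is assumed.

# Minimal statsd usage: counters and response-time metrics.
# Metric names are illustrative; a local statsd daemon is assumed to
# forward everything to Librato.
from statsd import StatsClient

def handle_request():
    """Placeholder for the work being timed."""
    pass

statsd = StatsClient(host="localhost", port=8125, prefix="auth0")

statsd.incr("logins")                      # performance counter
statsd.timing("login.response_ms", 142)    # response time in milliseconds

with statsd.timer("dashboard.request"):    # time a block of code
    handle_request()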


We also set up alerts based on derivative metrics (i.e. how much something grows or shrinks in a time period). For instance, we have one based on logins: if Derivative(logins) > X => send an alert to Slack.
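A small sketch of that kind of derivative alert is below; the threshold, the metric source and the webhook are made up.

# Sketch of a derivative-based alert: compare login counts between two
# consecutive windows and alert Slack if the drop exceeds a threshold.
# get_login_count and the webhook are placeholders.
import requests

SLACK_WEBHOOK = "https://hooks.slack.com/services/XXX/alerts"  # placeholder
MAX_DROP = 0.30   # alert if logins fall more than 30% window over window

def get_login_count(window):
    """Placeholder: fetch the login count for 'previous' or 'current'."""
    raise NotImplementedError

def check_login_derivative():
    previous = get_login_count("previous")
    current = get_login_count("current")
    if previous == 0:
        return
    drop = (previous - current) / float(previous)
    if drop > MAX_DROP:
        requests.post(SLACK_WEBHOOK, json={
            "text": "logins dropped %.0f%% in the last window" % (drop * 100)})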


Finally, we have alerts coming from NewRelic for infrastructure components.


For logging we use ElasticSearch, Logstash and Kibana. We are storing logs from nginx and mongodb at this point. We are also parsing mongo logs using logstash in order to identify slow queries (anything with a high number of collscans).
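As a rough stand-in for that Logstash filter, the same idea in a standalone script: flag slow queries and collection scans in mongod log lines. The patterns and thresholds are approximations and depend on the MongoDB version's log format.

# Rough stand-in for the Logstash filter: flag slow queries and collection
# scans in mongod log lines. The patterns are approximations; the exact log
# format depends on the MongoDB version.
import re
import sys

DURATION_RE = re.compile(r"(\d+)ms\s*$")
NSCANNED_RE = re.compile(r"nscanned:(\d+)")

SLOW_MS = 100          # assumed slow-query threshold
MAX_SCANNED = 10000    # assumed "high number of scanned documents"

for line in sys.stdin:
    line = line.rstrip()
    duration = DURATION_RE.search(line)
    scanned = NSCANNED_RE.search(line)
    if duration and int(duration.group(1)) >= SLOW_MS:
        print("slow query (%sms): %s" % (duration.group(1), line))
    elif "COLLSCAN" in line or (scanned and int(scanned.group(1)) >= MAX_SCANNED):
        print("collection scan: %s" % line)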


Website

  • All related web properties: the auth0.com site, our blog, etc. run completely separate from the app and runtime, on their own Ubuntu + Docker VMs.

Future

This is where we are going:

  • We are moving to CoreOS and Docker. We want to move to a model where we manage clusters as a whole instead of doing configuration management over individual nodes. Docker also helps by removing some moving parts through image-based deployment (and the ability to roll back at that level as well).

  • MongoDB will be geo-replicated across DCs between AWS and Azure. We are testing latency.

  • For all the search related features we are moving to ElasticSearch to provide search based on any criteria. MongoDB didn’t work out well in this scenario (given our multi-tenancy).

(Via HighScalability.com)

Spark Sets New Record in Sort Performance

Databricks, a company founded by the creators of Apache Spark, has recently announced a new record in the Daytona GraySort contest using the Spark processing engine. The Daytona GraySort contest is a 3rd-party benchmark measuring how fast a system can sort 100 terabytes of data. Databricks posted a throughput of 4.27 TB/min over a cluster of 206 machines for their official run, which constitutes a 3x performance improvement using 10x fewer machines when compared to the previous record, submitted by Yahoo! running Hadoop MapReduce.

In a blog post announcing their submission to the Daytona GraySort contest, Databricks explained some of the technological improvements recently introduced to Spark that allowed it to sustain such a large throughput.

Spark 1.1 introduced a new shuffle implementation called sort-based shuffle. The previous shuffle implementation required an in-memory buffer for each partition in the shuffle, which led to notable memory overhead. The new sort-based shuffle requires only one in-memory buffer at a time. This significantly reduced the memory usage and allowed considerably more tasks to be run concurrently on the same hardware.

In addition to the new shuffle algorithm, the network module was revamped based on Netty’s native Epoll socket transport, which maintains its own pool of memory, bypassing the JVM’s memory allocator and reducing the impact of garbage collection. The new network module was then used to build an external shuffle service to allow shuffled files to be served even during garbage collection pauses in the main Spark executor.

Finally, Spark 1.1 included TimSort as its new default sorting algorithm. TimSort is derived from merge sort and insertion sort and performs better than quicksort in most real-world datasets, especially for datasets that are partially ordered.

All of these improvements allowed the Spark cluster to sustain 3GB/s/node I/O activity during the map phase, and 1.1 GB/s/node network activity during the reduce phase which saturated the 10Gbps ethernet link.

Spark is an advanced execution engine born out of research done at the AMPLab at UC Berkeley. It allows programs to run up to 10x faster than Hadoop MapReduce when data is on disk, and up to 100x faster when data resides in memory. Spark supports programs written in Java, Scala or Python and uses familiar functional programming constructs to build data processing flows.

Spark has garnered significant attention as a next generation execution platform for Hadoop and is seen by some as a replacement for MapReduce. It graduated to a top level Apache project in February and since then has been included in the Cloudera, Hortonworks and MapR’s Hadoop distributions. More recently, Hortonworks announced they will support running Hive on Spark as part of their Stinger.next initiative.

Databricks was founded in 2013 as a commercial entity supporting Spark and its associated projects. Those projects include Spark Streaming for stream processing, Spark SQL for querying Hive data and MLlib for machine learning.

(Via InfoQ.com)

Scaling Docker with Kubernetes

Kubernetes is an open source project to manage a cluster of Linux containers as a single system, managing and running Docker containers across multiple hosts, offering co-location of containers, service discovery and replication control. It was started by Google and now it is supported by Microsoft, RedHat, IBM and Docker amongst others.

Google has been using container technology for over ten years, starting over 2 billion containers per week. With Kubernetes it shares its container expertise creating an open platform to run containers at scale.

The project serves two purposes. Once you are using Docker containers, the next question is how to scale and start containers across multiple Docker hosts, balancing the containers across them. It also adds a higher-level API to define how containers are logically grouped, allowing users to define pools of containers, load balancing and affinity.

Kubernetes is still at a very early stage, which translates to lots of changes going into the project, some fragile examples, and some cases for new features that need to be fleshed out, but the pace of development, and the support by other big companies in the space, is highly promising.

Kubernetes concepts

The Kubernetes architecture is defined by a master server and multiple minions. The command line tools connect to the API endpoint in the master, which manages and orchestrates all the minions, Docker hosts that receive the instructions from the master and run the containers.

  • Master: Server with the Kubernetes API service. Multi master configuration is on the roadmap.
  • Minion: Each of the multiple Docker hosts running the Kubelet service, which receives orders from the master and manages the containers running on the host.
  • Pod: Defines a collection of containers tied together that are deployed in the same minion, for example a database and a web server container.
  • Replication controller: Defines how many pods or containers need to be running. The containers are scheduled across multiple minions.
  • Service: A definition that allows discovery of services/ports published by containers, and external proxy communications. A service maps the ports of the containers running on pods across multiple minions to externally accessible ports.
  • kubecfg: The command line client that connects to the master to administer Kubernetes.


Kubernetes is defined by states, not processes. When you define a pod, Kubernetes tries to ensure that it is always running. If a container is killed, it will try to start a new one. If a replication controller is defined with 3 replicas, Kubernetes will try to always run that number, starting and stopping containers as necessary.

The example app used in this article is the Jenkins CI server, in a typical master-slaves setup to distribute the jobs. Jenkins is configured with the Jenkins swarm plugin to run a Jenkins master and multiple Jenkins slaves, all of them running as Docker containers across multiple hosts. The swarm slaves connect to the Jenkins master on startup and become available to run Jenkins jobs. The configuration files used in the example are available in GitHub, and the Docker images are available as csanchez/jenkins-swarm, for the master Jenkins, extending the official Jenkins image with the swarm plugin, and csanchez/jenkins-swarm-slave, for each of the slaves, just running the slave service on a JVM container.

Creating a Kubernetes cluster

Kubernetes provides scripts to create a cluster with several operating systems and cloud/virtual providers: Vagrant (useful for local testing), Google Compute Engine, Azure, Rackspace, etc.

The examples will use a local cluster running on Vagrant, using Fedora as OS, as detailed in the getting started instructions, and have been tested on Kubernetes 0.5.4. Instead of the default three minions (Docker hosts) we are going to run just two, which is enough to show the Kubernetes capabilities without requiring a more powerful machine.

Once you have downloaded Kubernetes and extracted it, the examples can be run from that directory. In order to create the cluster from scratch the only command needed is ./cluster/kube-up.sh.

$ export KUBERNETES_PROVIDER=vagrant
$ export KUBERNETES_NUM_MINIONS=2
$ ./cluster/kube-up.sh

Get the example configuration files:

$ git clone https://github.com/carlossg/kubernetes-jenkins.git

The cluster creation will take a while, depending on machine power and internet bandwidth, but it should eventually finish without errors and only needs to be run once.

Command line tool

The command line tool to interact with Kubernetes is called kubecfg, with a convenience script in cluster/kubecfg.sh.

In order to check that our cluster is up and running with two minions, just run the kubecfg list minions command and it should display the two virtual machines in the Vagrant configuration.

$ ./cluster/kubecfg.sh list minions

Minion identifier
----------
10.245.2.2
10.245.2.3

Pods

The Jenkins master server is defined as a pod in Kubernetes terminology. Multiple containers can be specified in a pod and they will be deployed on the same Docker host, with the advantage that containers in a pod can share resources, such as storage volumes, and use the same network namespace and IP. Volumes are by default empty directories, of type emptyDir, that live for the lifespan of the pod, not of a specific container, so if the container fails the persistent storage will live on. Another volume type is hostDir, which will mount a directory from the host server in the container.

In this Jenkins specific example we could have a pod with two containers, the Jenkins server and, for instance, a MySQL container to use as database, although we will only focus on a standalone Jenkins master container.

In order to create a Jenkins pod we run kubecfg with the Jenkins container pod definition, using Docker image csanchez/jenkins-swarm, ports 8080 and 50000 mapped to the container in order to have access to the Jenkins web UI and the slave API, and a volume mounted in /var/jenkins_home. You can find the example code in GitHub as well.

The Jenkins web UI pod (pod.json) is defined as follows:

{
  "id": "jenkins",
  "kind": "Pod",
  "apiVersion": "v1beta1",
  "desiredState": {
    "manifest": {
      "version": "v1beta1",
      "id": "jenkins",
      "containers": [
        {
          "name": "jenkins",
          "image": "csanchez/jenkins-swarm:1.565.3.3",
          "ports": [
            {
              "containerPort": 8080,
              "hostPort": 8080
            },
            {
              "containerPort": 50000,
              "hostPort": 50000
            }
          ],
          "volumeMounts": [
            {
              "name": "jenkins-data",
              "mountPath": "/var/jenkins_home"
            }
          ]
        }
      ],
      "volumes": [
        {
          "name": "jenkins-data",
          "source": {
            "emptyDir": {}
          }
        }
      ]
    }
  },
  "labels": {
    "name": "jenkins"
  }
}

And create it with:

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/pod.json create pods

Name                Image(s)                           Host                Labels              Status
----------          ----------                         ----------          ----------          ----------
jenkins             csanchez/jenkins-swarm:1.565.3.3   <unassigned>        name=jenkins        Pending

After some time, depending on your internet connection, as it has to download the Docker image to the minion, we can check its status and in which minion it was started.

$ ./cluster/kubecfg.sh list pods
Name                Image(s)                           Host                    Labels              Status
----------          ----------                         ----------              ----------          ----------
jenkins             csanchez/jenkins-swarm:1.565.3.3   10.0.29.247/10.0.29.247   name=jenkins        Running

If we ssh into the minion that the pod was assigned to, minion-1 or minion-2, we can see how Docker started the container defined, amongst other containers used by Kubernetes for internal management (kubernetes/pause and google/cadvisor).

$ vagrant ssh minion-2 -c "docker ps"

CONTAINER ID        IMAGE                              COMMAND                CREATED             STATUS              PORTS                                              NAMES
7f6825a80c8a        google/cadvisor:0.6.2              "/usr/bin/cadvisor"    3 minutes ago       Up 3 minutes                                                           k8s_cadvisor.b0dae998_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_28df406a
5c02249c0b3c        csanchez/jenkins-swarm:1.565.3.3   "/usr/local/bin/jenk   3 minutes ago       Up 3 minutes                                                           k8s_jenkins.f87be3b0_jenkins.default.etcd_901e8027-759b-11e4-bfd0-0800279696e1_bf8db75a
ce51fda15f55        kubernetes/pause:go                "/pause"               10 minutes ago      Up 10 minutes                                                          k8s_net.dbcb7509_0d38f5b2-759c-11e4-bfd0-0800279696e1.default.etcd_0d38fa52-759c-11e4-bfd0-0800279696e1_e4e3a40f
e6f00165d7d3        kubernetes/pause:go                "/pause"               13 minutes ago      Up 13 minutes       0.0.0.0:8080->8080/tcp, 0.0.0.0:50000->50000/tcp   k8s_net.9eb4a781_jenkins.default.etcd_901e8027-759b-11e4-bfd0-0800279696e1_7bd4d24e
7129fa5dccab        kubernetes/pause:go                "/pause"               13 minutes ago      Up 13 minutes       0.0.0.0:4194->8080/tcp                             k8s_net.a0f18f6e_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_659a7a52

And, once we know the container id, we can check the container logs with vagrant ssh minion-1 -c "docker logs cec3eab3f4d3"

We should also see the Jenkins web UI at http://10.245.2.2:8080/ or http://10.0.29.247:8080/, depending on what minion it was started in.

Service discovery

Kubernetes allows defining services, a way for containers to use discovery and proxy requests to the appropriate minion. With this definition in service-http.json we are creating a service with id jenkins pointing to the pod with the label name=jenkins, as declared in the pod definition, and forwarding the port 8888 to the container’s 8080.

{
  "id": "jenkins",
  "kind": "Service",
  "apiVersion": "v1beta1",
  "port": 8888,
  "containerPort": 8080,
  "selector": {
    "name": "jenkins"
  }
}

Creating the service with kubecfg:

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/service-http.json create services

Name                Labels              Selector            IP                  Port
----------          ----------          ----------          ----------          ----------
jenkins                                 name=jenkins        10.0.29.247         8888

Each service is assigned a unique IP address tied to the lifespan of the Service. If we had multiple pods matching the service definition the service would load balance the traffic across all of them.

Another feature of services is that a number of environment variables are available for any subsequent containers run by Kubernetes, providing the ability to connect to the service container, in a similar way to running linked Docker containers. This will prove useful for finding the master Jenkins server from any of the slaves.

JENKINS_PORT='tcp://10.0.29.247:8888'
JENKINS_PORT_8080_TCP='tcp://10.0.29.247:8888'
JENKINS_PORT_8080_TCP_ADDR='10.0.29.247'
JENKINS_PORT_8080_TCP_PORT='8888'
JENKINS_PORT_8080_TCP_PROTO='tcp'
JENKINS_SERVICE_PORT='8888'
SERVICE_HOST='10.0.29.247'

Another tweak we need to make is to open port 50000, which is needed by the Jenkins swarm plugin. This can be achieved by creating another service, service-slave.json, so Kubernetes forwards traffic on that port to the Jenkins server container.

{
  "id": "jenkins-slave",
  "kind": "Service",
  "apiVersion": "v1beta1",
  "port": 50000,
  "containerPort": 50000,
  "selector": {
    "name": "jenkins"
  }
}

The service is created with kubecfg again.

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/service-slave.json create services

Name                Labels              Selector            IP                  Port
----------          ----------          ----------          ----------          ----------
jenkins-slave                           name=jenkins        10.0.86.28          50000

And all the defined services are now available, including some Kubernetes internal ones:

$ ./cluster/kubecfg.sh list services

Name                Labels              Selector                                  IP                  Port
----------          ----------          ----------                                ----------          ----------
kubernetes-ro                           component=apiserver,provider=kubernetes   10.0.22.155         80
kubernetes                              component=apiserver,provider=kubernetes   10.0.72.49          443
jenkins                                 name=jenkins                              10.0.29.247         8888
jenkins-slave                           name=jenkins                              10.0.86.28          50000

Replication controllers

Replication controllers allow running multiple pods in multiple minions. Jenkins slaves can be run this way to ensure there is always a pool of slaves ready to run Jenkins jobs.

In a replication.json definition:

{
  "id": "jenkins-slave",
  "apiVersion": "v1beta1",
  "kind": "ReplicationController",
  "desiredState": {
    "replicas": 1,
    "replicaSelector": {
      "name": "jenkins-slave"
    },
    "podTemplate": {
      "desiredState": {
        "manifest": {
          "version": "v1beta1",
          "id": "jenkins-slave",
          "containers": [
            {
              "name": "jenkins-slave",
              "image": "csanchez/jenkins-swarm-slave:1.21",
              "command": [
                "sh", "-c", "/usr/local/bin/jenkins-slave.sh -master http://$JENKINS_SERVICE_HOST:$JENKINS_SERVICE_PORT -tunnel $JENKINS_SLAVE_SERVICE_HOST:$JENKINS_SLAVE_SERVICE_PORT -username jenkins -password jenkins -executors 1"
              ]
            }
          ]
        }
      },
      "labels": {
        "name": "jenkins-slave"
      }
    }
  },
  "labels": {
    "name": "jenkins-slave"
  }
}

The podTemplate section allows the same configuration options as a pod definition. In this case we want to make the Jenkins slave connect automatically to our Jenkins master, instead of relying on Jenkins multicast discovery. To do so we execute the jenkins-slave.sh command with -master parameter to point the slave to the Jenkins master running in Kubernetes. Note that we use the Kubernetes provided environment variables for the Jenkins service definition (JENKINS_SERVICE_HOST and JENKINS_SERVICE_PORT). The image command is overridden to configure the container this way, useful to reuse existing images while taking advantage of the service environment variables. It can be done in pod definitions too.

Create the replicas with kubecfg:

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/replication.json create replicationControllers

Name                Image(s)                            Selector             Replicas
----------          ----------                          ----------           ----------
jenkins-slave       csanchez/jenkins-swarm-slave:1.21   name=jenkins-slave   1

Listing the pods now would show new ones being created, up to the number of replicas defined in the replication controller.

$ ./cluster/kubecfg.sh list pods

Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Running
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Pending

The first time the jenkins-swarm-slave image is run, the minion has to download it from the Docker repository, but after a while, depending on your internet connection, the slaves should automatically connect to the Jenkins server. Going into the server where the slave is started, docker ps has to show the container running and docker logs is useful to debug any problems on container startup.

$ vagrant ssh minion-1 -c "docker ps"

CONTAINER ID        IMAGE                               COMMAND                CREATED              STATUS              PORTS                    NAMES
870665d50f68        csanchez/jenkins-swarm-slave:1.21   "/usr/local/bin/jenk   About a minute ago   Up About a minute                            k8s_jenkins-slave.74f1dda1_07651754-4f88-11e4-b01e-0800279696e1.default.etcd_11cac207-759f-11e4-bfd0-0800279696e1_9495d10e
cc44aa8743f0        kubernetes/pause:go                 "/pause"               About a minute ago   Up About a minute                            k8s_net.dbcb7509_07651754-4f88-11e4-b01e-0800279696e1.default.etcd_11cac207-759f-11e4-bfd0-0800279696e1_4bf086ee
edff0e535a84        google/cadvisor:0.6.2               "/usr/bin/cadvisor"    27 minutes ago       Up 27 minutes                                k8s_cadvisor.b0dae998_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_588941b0
b7e23a7b68d0        kubernetes/pause:go                 "/pause"               27 minutes ago       Up 27 minutes       0.0.0.0:4194->8080/tcp   k8s_net.a0f18f6e_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_57a2b4de

The replication controller can automatically be resized to any number of desired replicas:

$ ./cluster/kubecfg.sh resize jenkins-slave 2

And again the pods are updated to show where each replica is running.

$ ./cluster/kubecfg.sh list pods
Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Pending
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Running


Scheduling

Right now the default scheduler is random, but resource-based scheduling will be implemented soon. At the time of writing there are several open issues to add scheduling based on memory and CPU usage. There is also work in progress on an Apache Mesos based scheduler. Apache Mesos is a framework for distributed systems providing APIs for resource management and scheduling across entire datacenter and cloud environments.

Self healing

One of the benefits of using Kubernetes is the automated management and recovery of containers.

If the container running the Jenkins server dies for any reason, for instance because the process being run crashes, Kubernetes will notice and will create a new container after a few seconds.

$ vagrant ssh minion-2 -c 'docker kill `docker ps | grep csanchez/jenkins-swarm: | sed -e "s/ .*//"`'
51ba3687f4ee


$ ./cluster/kubecfg.sh list pods
Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Failed
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Running

And some time later, typically no more than a minute…

Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Running
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Running

By running the Jenkins data dir in a volume we guarantee that the data is kept even after the container dies, so we do not lose any Jenkins jobs or data created. And because Kubernetes is proxying the services in each minion, the slaves will reconnect to the new Jenkins server automagically no matter where they run! Exactly the same happens if any of the slave containers dies: the system automatically creates a new container and, thanks to the service discovery, it automatically joins the Jenkins server pool.

If something more drastic happens, like a minion dying, Kubernetes does not yet offer the ability to reschedule the containers on the other existing minions; it just shows the pods as Failed.

$ vagrant halt minion-2
==> minion-2: Attempting graceful shutdown of VM...
$ ./cluster/kubecfg.sh list pods
Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Failed
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Failed

Tearing down

kubecfg offers several commands to stop and delete the replication controllers, pods and services definitions.

To stop the replication controller, setting the number of replicas to 0 and causing the termination of all the Jenkins slave containers:

$ ./cluster/kubecfg.sh stop jenkins-slave

To delete it:

$ ./cluster/kubecfg.sh rm jenkins-slave

To delete the jenkins server pod, causing the termination of the Jenkins master container:

$ ./cluster/kubecfg.sh delete pods/jenkins

To delete the services:

$ ./cluster/kubecfg.sh delete services/jenkins
$ ./cluster/kubecfg.sh delete services/jenkins-slave

Conclusion

Kubernetes is still a very young project, but highly promising to manage Docker deployments across multiple servers and simplify the execution of long running and distributed Docker containers. By abstracting infrastructure concepts and working on states instead of processes, it provides easy definition of clusters, including self healing capabilities out of the box. In short, Kubernetes makes management of Docker fleets easier.

About the Author

Carlos Sanchez has been working on automation and quality of software development, QA and operations processes for over 10 years, from build tools and continuous integration to deployment automation, DevOps best practices and continuous delivery. He has delivered solutions to Fortune 500 companies, working at several US based startups, most recently MaestroDev, a company he cofounded. Carlos has been a speaker at several conferences around the world, including JavaOne, EclipseCON, ApacheCON, JavaZone, Fosdem or PuppetConf. Very involved in open source, he is a member of the Apache Software Foundation amongst other open source groups, contributing to several projects, such as Apache Maven, Fog or Puppet.

(Via InfoQ.com)

DDN Pushes the Envelope for Parallel Storage I/O

Today at Supercomputing 2014, DataDirect Networks lifted the veil a bit more on Infinite Memory Engine (IME), its new software that will employ Flash storage and a bunch of smart algorithms to create a buffer between HPC compute and parallel file system resources, with the goal of improving file I/O by up to 100x. The company also announced the latest release of EXAScaler, its Lustre-based storage appliance lineup.

The data patterns have been changing at HPC sites in a way that is creating bottlenecks in the I/O. While many HPC shops may think they’re primarily working with large and sequential files, the reality is that most data is relatively small and random, and that fragmented I/O creates problems when moving the data across the interconnect, says Jeff Sisilli, Sr. Director Product Marketing at DataDirect Networks.

“Parallel file systems were really built for large files,” Sisilli tells HPCwire. “What we’re finding is 90 percent of typical I/O in HPC data centers utilizes small files, those less than 32KB. What happens is, when you inject those into a parallel file system, it starts to really bring down performance.”

DDN says it overcame the restrictions in how parallel file systems were created with IME, which creates a storage tier above the file system and provides a “fast data” layer between the compute nodes in an HPC cluster and the backend file system. The software, which resides on the I/O nodes in the cluster, utilizes any available Flash solid state drives (SSDs) or other non-volatile memory (NVM) storage resources available, creating a “burst buffer” to absorb peak loads and eliminate I/O contention.

IME works in two ways. First, it removes any limitations of the POSIX layer, such as file locks, that can slow down communication. Secondly, algorithms bundle up the small and random I/O operations into larger files that can be more efficiently read into the file system.

In lab tests at a customer site, DDN ran IME against the S3D turbulent flow modeling software. The software was really designed for larger sequential files, but is often used in the real world with smaller and random files. In the customer’s case, these “mal-aligned and fragmented” files were causing I/O throughput across the InfiniBand interconnect to drop to 25 MBs per second.

After introducing IME, the customer was able to ingest data from the compute cluster onto IME’s SSDs at line rate. “This customer was using InfiniBand, and we were able to fill up InfiniBand all the way to line rate, and absorb at 50 GB per second,” Sisilli says.

The data wasn’t written back into the file system quite that quickly. But because the algorithms were able to align all those small files and convert fragments into full stripe writes, it did provide a speed up compared to 25MB per second. “We were able to drain out the buffer and write to the parallel file system at 4GB per second, which is two orders of magnitude faster than before,” Sisilli says.

The “net net” of IME, Sisilli says, is it frees up HPC compute cluster resources. “From the parallel file system side, we’re able to shield the parallel file system and underlying storage arrays from fragmented I/O, and have those be able to ingest optimized data and utilize much less hardware to be able to get to the performance folks need up above,” he says.

IME will work with any Lustre- or GPFS-based parallel file system. That includes DDN’s own EXAScaler line of Lustre-based storage appliances, or the storage appliances of any other vendor. There are no application modifications required to use IME, which also features data erasure coding capabilities typically found in object stores. The only requirements are that the application is POSIX compliant or uses the MPI job scheduler. DDN also provides an API that customers can use if they want to modify their apps to work with IME; the company has plans to create an ecosystem of compatible tools using this API.

There are other vendors developing similar Flash-based storage buffer offerings. But DDN says the fact that it’s taking an open, software-based approach gives customers an advantage over vendors that require them to purchase specialized hardware, or that work with only certain types of interconnects.


IME isn’t available yet; it’s still in technology preview mode. But when it becomes available, scalability won’t be an issue. The software will be able to corral and make available petabytes worth of Flash or NVM storage resources living across thousands of nodes, Sisilli says. “What we’re recommending is [to have in IME] anywhere between two to three times the amount of your compute cluster memory to have a great working space within IME to accelerate your applications and do I/O,” he says. “That can be all the way down to terabytes, and for supercomputers, it’s multi petabytes.”

IME is still undergoing tests, and is expected to become generally available in the second quarter of 2015. DDN will offer it as an appliance or as software.

DDN also today unveiled a new release of EXAScaler. With Version 2.1, DDN has improved read and write I/O performance by 25 percent. That will give DDN a comfortable advantage over competing file systems for some time, says Roger Goff, Sr. Product Manager for DDN.

“We know what folks are about to announce because they pre-announce those things,” Goff says. “Our solution is tremendously faster than what you will see [from other vendors], particularly on a per-rack performance basis.”

Other new features in version 2.1 include support for self-encrypting drives; improved rebuild times; InfiniBand optimizations; and better integration with DDN’s Storage Fusion Xcelerator (SFX Flash Caching) software.

DDN has also standardized on the Lustre file system from Intel, called Intel Enterprise Edition for Lustre version 2.5. That brings it several new capabilities, including a new MapReduce connector for running Hadoop workloads.

“So instead of having data replicated across multiple nodes in the cluster, which is the native mode for HDFS, with this adapter, you can run those Hadoop applications and take advantages of the single-copy nature of a parallel file system, yet have the same capability of a parallel file system to scale to thousands and thousands of clients accessing that same data,” Goff says.

EXAScaler version 2.1 is available now across all three EXAScaler products, including the entry-level SFA7700, the midrange ES12k/SFA12k-20, and the high-end SFA12KX/SFA212k-40.

( Via HPCwire.com )

Amazon Announces EC2 Container Service For Managing Docker Containers On AWS

Earlier this year I wrote about container computing and enumerated some of the benefits that you get when you use it as the basis for a distributed application platform: consistency & fidelity, development efficiency, and operational efficiency. Because containers are lighter in weight and have less memory and computational overhead than virtual machines, they make it easy to support applications that consist of hundreds or thousands of small, isolated “moving parts.” A properly containerized application is easy to scale and maintain, and makes efficient use of available system resources.

Introducing Amazon EC2 Container Service

In order to help you to realize these benefits, we are announcing a preview of our new container management service, EC2 Container Service (or ECS for short). This service will make it easy for you to run any number of Docker containers across a managed cluster of Amazon Elastic Compute Cloud (EC2) instances using powerful APIs and other tools. You do not have to install cluster management software, purchase and maintain the cluster hardware, or match your hardware inventory to your software needs when you use ECS. You simply launch some instances in a cluster, define some tasks, and start them. ECS is built around a scalable, fault-tolerant, multi-tenant base that takes care of all of the details of cluster management on your behalf.

By the way, don’t let the word “cluster” scare you off! A cluster is simply a pool of compute, storage, and networking resources that serves as a host for one or more containerized applications. In fact, your cluster can even consist of a single t2.micro instance. In general, a single mid-sized EC2 instance has sufficient resources to be used productively as a starter cluster.

EC2 Container Service Benefits
Here’s how this service will help you to build, run, and scale Docker-based applications:

  • Easy Cluster Management – ECS sets up and manages clusters made up of Docker containers. It launches and terminates the containers and maintains complete information about the state of your cluster. It can scale to clusters that encompass tens of thousands of containers across multiple Availability Zones.
  • High Performance – You can use the containers as application building blocks. You can start, stop, and manage thousands of containers in seconds.
  • Flexible Scheduling – ECS includes a built-in scheduler that strives to spread your containers out across your cluster to balance availability and utilization. Because ECS provides you with access to complete state information, you can also build your own scheduler or adapt an existing open source scheduler to use the service’s APIs.
  • Extensible & Portable – ECS runs the same Docker daemon that you would run on-premises. You can easily move your on-premises workloads to the AWS cloud, and back.
  • Resource Efficiency – A containerized application can make very efficient use of resources. You can choose to run multiple, unrelated containers on the same EC2 instance in order to make good use of all available resources. You could, for example, decide to run a mix of short-term image processing jobs and long-running web services on the same instance.
  • AWS Integration – Your applications can make use of AWS features such as Elastic IP addresses, resource tags, and Virtual Private Cloud (VPC). The containers are, in effect, a new base-level building block in the same vein as EC2 and S3.
  • Secure – Your tasks run on EC2 instances within an Amazon Virtual Private Cloud. The tasks can take advantage of IAM roles, security groups, and other AWS security features. Containers run in a multi-tenant environment and can communicate with each other only across defined interfaces. The containers are launched on EC2 instances that you own and control.

Using EC2 Container Service
ECS was designed to be easy to set up and to use!

You can launch an ECS-enabled AMI and your instances will be automatically checked into your default cluster. If you want to launch into a different cluster you can specify it by modifying the configuration file in the image, or passing in User Data on launch. To ECS-enable a Linux AMI, you simply install the ECS Agent and the Docker daemon.

ECS will add the newly launched instance to its capacity pool and run containers on it as directed by the scheduler. In other words, you can add capacity to any of your clusters by simply launching additional EC2 instances in them!

The ECS Agent will be available in open source form under an Apache license. You can install it on any of your existing Linux AMIs and call registerContainerInstances to add them to your cluster.


Here are a few vocabulary items to help you to get familiar with the terminology used by ECS:
  • Cluster – A cluster is a pool of EC2 instances in a particular AWS Region, all managed by ECS. One cluster can contain multiple instance types and sizes, and can reside within one or more Availability Zones.
  • Scheduler – A scheduler is associated with each cluster. The scheduler is responsible for making good use of the resources in the cluster by assigning containers to instances in a way that respects any placement constraints and simultaneously drives as much parallelism as possible, while also aiming for high availability.
  • Container – A container is a packaged (or “Dockerized,” as the cool kids like to say) application component. Each EC2 instance in a cluster can serve as a host to one or more containers.
  • Task Definition – A JSON file that defines a Task as a set of containers. Fields in the file define the image for each container, convey memory and CPU requirements, and also specify the port mappings that are needed for the containers in the task to communicate with each other.
  • Task – A task is an instantiation of a Task Definition consisting of one or more containers, defined by the work that they do and their relationship to each other.
  • ECS-Enabled AMI – An Amazon Machine Image (AMI) that runs the ECS Agent and dockerd. We plan to ECS-enable the Amazon Linux AMI and are working with our partners to similarly enable their AMIs.

EC2 Container Service includes a set of APIs that are both simple and powerful. You can create, describe, and destroy clusters and you can register EC2 instances therein. You can create task definitions and initiate and manage tasks.

Here is the basic set of steps that you will follow in order to run your application on ECS. I am making the assumption that you have already Dockerized your application by breaking it down into fine-grained components, each described by a Dockerfile and each running nicely on your existing infrastructure. There are plenty of good resources online to help you with this process. Many popular applications and application components have already been Dockerized and can be found on Docker Hub. You can use ECS with any public or private Docker repository that you can access. Ok, so here are the steps (a rough code sketch of them follows the list):

  1. Create a cluster, or decide to use the default one for your account in the target Region.
  2. Create your task definitions and register them with the cluster.
  3. Launch some EC2 instances and register them with the cluster.
  4. Start the desired number of copies of each task.
  5. Monitor the overall utilization of the cluster and the overall throughput of your application, and make adjustments as desired. For example, you can launch and then register additional EC2 instances in order to expand the cluster’s pool of available resources.
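Expressed with today's boto3 client purely for illustration (the preview-era API may differ, and the cluster, task and image names are placeholders), the steps map roughly to:

# The steps above, sketched with boto3. Names and images are placeholders and
# the preview-era API may differ from these modern call signatures.
import boto3

ecs = boto3.client("ecs", region_name="us-east-1")

# 1. Create a cluster (or use the default one for the account/Region).
ecs.create_cluster(clusterName="demo-cluster")

# 2. Register a task definition describing the containers.
ecs.register_task_definition(
    family="web",
    containerDefinitions=[{
        "name": "web",
        "image": "example/webapp:latest",   # placeholder image
        "cpu": 256,
        "memory": 512,
        "portMappings": [{"containerPort": 80, "hostPort": 80}],
    }],
)

# 3. EC2 instances running the ECS agent register themselves with the cluster
#    when they boot, so there is no explicit API call to make here.

# 4. Start the desired number of copies of the task.
ecs.run_task(cluster="demo-cluster", taskDefinition="web", count=2)

# 5. Monitor utilization and adjust (add instances to grow the pool).
print(ecs.describe_clusters(clusters=["demo-cluster"]))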

EC2 Container Service Pricing and Availability
The service launches today in preview form. If you are interested in signing up, click here to join the waiting list.

There is no extra charge for ECS. As usual, you pay only for the resources that you use.

( Via Amazon blog)

Data Warehouse and Analytics Infrastructure at Viki

At Viki, we use data to power product, marketing and business decisions. We use an in-house analytics dashboard to expose all the data we collect to various teams through simple table and chart based reports. This allows them to monitor all our high level KPIs and metrics regularly.

Data also powers more heavy duty stuff – like our data-driven content recommendation system, or predictive models that help us forecast the value of content we’re looking to license. We’re also constantly looking at data to determine the success of new product features, tweak and improve existing features and even kill stuff that doesn’t work. All of this makes data an integral part of the decision making process at Viki.

To support all these functions, we need a robust infrastructure below it, and that’s our data warehouse and analytics infrastructure.

This post is the first of a series about our data warehouse and analytics infrastructure. In this post we’ll cover the high-level pipeline of the system and go into detail about how we collect and batch-process our data. Do expect a lot of detail-level discussion.

About Viki: We’re an online TV site, with fan-powered translations in 150+ different languages. To understand more about what Viki is, watch this short video (2 minutes).

Part 0: Overview (or TL;DR)

Our analytics infrastructure, following the most common-sense approach, is broken down into 3 steps:

  • Collect and Store Data
  • Process Data (batch + real-time)
  • Present Data


Collect and Store Data

  1. Logs (events) are sent by different clients to a central log collector
  2. The log collector forwards events to a Hydration service, where the events get enriched with more time-sensitive information; the results are stored to S3

Batch Processing Data

  1. An hourly job takes data from S3, applies further transformations (read: cleaning up bad data) and stores the results to our cloud-based Hadoop cluster
  2. We run multiple MapReduce (Hive) jobs to aggregate data from Hadoop and write them to our central analytics database (Postgres)
  3. Another job takes a snapshot of our production databases and restores it into our analytics database

Presenting Data

  1. All analytics/reporting-related activities are then done at our master analytics database. The results (meant for report presentation) are then sent to our Reporting DB
  2. We run an internal reporting dashboard app on top of our Reporting DB; this is where end-users log in to see their reports

Real-time Data Processing

  1. The data from Hydration service is also multiplexed and sent to our real-time processing pipeline (using Apache Storm)
  2. In Storm, we write custom jobs that do real-time aggregation of important metrics. We also wrote a real-time alerting system to inform ourselves when traffic goes bad

Part 1: Collecting, Pre-processing and Storing Data


We use fluentd to receive event logs from different platforms (web, Android, iOS, etc.) over HTTP. We set up a cluster of 2 dedicated servers running multiple fluentd instances inside Docker, load-balanced through HAProxy. When a message hits our endpoint, fluentd buffers the messages and batch-forwards them to our Hydration System. At the moment, our cluster is doing 100M messages a day.

A word about fluentd: It’s a robust open-source log collecting software that has a very healthy and helpful community around it. The core is written in C so it’s fast and scalable, with plugins written in Ruby, making it easy to extend.

What data do we collect? We collect everything that we think is useful for the business: a click event, an ad impression, ad request, a video play, etc. We call each record an event, and it’s stored as a JSON string like this:

{
  "time":1380452846, "event": "video_play",
  "video_id":"1008912v", "user_id":"5298933u",
  "uuid":"80833c5a760597bf1c8339819636df04",
  "app_id":"100004a", "app_ver":"2.9.3.151",
  "device":"iPad", "stream_quality":"720p",
  "ip":"99.232.169.246", "country":"ca",
  "city_name":"Toronto", "region_name":"ON"
}
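For illustration, a client emitting such an event to a fluentd HTTP input might look like this. The host, port and tag are assumptions; fluentd's in_http plugin accepts the event as a json form field posted to /<tag>.

# Sketch of a client emitting an event to the fluentd HTTP input.
# Host, port and tag are placeholders; fluentd's in_http plugin accepts the
# event as a 'json' form field posted to /<tag>.
import json
import requests

COLLECTOR = "http://logs.example.com:9880"   # placeholder, behind HAProxy

event = {
    "time": 1380452846,
    "event": "video_play",
    "video_id": "1008912v",
    "user_id": "5298933u",
    "device": "iPad",
    "country": "ca",
}

# The URL path becomes the fluentd tag used for routing/buffering rules.
requests.post("%s/video_play" % COLLECTOR, data={"json": json.dumps(event)})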

Pre-processing Data – Hydration Service


Data Hydration Service

The data, after being collected, is sent to a Hydration Service for pre-processing. Here, the message is enriched with time-sensitive information.

For example: When a user watches a video (and thus a video_play event is sent), we want to know if it’s a free user or a paid user. Since the user could be a free user today and upgrade to paid tomorrow, the only way to correctly attribute the play event to the free/paid bucket is to inject that status right into the message when it’s received. In short, the service translates this:

{ "event":"video_play", "user_id":"1234" }

into this:

{ "event":"video_play", "user_id":"1234", "user_status":"free" }

For non time-sensitive operations (fixing typos, getting country from IP, etc.), there is a different process for that (discussed below).

Storing to S3

From the Hydration Service, the message is buffered and then stored to S3 – our source of truth. The data is gzip-compressed and stored in hourly buckets, making it easy and fast to retrieve it for any one-hour time period.

Part 2: Batch-processing Data

The processing layer has 2 components, the batch-processing and the real-time processing component.

This section focuses mainly on our batch-processing layer – our main process of transforming data for reporting/presentation. We’ll cover our real-time processing layer in another post.


Batch-processing Data Layer

Cleaning Data Before Importing Into Hadoop

Those who have worked with data before know this: Cleaning data takes a lot of time. In fact: Cleaning and preparing data will take most of your time. Not the actual analysis.

What is unclean data (or bad data)? Data that is logged incorrectly. It comes in a lot of different forms, e.g.:

  • Typos, e.g. sending a ‘clickk’ event instead of ‘click’
  • Clients sending an event twice, or forgetting to send an event

When bad data enters your system, it stays there forever, unless you purposely find a way to clean it up or take it out.

So how do we clean up bad data?

Previously, when we received a record, we wrote it directly to our Hadoop cluster and made a backup to S3. This made it difficult to correct bad data, due to the append-only nature of Hadoop.

Now all the data is first stored in S3, and we have an hourly process that takes data from S3, applies cleanup/transformations and loads it into Hadoop (insert-overwrite).
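A simplified sketch of what such an hourly cleanup pass could look like; the typo map and the dedup key are illustrative, and the idempotency comes from loading each hour into Hadoop with insert-overwrite.

# Simplified sketch of the hourly cleanup/transformation step: fix known
# bad event names and drop duplicates before the hour is (re)loaded into
# Hadoop with insert-overwrite. The typo map and dedup key are illustrative.
EVENT_FIXES = {"clickk": "click"}    # known logging typos

def clean_hour(events):
    """Take one hour of raw events, return cleaned, de-duplicated events."""
    seen = set()
    cleaned = []
    for event in events:
        event = dict(event)
        event["event"] = EVENT_FIXES.get(event.get("event"), event.get("event"))
        # Drop exact duplicates sent twice by a client.
        key = (event.get("uuid"), event.get("time"), event.get("event"))
        if key in seen:
            continue
        seen.add(key)
        cleaned.append(event)
    return cleaned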


Storing Data, Before and After

The process is similar in nature to the hydration process, but this time we look at one-hour blocks at a time, rather than at individual records. This approach has many great benefits:

  • The pipeline is more linear, thus preventing data discrepancies (between S3 and Hadoop).
  • The data is not tied to being stored in Hadoop. If we want to load our data into other data storage, we’d just write another process that transforms S3 data and dumps it somewhere else.
  • When a bad logging happens causing unclean data, we can modify the transformation code and rerun the data from the point of bad logging. Because the process isidempotent, we can perform the reprocessing as many times as we want without double-logging the data.
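
To make the idempotency point concrete, here is a minimal sketch of what an hourly load could look like, assuming the events table is partitioned by hour; the `hive` runner and the `s3_staging_events` table are hypothetical stand-ins for our TreasureData-hosted Hive setup.

# Rerunning this job for the same hour simply overwrites that hour's partition,
# so reprocessing never double-counts data.
def load_hour_into_hadoop(hive, hour)   # `hive` is a hypothetical HiveQL runner
  partition = hour.strftime('%Y%m%d%H')
  hive.execute <<-SQL
    INSERT OVERWRITE TABLE events PARTITION (hr = '#{partition}')
    SELECT time, v
    FROM s3_staging_events              -- cleaned/transformed data staged from S3
    WHERE hr = '#{partition}'
  SQL
end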

[Image: Our S3 to Hadoop Transform and Load Process]

If you’ve read the article The Log from the data engineering folks at LinkedIn, you’ll notice that our approach is very similar (replacing Kafka with S3, and per-message processing with per-hour processing). Indeed, our paradigm is inspired by The Log architecture. However, for our needs we chose S3 because:

  1. When it comes to batch (re)processing, we do it per time period (e.g. process one hour of data). Kafka uses sequential offsets to order messages, so if we used Kafka we would have to build another service to translate [beg_timestamp, end_timestamp) into [beg_index, end_index).
  2. Kafka can only retain data for up to X days due to disk-space limitations. Since we want the ability to reprocess data from further back, using Kafka would mean we need another strategy to cover those cases.

Aggregating Data from Hadoop into Postgres

Once the data gets into Hadoop, daily aggregation jobs aggregate it into fewer dimensions and port the results into our master analytics database (PostgreSQL).

For example, to aggregate a table of video starts data together with some video and user information, we run this Hive query (MapReduce job):

-- The hadoop's events table would contain 2 fields: time (int), v (json)
SELECT
  SUBSTR(FROM_UNIXTIME(time), 0, 10) AS date_d,
  v['platform'] AS platform,
  v['country'] AS country,
  v['video_id'] AS video_id,
  v['user_id'] AS user_id,
  COUNT(1) AS cnt
FROM events
WHERE time >= BEG_TS
  AND time <= END_TS
  AND v['event'] = 'video_start'
GROUP BY 1,2,3,4,5

and load the results into an aggregated.video_starts table in Postgres:

       Table "aggregated.video_starts"
   Column    |          Type          | Modifiers 
-------------+------------------------+-----------
 date_d      | date                   | not null
 platform    | character varying(255) | 
 country     | character(3)           | 
 video_id    | character varying(255) | 
 user_id     | character varying(255) | 
 cnt         | bigint                 | not null

Further querying and reporting of video_starts will be done out of this table. If we need more dimensions, we either rebuild this table with more dimensions, or build a new table from Hadoop.

If it’s a one-time ad-hoc analysis, we’d just run the queries directly against Hadoop.

Table Partitioning


Also, we’re making use of Postgres’ Table Inheritance feature to partition our data into multiple monthly tables with a parent table on top of all. Your query just needs to hit the parent table and the engine will know which underlying monthly tables to hit to get your data.

This makes our data very easy to maintain, with small indexes and a better rebuild process. We have fast (SSD) drives hosting the recent tables, and move the older ones to slower (but bigger) drives for semi-archival purposes.
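
For reference, here is a bare-bones sketch of the inheritance pattern for one monthly child table. It follows the standard Postgres recipe rather than our exact setup, and reuses the illustrative aggregated.video_starts table from above.

connection.exec <<-SQL.strip_heredoc
  -- Monthly child table; the CHECK constraint lets the planner skip months that a
  -- query's WHERE clause rules out (constraint_exclusion = partition, the default).
  CREATE TABLE aggregated.video_starts_2014_06 (
    CHECK (date_d >= DATE '2014-06-01' AND date_d < DATE '2014-07-01')
  ) INHERITS (aggregated.video_starts);

  CREATE INDEX ON aggregated.video_starts_2014_06 (date_d);

  -- Queries hit the parent; matching children are scanned automatically:
  -- SELECT SUM(cnt) FROM aggregated.video_starts WHERE date_d >= DATE '2014-06-01';
SQL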

Centralizing All Data


We dump our production databases into our master analytics database on a daily basis.

Also, we use a lot of 3rd party vendors (Adroll, DoubleClick, Flurry, GA, etc). For each of these services, we write a process to ping their API and import the data into our master analytics database.

These data, together with the aggregated data from Hadoop, allow us to produce meaningful analysis combining data from multiple sources.

For example, to break down our video starts by genre, we write a query that joins the prod.videos table with the aggregated.video_starts table:

-- Video Starts by genre
SELECT V.genre, SUM(cnt) FROM aggregated.video_starts VS
LEFT JOIN prod.videos V ON VS.video_id = V.id
WHERE VS.date_d = '2014-06-01'
GROUP BY 1
ORDER BY 1

The above is made possible because we have both sources of data (event tracking data + production data) in one place.

Centralizing data is a very important concept in our pipeline because it makes it simple and pain-free for us to connect, report and corroborate our numbers across many different data sources.

Managing Job Dependencies

We started with a simple crontab to schedule our hourly/daily jobs. As the jobs grew more complicated, we ended up with a very long crontab:

[Image: a very long crontab]

Crontab also doesn’t support graph-based job flow (e.g run A and B at the sametime, when both finishes, run C)


So we looked around for a solution. We considered Chronos (by Airbnb), but its use case is more complicated than what we needed, plus it requires setting up ZooKeeper and all that.

We ended up using Azkaban by LinkedIn. It has everything we need: cron-like scheduling with graph-based job flows, and it also tells you the runtime history of your jobs. And when a job flow fails, you can restart it, running only the tasks that failed or haven’t run.

It’s pretty awesome.

Making Sure Your Numbers Tie

One of the things I see discussed less often in analytics infrastructure talks and blog posts is making sure your data doesn’t get dropped halfway through transport, which results in inconsistent data across your different storage systems.

We have a process that runs after every data transfer: it counts the number of records in both the source and the destination storage and prints errors when they don’t match. These check-total processes sound a little tedious, but they have proven crucial to our system; they give us confidence in the accuracy of the numbers we report to management.

Case in point, we had a process that dumps data from Postgres to CSV, compresses and uploads it to S3, and loads it into Amazon Redshift (using the COPY command). So technically we should have the exact same table in both Postgres and Redshift. One day our analyst pointed out that there was significantly less data in Redshift than in Postgres. Upon investigation, we found a bug that caused the CSV file to be truncated and thus not fully loaded into the Redshift tables. It happened because, for this particular process, we didn’t have the check-totals in place.
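
As an illustration only (the connection details and table name below are made up), such a check can be as simple as comparing row counts on both ends; since Redshift speaks the Postgres wire protocol, the same pg gem can query both.

require 'pg'

# Compare row counts between the source and destination copies of a table.
def check_totals(table, source_conn, dest_conn)
  src = source_conn.exec("SELECT COUNT(*) FROM #{table}").getvalue(0, 0).to_i
  dst = dest_conn.exec("SELECT COUNT(*) FROM #{table}").getvalue(0, 0).to_i
  raise "check-total mismatch on #{table}: #{src} vs #{dst}" unless src == dst
end

# Hypothetical connections; a check like this would run right after the COPY finishes.
check_totals('aggregated.video_starts',
             PG.connect(host: 'analytics-db', dbname: 'analytics'),
             PG.connect(host: 'redshift-cluster', port: 5439, dbname: 'analytics'))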

Using Ruby as the Scripting Language

When we started, we were primarily a Ruby shop, so going ahead with Ruby was a natural choice. “Isn’t it slow?”, you might say. But we use Ruby not to process data (i.e. it rarely holds any large amount of data), but to facilitate and coordinate the process.

We have written an entire library to support building data pipelines in Ruby. For example, we extended the pg gem to provide a more object-oriented interface to Postgres. It allows us to do table creation, table hot-swapping, upserts, insert-overwrites, copying tables between databases, etc., all without having to touch SQL code. It has become a nice, productive abstraction on top of SQL and Postgres. Think of it as an ORM for data-warehousing purposes.

Example: the code below creates a data.videos_by_genre table holding the result of a simple aggregation query. The process works on a temporary table and eventually performs a table hot-swap with the main one; this avoids the data disruption that would occur if we worked on the main table directly.

columns = [
  {name: 'genre', data_type: 'varchar'},
  {name: 'video_count', data_type: 'int'},
]
indexes = [{columns: ['genre']}]
table = PGx::Table.new 'data.videos_by_genre', columns, indexes

table.with_temp_table do |temp_t|
  temp_t.drop(check_exists: true)
  temp_t.create
  connection.exec <<-SQL.strip_heredoc
    INSERT INTO #{temp_t.qualified_name}
    SELECT genre, COUNT(1) FROM prod.videos
    GROUP BY 1
  SQL

  temp_t.create_indexes
  table.hotswap
end

(The above example could also be implemented using a MATERIALIZED VIEW, by the way.)
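
For completeness, here is a rough sketch of that alternative (available since Postgres 9.3); the table and column names are the same illustrative ones as above.

connection.exec <<-SQL.strip_heredoc
  CREATE MATERIALIZED VIEW data.videos_by_genre AS
    SELECT genre, COUNT(1) AS video_count
    FROM prod.videos
    GROUP BY 1;

  -- A scheduled job would then periodically run:
  --   REFRESH MATERIALIZED VIEW data.videos_by_genre;
SQL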

Having this set of libraries has proven critical to our data pipeline, since it allows us to write extensible and maintainable code that performs all sorts of data transformations.

Technology

We rely mostly on free and open-source technologies. Our stack is:

  • fluentd (open-source) for collecting logs
  • Cloud-based Hadoop + Hive (TreasureData – 3rd party vendor)
  • PostgreSQL + Amazon Redshift as central analytics database
  • Ruby as scripting language
  • NodeJS (worker process) with Redis (caching)
  • Azkaban (job flow management)
  • Kestrel (message queue)
  • Apache Storm (real-time stream processing)
  • Docker for automated deployment
  • HAproxy (load balancing)
  • and lots of SQL (huge thanks to Postgres, one of the best relational databases ever made)

Conclusion

The post above walked through the overall architecture of our analytics system and went into detail on the collecting layer and the batch-processing layer. In later blog posts we’ll cover the rest, specifically:

  • Our Data Presentation layer. And how Stuti, our analyst, built our funnel analysis, fan-in and fan-out tools, all with SQL. And it updates automatically (very funnel. wow!)
  • Our Real-time traffic alert/monitoring system (using Apache Storm)
  • turing: our feature roll-out, A/B testing framework

( Via Engineering. Viki.com )

Nifty Architecture Tricks From Wix – Building A Publishing Platform At Scale


Wix operates websites in the long tail. As an HTML5-based WYSIWYG web publishing platform, they have created over 54 million websites, most of which receive under 100 page views per day. So traditional caching strategies don’t apply, yet it only takes four web servers to handle all the traffic. That takes some smart work.

Aviran Mordo, Head of Back-End Engineering at Wix, has described their solution in an excellent talk: Wix Architecture at Scale. What they’ve developed is in the best tradition of scaling: specialization. They’ve carefully analyzed their system and figured out how to meet their aggressive high-availability and high-performance goals in some very interesting ways.

Wix uses multiple datacenters and clouds. Something I haven’t seen before is that they replicate data to multiple datacenters, to Google Compute Engine, and to Amazon. And they have fallback strategies between them in case of failure.

Wix doesn’t use transactions. Instead, all data is immutable and they use a simple eventual consistency strategy that perfectly matches their use case.

Wix doesn’t cache (as in a big caching layer). Instead, they pay great attention to optimizing the rendering path so that every page displays in under 100ms.

Wix started small, with a monolithic architecture, and has consciously moved to a service architecture using a very deliberate process for identifying services that can help anyone thinking about the same move.

This is not your traditional LAMP stack or native cloud anything. Wix is a little different and there’s something here you can learn from. Let’s see how they do it…

Stats

  • 54+ million websites, 1 million new websites per month.

  • 800+ terabytes of static data, 1.5 terabytes of new files per day

  • 3 data centers + 2 clouds (Google, Amazon)

  • 300 servers

  • 700 million HTTP requests per day

  • 600 people total, 200 people in R&D

  • About 50 services.

  • 4 public servers are needed to serve 45 million websites

Platform

  • MySQL

  • Google and Amazon clouds

  • CDN

  • Chef

Evolution

  • Simple initial monolithic architecture. Started with one app server. That’s the simplest way to get started. Make quick changes and deploy. It gets you to a particular point.

    • Tomcat, Hibernate, custom web framework

    • Used stateful logins.

    • Disregarded any notion of performance and scaling.

  • Fast forward two years.

    • Still one monolithic server that did everything.

    • At a certain scale of developers and customers it held them back.

    • Problems with dependencies between features. Changes in one place caused deployment of the whole system. Failure in unrelated areas caused system wide downtime.

  • Time to break the system apart.

    • Went with a services approach, but it’s not that easy. How are you going to break functionality apart and into services?

    • Looked at what users are doing in the system and identified three main parts: editing websites, viewing sites created by Wix, and serving media.

    • Editing websites includes data validation, security and authentication, data consistency, and lots of data modification requests.

    • Once finished with the website, users will view it. There are 10x more viewers than editors. So the concerns are now:

      • high availability. HA is the most important feature because it’s the user’s business.

      • high performance

      • high traffic volume

      • the long tail. There are a lot of websites, but they are very small. Every site gets maybe 10 or 100 page views a day. The long tail makes caching not the go-to scalability strategy; caching becomes very inefficient.

    • Media serving is the next big service. It includes HTML, JavaScript, CSS, and images. They needed a way to serve 800TB of data under a high volume of requests. The win is that static content is highly cacheable.

    • The new system looks like a networking layer that sits below three segment services: editor segment (anything that edits data), media segment (handles static files, read-only), public segment (first place a file is viewed, read-only).

Guidelines For How To Build Services

  • Each service has its own database and only one service can write to a database.

  • Access to a database is only through service APIs. This supports a separation of concerns and hiding the data model from other services.

  • For performance reasons read-only access is granted to other services, but only one service can write. (yes, this contradicts what was said before)

  • Services are stateless. This makes horizontal scaling easy. Just add more servers.

  • No transactions. With the exception of billing/financial transactions, all other services do not use transactions. The idea is to increase database performance by removing transaction overhead. This makes you think about how the data is modeled to have logical transactions, avoiding inconsistent states, without using database transactions.

  • When designing a new service, caching is not part of the architecture. First make the service as performant as possible, then deploy to production and see how it performs. Only if there are performance issues that you can’t fix by optimizing the code (or other layers) do you add caching.

Editor Segment

  • Editor server must handle lots of files.

  • Data stored as immutable JSON pages (~2.5 million per day) in MySQL.

  • MySQL is a great key-value store. The key is based on a hash function of the file, so the key is immutable. Accessing MySQL by primary key is very fast and efficient (see the sketch at the end of this section).

  • Scalability is about tradeoffs. What tradeoffs are we going to make? They didn’t want to use NoSQL because it sacrifices consistency and most developers don’t know how to deal with that, so they stuck with MySQL.

  • Active database. They found that after a site has been built, only 6% are still being updated. Given this, the active sites can be stored in one database that is really fast and relatively small in terms of storage (2TB).

  • Archive database. All the stale site data, for sites that are infrequently accessed, is moved to another database that is relatively slow but has huge amounts of storage. After three months, data is pushed to this database if accesses are low. (One could argue this is an implicit caching strategy.)

  • Gives a lot of breathing room to grow. The large archive database is slow, but it doesn’t matter because the data isn’t used that often. On first access the data comes from the archive database, but then it is moved to the active database so later accesses are fast.
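
Wix’s back end is Java, so purely as an illustration of the immutable, hash-keyed page store described above, here is a sketch in Ruby; the pages table, the connection details, and the mysql2 usage are assumptions, not Wix’s actual code.

require 'mysql2'    # assumption: mysql2 gem; Wix's real stack is Java/Hibernate
require 'digest'
require 'json'

client = Mysql2::Client.new(host: 'editor-db', username: 'editor', database: 'sites')  # made up

# The primary key is a hash of the content, so a given key can never point to
# different data, and lookups by primary key stay fast.
page_json = { 'title' => 'Home', 'components' => [] }.to_json
page_key  = Digest::SHA1.hexdigest(page_json)

client.prepare('INSERT IGNORE INTO pages (page_key, body) VALUES (?, ?)')
      .execute(page_key, page_json)

page = client.prepare('SELECT body FROM pages WHERE page_key = ?')
             .execute(page_key)
             .first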

High Availability For Editor Segment

  • With a lot of data it’s hard to provide high availability for everything. So look at the critical path, which for a website is the content of the website. If a widget has problems most of the website will still work. Invested a lot in protecting the critical path.

  • Protect against database crashes. Want to recover quickly. Replicate databases and failover to the secondary database.

  • Protect against data corruption and data poisoning. It doesn’t have to be malicious; a bug is enough to spoil the barrel. All data is immutable. Revisions are stored for everything. The worst case, if corruption can’t be fixed, is to revert to a version where the data was fine.

  • Protect against unavailability. A website has to work all the time. This drove an investment in replicating data across different geographical locations and multiple clouds. This makes the system very resilient.

    • Clicking save on a website editing session sends a JSON file to the editor server.

    • The server sends the page to the active MySQL server which is replicated to another datacenter.

    • After the page is saved locally, an asynchronous process is kicked off to upload the data to a static grid, which is the Media Segment.

    • After the data is uploaded to the static grid, a notification is sent to an archive service running on Google Compute Engine. The archive service goes to the grid, downloads the page, and stores a copy in the Google cloud.

    • Then a notification is sent back to the editor saying the page was saved to GCE.

    • Another copy is saved to Amazon from GCE.

    • Once the final notification is received, it means there are three copies of the current revision of the data: one in the database, one on the static grid, and one on GCE.

    • For the current revision there are three copies. For old revisions there are two copies (static grid, GCE).

    • The process is self-healing. If there’s a failure the next time a user updates their website everything that wasn’t uploaded will be uploaded again.

    • Orphan files are garbage collected.

Modeling Data With No Database Transactions

  • They don’t want a situation where a user edits two pages and only one page is saved in the database, which is an inconsistent state.

  • Take all the JSON files and stick them in the database one after the other. When all the files are saved, another save command is issued which contains a manifest of the IDs of all the saved pages (each ID is a hash of the content, which is also the file name on the static server) that were uploaded to the static servers.
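
To illustrate the idea in pseudo-code (Ruby here, with a hypothetical store helper; Wix’s implementation is Java): the pages are written first and the manifest is written last, acting as the commit point, so readers never see a half-saved site.

# Sketch of a "logical transaction" without database transactions.
def save_site(site_id, page_json_files, store)
  # 1. Write each immutable page; a partial failure only leaves unreferenced rows behind.
  page_keys = page_json_files.map { |page| store.put_page(page) }   # returns content hashes

  # 2. Writing the manifest last is the commit point: readers only follow
  #    the page keys listed in the latest manifest.
  store.put_manifest(site_id, page_keys)
end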

Media Segment

  • Stores lots of files. 800TB of user media files, 3M files uploaded daily, and 500M metadata records.

  • Images are modified. They are resized for different devices and sharpened. Watermarks can be inserted and there’s also audio format conversion.

  • Built an eventually consistent distributed file system that is multi datacenter aware with automatic fallback across DCs. This is before Amazon.

  • A pain to run. 32 servers, doubling the number every 9 months.

  • Plan to push stuff to the cloud to help scale.

  • Vendor lock-in is a myth. It’s all APIs. Just change the implementation and you can move to different clouds in weeks.

  • What really locks you down is data. Moving 800TB of data to a different cloud is really hard.

  • They broke Google Compute Engine when they moved all their data into GCE. They reached the limits of the Google cloud. After some changes by Google it now works.

  • Files are immutable so they are highly cacheable.

  • Image requests first go to a CDN. If the image isn’t in the CDN the request goes to their primary datacenter in Austin. If the image isn’t in Austin the request then goes to Google Cloud. If it’s not in Google cloud it goes to a datacenter in Tampa.

Public Segment

  • Resolve URLs (45 million of them), dispatch to the appropriate renderer, and then render into HTML, sitemap XML, or robots TXT, etc.

  • Public SLA is that response time is < 100ms at peak traffic. Websites have to be available, but also fast. Remember, no caching.

  • When a user clicks publish after editing a page, the manifest, which contains references to pages, is pushed to Public. The routing table is also published.

  • Minimize out-of-process hops. It requires 1 database call to resolve the route, 1 RPC call to dispatch the request to the renderer, and 1 database call to get the site manifest.

  • Lookup tables are cached in memory and are updated every 5 minutes.

  • Data is not stored in the same format as it is for the editor. It is stored in a denormalized format, optimized for read by primary key. Everything that is needed is returned in a single request.

  • Minimize business logic. The data is denormalized and precalculated. When you handle large scale, every operation and every millisecond you add is multiplied by 45 million, so every operation that happens on the public server has to be justified.

  • Page rendering.

    • The html returned by the public server is bootstrap html. It’s a shell with JavaScript imports and JSON data with references to site manifest and dynamic data.

    • Rendering is offloaded to the client. Laptops and mobile devices are very fast and can handle the rendering.

    • JSON was chosen because it’s easy to parse and compressible.

    • It’s easier to fix bugs on the client. Just redeploy new client code. When rendering is done on the server the html will be cached, so fixing a bug requires re-rendering millions of websites again.

High Availability For Public Segment

  • Goal is to be always available, but stuff happens.

  • On a good day: a browser makes a request, the request goes to a datacenter, through a load balancer, to a public server, which resolves the route and goes to the renderer; the html goes back to the browser, and the browser runs the JavaScript. The JavaScript fetches all the media files and the JSON data and renders a very beautiful website. The browser then makes a request to the Archive service. The Archive service replays the request the same way the browser does and stores the data in a cache.

  • On a bad day a datacenter is lost, which did happen. All the UPSs died and the datacenter was down. The DNS was changed and then all the requests went to the secondary datacenter.

  • On a bad day Public is lost. This happened once when a load balancer got half of a configuration, so all the Public servers were gone. Or a bad version can be deployed that starts returning errors. Custom code in the load balancer handles this problem by routing to the Archive service to fetch the cached version if the Public servers are not available. This approach meant customers were not affected when Public went down, even though the system was reverberating with alarms at the time.

  • On a bad day the Internet sucks. The browser makes a request, goes to the datacenter, goes to the load balancer, and gets the html back. Now the JavaScript code has to fetch all the pages and JSON data. It goes to the CDN, then to the static grid, and fetches all the JSON files to render the site. Internet problems along the way can prevent files from being returned. The JavaScript code says: if you can’t get to the primary location, try to get it from the Archive service; if that fails, try the editor database.

Lessons Learned

  • Identify your critical path and concerns. Think through how your product works. Develop usage scenarios. Focus your efforts on these as they give the biggest bang for the buck.

  • Go multi-datacenter and multi-cloud. Build redundancy on the critical path (for availability).

  • Denormalize data and minimize out-of-process hops (for performance). Precalculate and do everything possible to minimize network chatter.

  • Take advantage of client’s CPU power. It saves on your server count and it’s also easier to fix bugs in the client.

  • Start small, get it done, then figure out where to go next. Wix did what they needed to do to get their product working. Then they methodically moved to a sophisticated services architecture.

  • The long tail requires a different approach. Rather than cache everything, Wix chose to optimize the heck out of the render path and keep data in both active and archive databases.

  • Go immutable. Immutability has far reaching consequences for an architecture. It affects everything from the client through the back-end. It’s an elegant solution to a lot of problems.

  • Vendor lock-in is a myth. It’s all APIs. Just change the implementation and you can move to different clouds in weeks.

  • What really locks you down is data. Moving lots of data to a different cloud is really hard.

( Via HighScalability.com )