What is a Monolith?

There is currently a strong trend for microservice-based architectures and frequent discussions comparing them to monoliths. There is much advice about breaking up monoliths into microservices and also some amusing fights between proponents of the two paradigms – see the great Microservices vs Monolithic Melee. The term ‘Monolith’ is increasingly being used as a generic insult in the same way that ‘Legacy’ is!

However, I believe that there is a great deal of misunderstanding about exactly what a ‘Monolith’ is and those discussing it are often talking about completely different things.

A monolith can be considered an architectural style or a software development pattern (or anti-pattern if you view it negatively). Styles and patterns usually fit into different Viewtypes (a viewtype is a set, or category, of views that can be easily reconciled with each other [Clements et al., 2010]) and some basic viewtypes we can discuss are:

  • Module – The code units and their relation to each other at compile time.
  • Allocation – The mapping of the software onto its environment.
  • Runtime – The structure of the running software elements and how they interact at runtime.

A monolith could refer to any of the basic viewtypes above.

Module Monolith

If you have a module monolith then all of the code for a system is in a single codebase that is compiled together and produces a single artifact. The code may still be well structured (classes and packages that are coherent and decoupled at a source level rather than a big-ball-of-mud) but it is not split into separate modules for compilation. Conversely a non-monolithic module design may have code split into multiple modules or libraries that can be compiled separately, stored in repositories and referenced when required. There are advantages and disadvantages to both but this tells you very little about how the code is used – it is primarily done for development management.


Allocation Monolith

For an allocation monolith, all of the code is shipped/deployed at the same time. In other words once the compiled code is ‘ready for release’ then a single version is shipped to all nodes. All running components have the same version of the software running at any point in time. This is independent of whether the module structure is a monolith. You may have compiled the entire codebase at once before deployment OR you may have created a set of deployment artifacts from multiple sources and versions. Either way this version for the system is deployed everywhere at once (often by stopping the entire system, rolling out the software and then restarting).

A non-monolithic allocation would involve deploying different versions to individual nodes at different times. This is again independent of the module structure as different versions of a module monolith could be deployed individually.


Runtime Monolith

A runtime monolith will have a single application or process performing the work for the system (although the system may have multiple, external dependencies). Many systems have traditionally been written like this (especially line-of-business systems such as Payroll, Accounts Payable, CMS etc).

Whether the runtime is a monolith is independent of whether the system code is a module monolith or not. A runtime monolith often implies an allocation monolith if there is only one main node/component to be deployed (although this is not the case if a new version of software is rolled out across regions, with separate users, over a period of time).


Note that my examples above are slightly forced into the viewtypes, and things won’t be as hard-and-fast in the real world.

Conclusion

Be very careful when arguing about ‘Microservices vs Monoliths’. A direct comparison is only possible when discussing the Runtime viewtype and its properties. You should also not assume that moving away from a Module or Allocation monolith will magically enable a Microservice architecture (although it will probably help). If you are moving to a Microservice architecture then I’d advise you to consider all these viewtypes and align your boundaries across them, i.e. don’t just code, build and distribute a monolith that exposes subsets of itself on different nodes.

(Via Codingthearchitecture.com)


Auth0 Architecture – Running In Multiple Cloud Providers And Regions


Auth0 provides authentication, authorization and single sign on services for apps of any type: mobile, web, native; on any stack.

Authentication is critical for the vast majority of apps. We designed Auth0 from the beginning with multiple levels of redundancy. One of these levels is hosting. Auth0 can run anywhere: our cloud, your cloud, or even your own servers. And when we run Auth0, we run it on multiple cloud providers and in multiple regions simultaneously.

This article is a brief introduction of the infrastructure behind app.auth0.com and the strategies we use to keep it up and running with high availability.

Core Service Architecture

The core service is relatively simple:

  • Front-end servers: these consist of several x-large VMs, running Ubuntu on Microsoft Azure.

  • Store: MongoDB, running on dedicated memory-optimized X-large VMs.

  • Intra-node service routing: nginx

All components of Auth0 (e.g. Dashboard, transaction server, docs) run on all nodes. All identical.

Multi-Cloud / High Availability


Multi Cloud Architecture

Last week, Azure suffered a global outage that lasted for hours. During that time our HA plan activated and we switched over to AWS.

  • The service runs primarily on Microsoft Azure (IaaS), with secondary nodes on standby on AWS, always ready.

  • We use Route53 with a failover routing policy and a TTL of 60 seconds. The Route53 health check probes the primary DC; if it fails (3 times, at 10-second intervals) the DNS entry is changed to point to the secondary DC. So the maximum downtime in case of a primary failure is ~2 minutes (see the sketch after this list).

  • We use puppet to deploy on every “push to master”. Using puppet allows us to be cloud independent on the configuration/deployment process. Puppet Master runs on our build server (TeamCity currently).

  • MongoDB is replicated frequently to the secondary DC, and the secondary DC is configured as read-only.

  • While running on the secondary DC, only runtime logins are allowed and the dashboard is set to “read-only mode”.

  • We replicate all the configuration needed for a login to succeed (application info, secrets, connections, users, etc). We don’t replicate transactional data (tokens, logs).

  • In case of failover, there might be some logging records that are lost. We are planning to improve that by having a real-time replica across Azure and AWS.

  • We use our own version of chaos monkey to test the resiliency of our infrastructure: https://github.com/auth0/chaos-mona
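As referenced above, here is a rough sketch of the failover record pair behind that DNS switch, written as Ruby hashes roughly in the shape the Route53 API expects. The hostnames and health-check ID are placeholders for illustration, not Auth0’s actual configuration:

# Primary record: answered only while its health check passes.
primary = {
  name:             "app.auth0.com.",
  type:             "CNAME",
  set_identifier:   "primary",
  failover:         "PRIMARY",
  ttl:              60,                                    # matches the 60-second TTL above
  health_check_id:  "PLACEHOLDER-HEALTH-CHECK-ID",         # the probe against the primary DC
  resource_records: [{ value: "primary-dc.example.com" }]
}

# Secondary record: no health check; Route53 serves it whenever the primary check fails.
secondary = {
  name:             "app.auth0.com.",
  type:             "CNAME",
  set_identifier:   "secondary",
  failover:         "SECONDARY",
  ttl:              60,
  resource_records: [{ value: "secondary-dc.example.com" }]
}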


Automated Testing

  • We have 1000+ unit and integration tests.

  • We use saucelabs to run cross-browser (desktop/mobile) integration tests for Lock, our JavaScript login widget.

  • We use phantomjs/casper for integration tests. We test, for instance, that a full flow login with Google and other providers works fine.

  • All these run before every push to production.

CDN

Our use case is simple: we need to serve our JS library and its configuration (which providers are enabled, etc.). Assets and configuration data are uploaded to S3. The CDN has to support TLS on our own custom domain (https://cdn.auth0.com). We ended up building our own CDN.

  • We tried 3 reputable CDN providers, but ran into a whole variety of issues. We adopted the first one before we had our own domain for the CDN. At some point we decided we needed our own domain over SSL/TLS, and that CDN was too expensive if you wanted SSL and a custom domain at that point (600/mo). We also had issues configuring it to work with gzip and S3: since S3 cannot serve both versions (zipped and not) of the same file and this CDN doesn’t do content negotiation, some browsers (cough, IE) don’t play well with it. So we moved to another CDN which was much cheaper.

  • With the second CDN we had a handful of issues and couldn’t understand their root cause. Their support was chat-based and it took time to get answers. Sometimes it seemed to be S3 issues, sometimes they had routing issues, etc.

  • We decided to spend more money and moved to a third CDN. Given that this CDN is used by high-load services like GitHub, we thought it was going to be fine. However, our requirements were different from GitHub’s. If the CDN doesn’t work for GitHub, you won’t see an image on a README.md. In our case, our customers depend on the CDN to serve the Login Widget, which means that if it doesn’t work, their customers can’t log in.

  • We ended up building our own CDN using nginx, varnish and S3. It’s hosted in every AWS region and so far it has been working great (no downtime). We use Route53 latency-based routing.

Sandbox (Used To Run Untrusted Code)

One of the features we provide is the ability to run custom code as part of the login transaction. Customers can write these rules and we have a public repository for commonly used rules.

  • The sandbox is built on CoreOS, Docker and etcd.

  • There is a pool of Docker instances that gets assigned to a tenant on-demand.

  • Each tenant gets its own Docker instance, and there is a recycling policy based on idle time.

  • There is a controller applying the recycling policy and a proxy that routes requests to the right container.


More information about the sandbox is in this JSConf presentation Nov 2014: https://www.youtube.com/watch?feature=player_detailpage&v=I4VkZ5H9PE8#t=7015 and slides: http://tjanczuk.github.io/about/sandbox.html

Monitoring

Initially we used pingdom (we still use it), but we decided to develop our own health check system that can run arbitrary health checks based on node.js scripts. These run from all AWS regions.

  • It uses the same sandbox we developed for our service. We call the sandbox via an HTTP API and send the node.js script to run as an HTTP POST (see the sketch after this list).

  • We monitor all the components and we also do synthetic transactions against the service (e.g. a login transaction).
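As a rough sketch (the sandbox endpoint, script path and response shape are assumptions, not Auth0’s actual API), the core of such a check boils down to posting a node.js script to the sandbox over HTTP and inspecting the result:

require 'net/http'
require 'uri'
require 'json'

# Hypothetical sandbox endpoint and check script.
SANDBOX_URI  = URI('https://sandbox.example.com/api/run')
CHECK_SCRIPT = File.read('checks/synthetic_login.js')   # a node.js script that reports { "ok": true/false }

def healthy?
  response = Net::HTTP.post(SANDBOX_URI, CHECK_SCRIPT, 'Content-Type' => 'application/javascript')
  response.is_a?(Net::HTTPSuccess) && JSON.parse(response.body)['ok'] == true
rescue StandardError
  false                                                 # network errors count as a failed check
end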


If a health check fails we get notified through Slack. We have two Slack channels, #p1 and #p2. If the failure happens once, it gets posted to #p2. If it happens twice in a row, it gets posted to #p1 and all members of devops get an SMS (via Twilio).
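A minimal sketch of that escalation rule (the helper methods are placeholders; the real system posts to Slack webhooks and uses Twilio for SMS):

def notify(check_name, consecutive_failures)
  if consecutive_failures == 1
    post_to_slack('#p2', "#{check_name} failed")                  # first failure: low-priority channel
  elsif consecutive_failures >= 2
    post_to_slack('#p1', "#{check_name} failed #{consecutive_failures} times in a row")
    sms_devops("#{check_name} is down")                           # page devops, e.g. via Twilio
  end
end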

For detailed performance counters and response times we use statsd and we send all the metrics to Librato. This is an example of a chart you can create.


We also set up alerts based on derivative metrics (i.e. how much something grows or shrinks in a time period). For instance, we have one based on logins: if Derivative(logins) > X, send an alert to Slack.
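A sketch of such a derivative-based check, with a made-up threshold and the same placeholder post_to_slack helper: compare the metric for the current window against the previous one and alert when the relative change is too large.

# counts of logins for consecutive time windows, e.g. [1200, 1180, 310]
def derivative_alert(logins_per_window, threshold_pct = 50)
  previous, current = logins_per_window.last(2)
  return if previous.nil? || current.nil? || previous.zero?

  change_pct = (current - previous) * 100.0 / previous
  post_to_slack('#p1', "logins changed #{change_pct.round(1)}% vs the previous window") if change_pct.abs > threshold_pct
end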


Finally, we have alerts coming from NewRelic for infrastructure components.


For logging we use ElasticSearch, Logstash and Kibana. We are storing logs from nginx and mongodb at this point. We are also parsing mongo logs using logstash in order to identify slow queries (anything with a high number of collscans).


Website

  • All related web properties (the auth0.com site, our blog, etc.) run completely separately from the app and runtime, on their own Ubuntu + Docker VMs.

Future

This is where we are going:

  • We are moving to CoreOS and Docker. We want to move to a model where we manage clusters as a whole instead of doing configuration management over individual nodes. Docker also helps by removing some moving parts through image-based deployment (and by letting us roll back at that level as well).

  • MongoDB will be geo-replicated across DCs between AWS and Azure. We are testing latency.

  • For all the search related features we are moving to ElasticSearch to provide search based on any criteria. MongoDB didn’t work out well in this scenario (given our multi-tenancy).

(Via HighScalability.com)

Scaling Docker with Kubernetes

Kubernetes is an open source project to manage a cluster of Linux containers as a single system, managing and running Docker containers across multiple hosts, offering co-location of containers, service discovery and replication control. It was started by Google and now it is supported by Microsoft, RedHat, IBM and Docker amongst others.

Google has been using container technology for over ten years, starting over 2 billion containers per week. With Kubernetes it shares its container expertise creating an open platform to run containers at scale.

The project serves two purposes. Once you are using Docker containers, the next question is how to scale and start containers across multiple Docker hosts, balancing the containers across them. It also adds a higher-level API to define how containers are logically grouped, allowing you to define pools of containers, load balancing and affinity.

Kubernetes is still at a very early stage, which translates to lots of changes going into the project, some fragile examples, and some cases for new features that need to be fleshed out, but the pace of development, and the support by other big companies in the space, is highly promising.

Kubernetes concepts

The Kubernetes architecture is defined by a master server and multiple minions. The command line tools connect to the API endpoint in the master, which manages and orchestrates all the minions, Docker hosts that receive the instructions from the master and run the containers.

  • Master: Server with the Kubernetes API service. Multi master configuration is on the roadmap.
  • Minion: Each of the multiple Docker hosts with the Kubelet service that receives orders from the master and manages the containers running on the host.
  • Pod: Defines a collection of containers tied together that are deployed in the same minion, for example a database and a web server container.
  • Replication controller: Defines how many pods or containers need to be running. The containers are scheduled across multiple minions.
  • Service: A definition that allows discovery of services/ports published by containers, and external proxy communications. A service maps the ports of the containers running on pods across multiple minions to externally accessible ports.
  • kubecfg: The command line client that connects to the master to administer Kubernetes.


Kubernetes is defined by states, not processes. When you define a pod, Kubernetes tries to ensure that it is always running. If a container is killed, it will try to start a new one. If a replication controller is defined with 3 replicas, Kubernetes will try to always run that number, starting and stopping containers as necessary.

The example app used in this article is the Jenkins CI server, in a typical master-slaves setup to distribute the jobs. Jenkins is configured with the Jenkins swarm plugin to run a Jenkins master and multiple Jenkins slaves, all of them running as Docker containers across multiple hosts. The swarm slaves connect to the Jenkins master on startup and become available to run Jenkins jobs. The configuration files used in the example are available in GitHub, and the Docker images are available as csanchez/jenkins-swarm, for the master Jenkins, extending the official Jenkins image with the swarm plugin, and csanchez/jenkins-swarm-slave, for each of the slaves, just running the slave service on a JVM container.

Creating a Kubernetes cluster

Kubernetes provides scripts to create a cluster with several operating systems and cloud/virtual providers: Vagrant (useful for local testing), Google Compute Engine, Azure, Rackspace, etc.

The examples will use a local cluster running on Vagrant, using Fedora as OS, as detailed in the getting started instructions, and have been tested on Kubernetes 0.5.4. Instead of the default three minions (Docker hosts) we are going to run just two, which is enough to show the Kubernetes capabilities without requiring a more powerful machine.

Once you have downloaded Kubernetes and extracted it, the examples can be run from that directory. In order to create the cluster from scratch the only command needed is ./cluster/kube-up.sh.

$ export KUBERNETES_PROVIDER=vagrant
$ export KUBERNETES_NUM_MINIONS=2
$ ./cluster/kube-up.sh

Get the example configuration files:

$ git clone https://github.com/carlossg/kubernetes-jenkins.git

The cluster creation will take a while depending on machine power and internet bandwidth, but it should eventually finish without errors and only needs to be run once.

Command line tool

The command line tool to interact with Kubernetes is called kubecfg, with a convenience script in cluster/kubecfg.sh.

In order to check that our cluster is up and running with two minions, just run the kubecfg list minions command and it should display the two virtual machines in the Vagrant configuration.

$ ./cluster/kubecfg.sh list minions

Minion identifier
----------
10.245.2.2
10.245.2.3

Pods

The Jenkins master server is defined as a pod in Kubernetes terminology. Multiple containers can be specified in a pod; they will be deployed on the same Docker host, with the advantage that containers in a pod can share resources, such as storage volumes, and use the same network namespace and IP. Volumes are by default empty directories, of type emptyDir, that live for the lifespan of the pod, not of a specific container, so if the container fails the persistent storage will live on. Another volume type is hostDir, which mounts a directory from the host server into the container.

In this Jenkins specific example we could have a pod with two containers, the Jenkins server and, for instance, a MySQL container to use as database, although we will only focus on a standalone Jenkins master container.

In order to create a Jenkins pod we run kubecfg with the Jenkins container pod definition, using Docker image csanchez/jenkins-swarm, ports 8080 and 50000 mapped to the container in order to have access to the Jenkins web UI and the slave API, and a volume mounted in /var/jenkins_home. You can find the example code in GitHub as well.

The Jenkins web UI pod (pod.json) is defined as follows:

{
  "id": "jenkins",
  "kind": "Pod",
  "apiVersion": "v1beta1",
  "desiredState": {
    "manifest": {
      "version": "v1beta1",
      "id": "jenkins",
      "containers": [
        {
          "name": "jenkins",
          "image": "csanchez/jenkins-swarm:1.565.3.3",
          "ports": [
            {
              "containerPort": 8080,
              "hostPort": 8080
            },
            {
              "containerPort": 50000,
              "hostPort": 50000
            }
          ],
          "volumeMounts": [
            {
              "name": "jenkins-data",
              "mountPath": "/var/jenkins_home"
            }
          ]
        }
      ],
      "volumes": [
        {
          "name": "jenkins-data",
          "source": {
            "emptyDir": {}
          }
        }
      ]
    }
  },
  "labels": {
    "name": "jenkins"
  }
}

And create it with:

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/pod.json create pods

Name                Image(s)                           Host                Labels              Status
----------          ----------                         ----------          ----------          ----------
jenkins             csanchez/jenkins-swarm:1.565.3.3   <unassigned>        name=jenkins        Pending

After some time, depending on your internet connection, as it has to download the Docker image to the minion, we can check its status and in which minion it was started.

$ ./cluster/kubecfg.sh list pods
Name                Image(s)                           Host                    Labels              Status
----------          ----------                         ----------              ----------          ----------
jenkins             csanchez/jenkins-swarm:1.565.3.3   10.0.29.247/10.0.29.247   name=jenkins        Running

If we ssh into the minion that the pod was assigned to, minion-1 or minion-2, we can see how Docker started the container defined, amongst other containers used by Kubernetes for internal management (kubernetes/pause and google/cadvisor).

$ vagrant ssh minion-2 -c "docker ps"

CONTAINER ID        IMAGE                              COMMAND                CREATED             STATUS              PORTS                                              NAMES
7f6825a80c8a        google/cadvisor:0.6.2              "/usr/bin/cadvisor"    3 minutes ago       Up 3 minutes                                                           k8s_cadvisor.b0dae998_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_28df406a
5c02249c0b3c        csanchez/jenkins-swarm:1.565.3.3   "/usr/local/bin/jenk   3 minutes ago       Up 3 minutes                                                           k8s_jenkins.f87be3b0_jenkins.default.etcd_901e8027-759b-11e4-bfd0-0800279696e1_bf8db75a
ce51fda15f55        kubernetes/pause:go                "/pause"               10 minutes ago      Up 10 minutes                                                          k8s_net.dbcb7509_0d38f5b2-759c-11e4-bfd0-0800279696e1.default.etcd_0d38fa52-759c-11e4-bfd0-0800279696e1_e4e3a40f
e6f00165d7d3        kubernetes/pause:go                "/pause"               13 minutes ago      Up 13 minutes       0.0.0.0:8080->8080/tcp, 0.0.0.0:50000->50000/tcp   k8s_net.9eb4a781_jenkins.default.etcd_901e8027-759b-11e4-bfd0-0800279696e1_7bd4d24e
7129fa5dccab        kubernetes/pause:go                "/pause"               13 minutes ago      Up 13 minutes       0.0.0.0:4194->8080/tcp                             k8s_net.a0f18f6e_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_659a7a52

And, once we know the container ID, we can check the container logs with vagrant ssh minion-1 -c "docker logs cec3eab3f4d3".

We should also see the Jenkins web UI at http://10.245.2.2:8080/ or http://10.0.29.247:8080/, depending on what minion it was started in.

Service discovery

Kubernetes allows defining services, a way for containers to use discovery and proxy requests to the appropriate minion. With this definition in service-http.json we are creating a service with id jenkins pointing to the pod with the label name=jenkins, as declared in the pod definition, and forwarding the port 8888 to the container’s 8080.

{
  "id": "jenkins",
  "kind": "Service",
  "apiVersion": "v1beta1",
  "port": 8888,
  "containerPort": 8080,
  "selector": {
    "name": "jenkins"
  }
}

Creating the service with kubecfg:

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/service-http.json create services

Name                Labels              Selector            IP                  Port
----------          ----------          ----------          ----------          ----------
jenkins                                 name=jenkins        10.0.29.247         8888

Each service is assigned a unique IP address tied to the lifespan of the Service. If we had multiple pods matching the service definition the service would load balance the traffic across all of them.

Another feature of services is that a number of environment variables are available for any subsequent containers run by Kubernetes, providing the ability to connect to the service container, in a similar way to linked Docker containers. This will prove useful for finding the master Jenkins server from any of the slaves.

JENKINS_PORT='tcp://10.0.29.247:8888'
JENKINS_PORT_8080_TCP='tcp://10.0.29.247:8888'
JENKINS_PORT_8080_TCP_ADDR='10.0.29.247'
JENKINS_PORT_8080_TCP_PORT='8888'
JENKINS_PORT_8080_TCP_PROTO='tcp'
JENKINS_SERVICE_PORT='8888'
SERVICE_HOST='10.0.29.247'

Another tweak we need to make is to open port 50000, which is needed by the Jenkins swarm plugin. This can be achieved by creating another service, service-slave.json, so that Kubernetes forwards traffic on that port to the Jenkins server container.

{
  "id": "jenkins-slave",
  "kind": "Service",
  "apiVersion": "v1beta1",
  "port": 50000,
  "containerPort": 50000,
  "selector": {
    "name": "jenkins"
  }
}

The service is created with kubecfg again.

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/service-slave.json create services

Name                Labels              Selector            IP                  Port
----------          ----------          ----------          ----------          ----------
jenkins-slave                           name=jenkins        10.0.86.28          50000

And all the defined services are now available, including some internal Kubernetes ones:

$ ./cluster/kubecfg.sh list services

Name                Labels              Selector                                  IP                  Port
----------          ----------          ----------                                ----------          ----------
kubernetes-ro                           component=apiserver,provider=kubernetes   10.0.22.155         80
kubernetes                              component=apiserver,provider=kubernetes   10.0.72.49          443
jenkins                                 name=jenkins                              10.0.29.247         8888
jenkins-slave                           name=jenkins                              10.0.86.28          50000

Replication controllers

Replication controllers allow running multiple pods in multiple minions. Jenkins slaves can be run this way to ensure there is always a pool of slaves ready to run Jenkins jobs.

In a replication.json definition:

{
  "id": "jenkins-slave",
  "apiVersion": "v1beta1",
  "kind": "ReplicationController",
  "desiredState": {
    "replicas": 1,
    "replicaSelector": {
      "name": "jenkins-slave"
    },
    "podTemplate": {
      "desiredState": {
        "manifest": {
          "version": "v1beta1",
          "id": "jenkins-slave",
          "containers": [
            {
              "name": "jenkins-slave",
              "image": "csanchez/jenkins-swarm-slave:1.21",
              "command": [
                "sh", "-c", "/usr/local/bin/jenkins-slave.sh -master http://$JENKINS_SERVICE_HOST:$JENKINS_SERVICE_PORT -tunnel $JENKINS_SLAVE_SERVICE_HOST:$JENKINS_SLAVE_SERVICE_PORT -username jenkins -password jenkins -executors 1"
              ]
            }
          ]
        }
      },
      "labels": {
        "name": "jenkins-slave"
      }
    }
  },
  "labels": {
    "name": "jenkins-slave"
  }
}

The podTemplate section allows the same configuration options as a pod definition. In this case we want to make the Jenkins slave connect automatically to our Jenkins master, instead of relying on Jenkins multicast discovery. To do so we execute the jenkins-slave.sh command with -master parameter to point the slave to the Jenkins master running in Kubernetes. Note that we use the Kubernetes provided environment variables for the Jenkins service definition (JENKINS_SERVICE_HOST and JENKINS_SERVICE_PORT). The image command is overridden to configure the container this way, useful to reuse existing images while taking advantage of the service environment variables. It can be done in pod definitions too.

Create the replicas with kubecfg:

$ ./cluster/kubecfg.sh -c kubernetes-jenkins/replication.json create replicationControllers

Name                Image(s)                            Selector             Replicas
----------          ----------                          ----------           ----------
jenkins-slave       csanchez/jenkins-swarm-slave:1.21   name=jenkins-slave   1

Listing the pods now would show new ones being created, up to the number of replicas defined in the replication controller.

$ ./cluster/kubecfg.sh list pods

Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Running
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Pending

The first time the jenkins-swarm-slave image runs, the minion has to download it from the Docker repository, but after a while, depending on your internet connection, the slaves should automatically connect to the Jenkins server. Going into the server where a slave is started, docker ps has to show the container running and docker logs is useful to debug any problems on container startup.

$ vagrant ssh minion-1 -c "docker ps"

CONTAINER ID        IMAGE                               COMMAND                CREATED              STATUS              PORTS                    NAMES
870665d50f68        csanchez/jenkins-swarm-slave:1.21   "/usr/local/bin/jenk   About a minute ago   Up About a minute                            k8s_jenkins-slave.74f1dda1_07651754-4f88-11e4-b01e-0800279696e1.default.etcd_11cac207-759f-11e4-bfd0-0800279696e1_9495d10e
cc44aa8743f0        kubernetes/pause:go                 "/pause"               About a minute ago   Up About a minute                            k8s_net.dbcb7509_07651754-4f88-11e4-b01e-0800279696e1.default.etcd_11cac207-759f-11e4-bfd0-0800279696e1_4bf086ee
edff0e535a84        google/cadvisor:0.6.2               "/usr/bin/cadvisor"    27 minutes ago       Up 27 minutes                                k8s_cadvisor.b0dae998_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_588941b0
b7e23a7b68d0        kubernetes/pause:go                 "/pause"               27 minutes ago       Up 27 minutes       0.0.0.0:4194->8080/tcp   k8s_net.a0f18f6e_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0.default.file_cadvisormanifes12uqn2ohido76855gdecd9roadm7l0_57a2b4de

The replication controller can automatically be resized to any number of desired replicas:

$ ./cluster/kubecfg.sh resize jenkins-slave 2

And again the pods are updated to show where each replica is running.

$ ./cluster/kubecfg.sh list pods
Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Pending
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Running


Scheduling

Right now the default scheduler is random, but resource based scheduling will be implemented soon. At the time of writing there are several issues opened to add scheduling based on memory and CPU usage. There is also work in progress in an Apache Mesos based scheduler. Apache Mesos is a framework for distributed systems providing APIs for resource management and scheduling across entire datacenter and cloud environments.

Self healing

One of the benefits of using Kubernetes is the automated management and recovery of containers.

If the container running the Jenkins server dies for any reason, for instance because the process being run crashes, Kubernetes will notice and will create a new container after a few seconds.

$ vagrant ssh minion-2 -c 'docker kill `docker ps | grep csanchez/jenkins-swarm: | sed -e "s/ .*//"`'
51ba3687f4ee


$ ./cluster/kubecfg.sh list pods
Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Failed
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Running

And some time later, typically no more than a minute…

Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Running
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Running

By running the Jenkins data dir in a volume we guarantee that the data is kept even after the container dies, so we do not lose any Jenkins jobs or data. And because Kubernetes proxies the services in each minion, the slaves will reconnect to the new Jenkins server automagically no matter where they run! Exactly the same happens if any of the slave containers dies: the system automatically creates a new container and, thanks to the service discovery, it automatically joins the Jenkins server pool.

If something more drastic happens, like a minion dying, Kubernetes does not yet offer the ability to reschedule the containers onto the other existing minions; it just shows the pods as Failed.

$ vagrant halt minion-2
==> minion-2: Attempting graceful shutdown of VM...
$ ./cluster/kubecfg.sh list pods
Name                                   Image(s)                            Host                    Labels               Status
----------                             ----------                          ----------              ----------           ----------
jenkins                                csanchez/jenkins-swarm:1.565.3.3    10.245.2.3/10.245.2.3   name=jenkins         Failed
07651754-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.2/10.245.2.2   name=jenkins-slave   Running
a22e0d59-4f88-11e4-b01e-0800279696e1   csanchez/jenkins-swarm-slave:1.21   10.245.2.3/10.245.2.3   name=jenkins-slave   Failed

Tearing down

kubecfg offers several commands to stop and delete the replication controllers, pods and services definitions.

To stop the replication controller, setting the number of replicas to 0 and causing the termination of all the Jenkins slave containers:

$ ./cluster/kubecfg.sh stop jenkins-slave

To delete it:

$ ./cluster/kubecfg.sh rm jenkins-slave

To delete the jenkins server pod, causing the termination of the Jenkins master container:

$ ./cluster/kubecfg.sh delete pods/jenkins

To delete the services:

$ ./cluster/kubecfg.sh delete services/jenkins
$ ./cluster/kubecfg.sh delete services/jenkins-slave

Conclusion

Kubernetes is still a very young project, but highly promising to manage Docker deployments across multiple servers and simplify the execution of long running and distributed Docker containers. By abstracting infrastructure concepts and working on states instead of processes, it provides easy definition of clusters, including self healing capabilities out of the box. In short, Kubernetes makes management of Docker fleets easier.

About the Author

Carlos Sanchez has been working on automation and quality of software development, QA and operations processes for over 10 years, from build tools and continuous integration to deployment automation, DevOps best practices and continuous delivery. He has delivered solutions to Fortune 500 companies, working at several US based startups, most recently MaestroDev, a company he cofounded. Carlos has been a speaker at several conferences around the world, including JavaOne, EclipseCON, ApacheCON, JavaZone, Fosdem or PuppetConf. Very involved in open source, he is a member of the Apache Software Foundation amongst other open source groups, contributing to several projects, such as Apache Maven, Fog or Puppet.

(Via InfoQ.com)

Data Warehouse and Analytics Infrastructure at Viki

At Viki, we use data to power product, marketing and business decisions. We use an in-house analytics dashboard to expose all the data we collect to various teams through simple table and chart based reports. This allows them to monitor all our high level KPIs and metrics regularly.

Data also powers more heavy duty stuff – like our data-driven content recommendation system, or predictive models that help us forecast the value of content we’re looking to license. We’re also constantly looking at data to determine the success of new product features, tweak and improve existing features and even kill stuff that doesn’t work. All of this makes data an integral part of the decision making process at Viki.

To support all these functions, we need a robust infrastructure below it, and that’s our data warehouse and analytics infrastructure.

This post is the first in a series about our data warehouse and analytics infrastructure. In this post we’ll cover the high-level pipeline of the system and go into detail about how we collect and batch-process our data. Do expect a lot of detailed discussion.

About Viki: We’re an online TV site, with fan-powered translations in 150+ different languages. To understand more about what Viki is, watch this short video (2 minutes).

Part 0: Overview (or TL;DR)

Our analytics infrastructure, following the most common-sense approach, is broken down into 3 steps:

  • Collect and Store Data
  • Process Data (batch + real-time)
  • Present Data


Collect and Store Data

  1. Logs (events) are sent by different clients to a central log collector
  2. The log collector forwards events to a Hydration service, where the events get enriched with more time-sensitive information; the results are stored to S3

Batch Processing Data

  1. An hourly job takes data from S3, applies further transformations (read: cleaning up bad data) and stores the results in our cloud-based Hadoop cluster
  2. We run multiple MapReduce (Hive) jobs to aggregate data from Hadoop and write it to our central analytics database (Postgres)
  3. Another job takes a snapshot of our production databases and restores it into our analytics database

Presenting Data

  1. All analytics/reporting-related activities are then done at our master analytics database. The results (meant for report presentation) are then sent to our Reporting DB
  2. We run an internal reporting dashboard app on top of our Reporting DB; this is where end-users log in to see their reports

Real-time Data Processing

  1. The data from the Hydration service is also multiplexed and sent to our real-time processing pipeline (using Apache Storm)
  2. In Storm, we write custom jobs that do real-time aggregation of important metrics. We also wrote a real-time alerting system to inform ourselves when traffic goes bad

Part 1: Collecting, Pre-processing and Storing Data


We use fluentd to receive event logs from different platforms (web, Android, iOS, etc.) through HTTP. We set up a cluster of 2 dedicated servers running multiple fluentd instances inside Docker, load-balanced through HAProxy. When a message hits our endpoint, fluentd buffers the messages and batch-forwards them to our Hydration System. At the moment, our cluster is doing 100M messages a day.

A word about fluentd: It’s a robust open-source log collecting software that has a very healthy and helpful community around it. The core is written in C so it’s fast and scalable, with plugins written in Ruby, making it easy to extend.

What data do we collect? We collect everything that we think is useful for the business: a click event, an ad impression, ad request, a video play, etc. We call each record an event, and it’s stored as a JSON string like this:

{
  "time":1380452846, "event": "video_play",
  "video_id":"1008912v", "user_id":"5298933u",
  "uuid":"80833c5a760597bf1c8339819636df04",
  "app_id":"100004a", "app_ver":"2.9.3.151",
  "device":"iPad", "stream_quality":"720p",
  "ip":"99.232.169.246", "country":"ca",
  "city_name":"Toronto", "region_name":"ON"
}

Pre-processing Data – Hydration Service

The data, after being collected, is sent to a Hydration Service for pre-processing. Here, the message is enriched with time-sensitive information.

For example: when a user watches a video (and thus a video_play event is sent), we want to know if it’s a free user or a paid user. Since the user could be a free user today and upgrade to paid tomorrow, the only way to correctly attribute the play event to the free/paid bucket is to inject that status right into the message when it’s received. In short, the service translates this:

{ "event":"video_play", "user_id":"1234" }

into this:

{ "event":"video_play", "user_id":"1234", "user_status":"free" }

For non-time-sensitive operations (fixing typos, getting the country from the IP, etc.), there is a different process (discussed below).
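A minimal sketch of that enrichment step (the lookup service and field names are illustrative, not Viki’s actual code):

# Enrich an incoming event with attributes that are only correct right now.
def hydrate(event)
  user = UserStore.find(event['user_id'])     # hypothetical lookup against the user store
  event.merge('user_status' => user.paid? ? 'paid' : 'free')
end

hydrate('event' => 'video_play', 'user_id' => '1234')
# => { "event" => "video_play", "user_id" => "1234", "user_status" => "free" }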

Storing to S3

From the Hydration Service, the message is buffered and then stored to S3 – our source of truth. The data is gzip-compressed and stored into hourly buckets, making it easy and fast to retrieve data for any given hour.

Part 2: Batch-processing Data

The processing layer has 2 components, the batch-processing and the real-time processing component.

This section focuses mainly on our batch-processing layer – our main process of transforming data for reporting/presentation. We’ll cover our real-time processing layer in another post.


Batch-processing Data Layer

Cleaning Data Before Importing Into Hadoop

Those who have worked with data before know this: Cleaning data takes a lot of time. In fact: Cleaning and preparing data will take most of your time. Not the actual analysis.

What is unclean data (or bad data)? Data that is logged incorrectly. It comes in a lot of different forms, e.g.:

  • A typo: sending a ‘clickk’ event instead of ‘click’
  • Clients sending an event twice, or forgetting to send it

When bad data enters your system, it stays there forever, unless you purposely find a way to clean it up or take it out.

So how do we clean up bad data?

Previously, when we received a record, we wrote it directly to our Hadoop cluster and made a backup to S3. This made it difficult to correct bad data, due to the append-only nature of Hadoop.

Now all the data is first stored in S3, and an hourly process takes data from S3, applies cleanup/transformations and loads it into Hadoop (insert-overwrite).
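A minimal sketch of that hourly job, with all client objects and helper names assumed: pull one hour of raw events from S3, clean them, and overwrite that hour’s partition in Hadoop so that reruns never double-count.

require 'json'
require 'zlib'

EVENT_FIXES = { 'clickk' => 'click' }   # known logging typos

def clean(line)
  event = JSON.parse(line)
  event['event'] = EVENT_FIXES.fetch(event['event'], event['event'])
  event
rescue JSON::ParserError
  nil                                   # drop records that cannot be parsed at all
end

def process_hour(hour, s3:, hadoop:)
  rows = s3.gzipped_objects_for(hour).flat_map do |io|    # hypothetical S3 client returning IO objects
    Zlib::GzipReader.new(io).each_line.map { |line| clean(line) }
  end.compact

  hadoop.insert_overwrite(partition: hour, rows: rows)    # hypothetical Hive/TreasureData client
end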


Storing Data, Before and After

The process is similar in nature to the hydration process, but this time we look at one-hour blocks at a time, rather than individual records. This approach has many great benefits:

  • The pipeline is more linear, which prevents data discrepancies between S3 and Hadoop.
  • The data is not tied to being stored in Hadoop. If we want to load our data into other data storage, we’d just write another process that transforms the S3 data and dumps it somewhere else.
  • When bad logging happens and causes unclean data, we can modify the transformation code and rerun the data from the point of the bad logging. Because the process is idempotent, we can reprocess as many times as we want without double-counting the data.


Our S3 to Hadoop Transform and Load Process

If you’ve read The Log, by the data engineering folks at LinkedIn, you’ll notice that our approach is very similar (replacing Kafka with S3, and per-message processing with per-hour processing). Indeed, our paradigm is inspired by The Log architecture. However, given our needs, we chose S3 because:

  1. When it comes to batch (re)processing, we do it in a time-period manner (e.g. process 1 hour of data). Kafka uses natural numbers to order messages, so if we used Kafka we’d have to build another service to translate [beg_timestamp, end_timestamp) into [beg_index, end_index).
  2. Kafka can only retain data for up to X days due to disk-space limitations. Since we want the ability to reprocess data further back, employing Kafka would mean we’d need another strategy to cater to those cases.

Aggregating Data from Hadoop into Postgres

Once the data gets into Hadoop, daily aggregation jobs aggregate it into fewer dimensions and port it into our master analytics database (PostgreSQL).

For example, to aggregate a table of video starts data together with some video and user information, we run this Hive query (MapReduce job):

-- The Hadoop events table contains 2 fields: time (int), v (json)
SELECT
  SUBSTR(FROM_UNIXTIME(time), 0, 10) AS date_d,
  v['platform'] AS platform,
  v['country'] AS country,
  v['video_id'] AS video_id,
  v['user_id'] AS user_id,
  COUNT(1) AS cnt
FROM events
WHERE time >= BEG_TS
  AND time <= END_TS
  AND v['event'] = 'video_start'
GROUP BY 1,2,3,4,5

and load the results into an aggregated.video_starts table in Postgres:

       Table "aggregated.video_starts"
   Column    |          Type          | Modifiers 
-------------+------------------------+-----------
 date_d      | date                   | not null
 platform    | character varying(255) | 
 country     | character(3)           | 
 video_id    | character varying(255) | 
 user_id     | character varying(255) | 
 cnt         | bigint                 | not null

Further querying and reporting of video_starts will be done out of this table. If we need more dimensions, we either rebuild this table with more dimensions, or build a new table from Hadoop.

If it’s a one-time ad-hoc analysis, we’d just run the queries directly against Hadoop.

Table Partitioning:


Also, we’re making use of Postgres’ Table Inheritance feature to partition our data into multiple monthly tables, with a parent table on top of them all. Your query just needs to hit the parent table and the engine will know which underlying monthly tables to hit to get your data.

This makes our data very easy to maintain, with small indexes and a better rebuild process. We have fast (SSD) drives that host the recent tables, and move the older ones to slower (but bigger) drives for semi-archiving purposes.
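A hedged sketch of that monthly partitioning scheme using Postgres table inheritance; the table and index names are assumptions, and connection stands for a pg connection as in the library example further below:

connection.exec <<-SQL
  -- one child table per month, constrained to that month's dates
  CREATE TABLE aggregated.video_starts_2014_06 (
    CHECK (date_d >= DATE '2014-06-01' AND date_d < DATE '2014-07-01')
  ) INHERITS (aggregated.video_starts);

  CREATE INDEX ON aggregated.video_starts_2014_06 (date_d);

  -- queries keep hitting the parent table; Postgres prunes to the matching children:
  --   SELECT SUM(cnt) FROM aggregated.video_starts WHERE date_d >= '2014-06-01';
SQL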

Centralizing All Data


We dump our production databases into our master analytics database on a daily basis.

Also, we use a lot of 3rd party vendors (Adroll, DoubleClick, Flurry, GA, etc). For each of these services, we write a process to ping their API and import the data into our master analytics database.

These data, together with the aggregated data from Hadoop, allow us to produce meaningful analysis combining data from multiple sources.

For example, to break down our video starts by genre, we’d write a query that joins the prod.videos table with the aggregated.video_starts table:

-- Video Starts by genre
SELECT V.genre, SUM(cnt) FROM aggregated.video_starts VS
LEFT JOIN prod.videos V ON VS.video_id = V.id
WHERE VS.date_d = '2014-06-01'
GROUP BY 1
ORDER BY 1

The above is made possible because we have both sources of data (event tracking data + production data) in 1 place.

Centralizing data is a very important concept in our pipeline because it makes it simple and pain-free for us to connect, report and corroborate our numbers across many different data sources.

Managing Job Dependencies

We started with a simple crontab to schedule our hourly/daily jobs. When the jobs grew complicated, we ended up with a very long crontab:


Crontab also doesn’t support graph-based job flows (e.g. run A and B at the same time; when both finish, run C).


So we looked around for a solution. We considered Chronos (by Airbnb), but its use case is more complicated than what we needed, plus there is the need to set up ZooKeeper and all that.

We ended up using Azkaban by LinkedIn. It has everything we need: cron-style scheduling with graph-based job flows, and it also tells you the runtime history of your jobs. And when a job flow fails, you can restart it, running only the tasks that failed or haven’t run.

It’s pretty awesome.

Making Sure Your Numbers Tie

One of the things I see discussed less in analytics infrastructure talks and blog posts is making sure your data doesn’t get dropped half-way through transportation, resulting in data inconsistencies between different storage systems.

We have a process that runs after every data transfer; it counts the number of records in both the source and destination storage and prints errors when they don’t match. These check-total processes sound a little tedious, but they have proven crucial to our system; they give us confidence in the accuracy of the numbers we report to management.
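A rough sketch of one such check (the connection objects and the alerting call are placeholders): count the rows on both sides of a transfer and complain when they differ.

def check_totals(source, destination, table, date_d)
  sql   = "SELECT COUNT(*) FROM #{table} WHERE date_d = '#{date_d}'"
  left  = source.exec(sql).getvalue(0, 0).to_i
  right = destination.exec(sql).getvalue(0, 0).to_i

  return if left == right
  alert("#{table} #{date_d}: #{left} rows in source vs #{right} in destination")   # e.g. post to Slack or raise
end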

Case in point: we had a process that dumps data from Postgres to CSV, then compresses and uploads it to S3 and loads it into Amazon Redshift (using the COPY command). So technically we have the exact same table in both Postgres and Redshift. One day our analyst pointed out that there was significantly less data in Redshift than in Postgres. Upon investigation, there was a bug that caused the CSV files to be truncated and thus not fully loaded into the Redshift tables. It happened because for this particular process we didn’t have the check-totals in place.

Using Ruby as the Scripting Language

When we started we were primarily a Ruby shop, so going ahead with Ruby was a natural choice. “Isn’t it slow?”, you might say. But we use Ruby not to process data (i.e. it rarely holds any large amount of data), but to facilitate and coordinate the process.

We have written an entire library to support building data pipelines in Ruby. For example, we extended the pg gem to provide a more object-oriented interface to Postgres. It allows us to do table creation, table hot-swapping, upserts, insert-overwriting, copying tables between databases, etc., all without having to touch SQL code. It has become a nice, productive abstraction on top of SQL and Postgres. Think of it as an ORM for data-warehousing purposes.

Example: The code below will create a data.videos_by_genre table holding the result of a simple aggregation query. The process works on a temporary table and eventually performs a table hot-swap with the main one; this avoids any data disruption that would occur if we worked on the main table from the beginning.

columns = [
  {name: 'genre', data_type: 'varchar'},
  {name: 'video_count', data_type: 'int'},
]
indexes = [{columns: ['genre']}]
table = PGx::Table.new 'data.videos_by_genre', columns, indexes

table.with_temp_table do |temp_t|
  temp_t.drop(check_exists: true)
  temp_t.create
  connection.exec <<-SQL.strip_heredoc
    INSERT INTO #{temp_t.qualified_name}
    SELECT genre, COUNT(1) FROM prod.videos
    GROUP BY 1
  SQL

  temp_t.create_indexes
  table.hotswap
end

(the above example could also be done using a MATERIALIZED VIEW btw)

Having this set of libraries has proven critical to our data pipeline process, since it allows us to write extensible and maintainable code that performs all sorts of data transformations.

Technology

We rely mostly on free and open-source technologies. Our stack is:

  • fluentd (open-source) for collecting logs
  • Cloud-based Hadoop + Hive (TreasureData – 3rd party vendor)
  • PostgreSQL + Amazon Redshift as central analytics database
  • Ruby as scripting language
  • NodeJS (worker process) with Redis (caching)
  • Azkaban (job flow management)
  • Kestrel (message queue)
  • Apache Storm (real-time stream processing)
  • Docker for automated deployment
  • HAproxy (load balancing)
  • and lots of SQL (huge thanks to Postgres, one of the best relational databases ever made)

Conclusion

The above post went through the overall architecture of our analytics system. It also went into detail about the collecting layer and the batch-processing layer. In later blog posts we’ll cover the rest, specifically:

  • Our Data Presentation layer. And how Stuti, our analyst, built our funnel analysis, fan-in and fan-out tools, all with SQL. And it updates automatically (very funnel. wow!)
  • Our Real-time traffic alert/monitoring system (using Apache Storm)
  • turing: our feature roll-out, A/B testing framework

(Via Engineering.Viki.com)

Nifty Architecture Tricks From Wix – Building A Publishing Platform At Scale


Wix operates websites in the long tail. As an HTML5-based WYSIWYG web publishing platform, they have created over 54 million websites, most of which receive under 100 page views per day. So traditional caching strategies don’t apply, yet it only takes four web servers to handle all the traffic. That takes some smart work.

Aviran Mordo, Head of Back-End Engineering at Wix, has described their solution in an excellent talk: Wix Architecture at Scale. What they’ve developed is in the best tradition of scaling: specialization. They’ve carefully analyzed their system and figured out how to meet their aggressive high availability and high performance goals in some most interesting ways.

Wix uses multiple datacenters and clouds. Something I haven’t seen before is that they replicate data to multiple datacenters, to Google Compute Engine, and to Amazon. And they have fallback strategies between them in case of failure.

Wix doesn’t use transactions. Instead, all data is immutable and they use a simple eventual consistency strategy that perfectly matches their use case.

Wix doesn’t cache (as in a big caching layer). Instead, they pay great attention to optimizing the rendering path so that every page displays in under 100ms.

Wix started small, with a monolithic architecture, and has consciously moved to a service architecture using a very deliberate process for identifying services that can help anyone thinking about the same move.

This is not your traditional LAMP stack or native cloud anything. Wix is a little different and there’s something here you can learn from. Let’s see how they do it…

Stats

  • 54+ million websites, 1 million new websites per month.

  • 800+ terabytes of static data, 1.5 terabytes of new files per day

  • 3 data centers + 2 clouds (Google, Amazon)

  • 300 servers

  • 700 million HTTP requests per day

  • 600 people total, 200 people in R&D

  • About 50 services.

  • 4 public servers are needed to serve 45 million websites

Platform

  • MySQL

  • Google and Amazon clouds

  • CDN

  • Chef

Evolution

  • Simple initial monolithic architecture. Started with one app server. That’s the simplest way to get started. Make quick changes and deploy. It gets you to a particular point.

    • Tomcat, Hibernate, custom web framework

    • Used stateful logins.

    • Disregarded any notion of performance and scaling.

  • Fast forward two years.

    • Still one monolithic server that did everything.

    • At a certain scale of developers and customers it held them back.

    • Problems with dependencies between features. Changes in one place caused deployment of the whole system. Failure in unrelated areas caused system wide downtime.

  • Time to break the system apart.

    • Went with a services approach, but it’s not that easy. How are you going to break functionality apart and into services?

    • Looked at what users are doing in the system and identified three main parts: editing websites, viewing sites created by Wix, and serving media.

    • Editing websites includes validation of data from the server, security and authentication, data consistency, and lots of data modification requests.

    • Once finished with the web site users will view it. There are 10x more viewers than editors. So the concerns are now:

      • high availability. HA is the most important feature because it’s the user’s business.

      • high performance

      • high traffic volume

      • the long tail. There are a lot of websites, but they are very small. Every site gets maybe 10 or 100 page views a day. The long tail means caching is not the go-to scalability strategy; caching becomes very inefficient.

    • Media serving is the next big service. It includes HTML, javascript, css and images. They needed a way to serve the 800TB of data under a high volume of requests. The win is that static content is highly cacheable.

    • The new system looks like a networking layer that sits below three segment services: editor segment (anything that edits data), media segment (handles static files, read-only), public segment (first place a file is viewed, read-only).

Guidelines For How To Build Services

  • Each service has its own database and only one service can write to a database.

  • Access to a database is only through service APIs. This supports a separation of concerns and hiding the data model from other services.

  • For performance reasons read-only access is granted to other services, but only one service can write. (yes, this contradicts what was said before)

  • Services are stateless. This makes horizontal scaling easy. Just add more servers.

  • No transactions. With the exception of billing/financial transactions, all other services do not use transactions. The idea is to increase database performance by removing transaction overhead. This makes you think about how the data is modeled to have logical transactions, avoiding inconsistent states, without using database transactions.

  • When designing a new service, caching is not part of the architecture. First, make the service as performant as possible and deploy it to production. See how it performs. Only if there are performance issues that can’t be fixed by optimizing the code (or other layers) should caching be added.

Editor Segment

  • Editor server must handle lots of files.

  • Data stored as immutable JSON pages (~2.5 million per day) in MySQL.

  • MySQL is a great key-value store. Key is based on a hash function of the file so the key is immutable. Accessing MySQL by primary key is very fast and efficient.

  • Scalability is about tradeoffs. What tradeoffs are we going to make? Didn’t want to use NoSQL because they sacrifice consistency and most developers do not know how to deal with that. So stick with MySQL.

  • Active database. Found that after a site has been built, only 6% are still being updated. Given this, active sites can be stored in one database that is really fast and relatively small in terms of storage (2TB).

  • Archive database. All the stale site data, for sites that are infrequently accessed, is moved over into another database that is relatively slow but has huge amounts of storage. After three months, data is pushed to this database if accesses are low. (One could argue this is an implicit caching strategy.)

  • Gives a lot of breathing room to grow. The large archive database is slow, but it doesn’t matter because the data isn’t used that often. On first access the data comes from the archive database, but then it is moved to the active database so later accesses are fast.
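
A minimal sketch of this content-addressed, two-tier scheme, using sqlite3 as a stand-in for the two MySQL databases; the schema and helper names are illustrative, not Wix’s actual code:

```python
import hashlib
import json
import sqlite3

# sqlite3 stands in for the active and archive MySQL databases described above.
active = sqlite3.connect(":memory:")
archive = sqlite3.connect(":memory:")
for db in (active, archive):
    db.execute("CREATE TABLE pages (id TEXT PRIMARY KEY, body TEXT)")

def content_key(body: str) -> str:
    # The key is a hash of the immutable content, so the key never changes.
    return hashlib.sha256(body.encode("utf-8")).hexdigest()

def save_page(page: dict) -> str:
    # Insert an immutable JSON revision into the active store, keyed by content hash.
    body = json.dumps(page, sort_keys=True)
    key = content_key(body)
    active.execute("INSERT OR IGNORE INTO pages (id, body) VALUES (?, ?)", (key, body))
    return key

def load_page(key: str) -> dict:
    # Fast path: active DB by primary key. Slow path: archive DB, then promote
    # to the active DB on first access so later reads are fast.
    row = active.execute("SELECT body FROM pages WHERE id = ?", (key,)).fetchone()
    if row is None:
        row = archive.execute("SELECT body FROM pages WHERE id = ?", (key,)).fetchone()
        if row is None:
            raise KeyError(key)
        active.execute("INSERT OR IGNORE INTO pages (id, body) VALUES (?, ?)", (key, row[0]))
    return json.loads(row[0])
```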

High Availability For Editor Segment

  • With a lot of data it’s hard to provide high availability for everything. So look at the critical path, which for a website is the content of the website. If a widget has problems most of the website will still work. Invested a lot in protecting the critical path.

  • Protect against database crashes. Want to recover quickly. Replicate databases and failover to the secondary database.

  • Protect against data corruption and data poisoning. It doesn’t have to be malicious; a bug is enough to spoil the barrel. All data is immutable. Revisions are stored for everything. Worst case, if corruption can’t be fixed, is to revert to a version where the data was fine.

  • Protect against unavailability. A website has to work all the time. This drove an investment in replicating data across different geographical locations and multiple clouds. This makes the system very resilient.

    • Clicking save on a website editing session sends a JSON file to the editor server.

    • The server sends the page to the active MySQL server which is replicated to another datacenter.

    • After the page is saved locally, an asynchronous process is kicked off to upload the data to a static grid, which is the Media Segment.

    • After data is uploaded to the static grid, a notification is sent to an archive service running on Google Compute Engine. The archive service goes to the grid, downloads the page, and stores a copy on the Google cloud.

    • Then a notification is sent back to the editor saying the page was saved to GCE.

    • Another copy is saved to Amazon from GCE.

    • Once the final notification is received, it means there are three copies of the current revision of the data: one in the database, one on the static grid, and one on GCE.

    • For the current revision there are three copies. For old revisions there are two copies (static grid, GCE).

    • The process is self-healing. If there’s a failure, the next time a user updates their website everything that wasn’t uploaded will be uploaded again (see the sketch after this list).

    • Orphan files are garbage collected.
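
A rough sketch of the self-healing idea: every save idempotently re-checks each replica and re-uploads anything that is missing. The MemoryStore class and the store names are invented stand-ins for the static grid, GCE, and Amazon copies:

```python
class MemoryStore:
    """In-memory stand-in for a replica (static grid, GCE archive, Amazon copy)."""
    def __init__(self):
        self._blobs = {}
    def has(self, key):
        return key in self._blobs
    def put(self, key, body):
        self._blobs[key] = body

def ensure_replicated(revision_key, body, stores):
    # Idempotent: anything missing from a replica is uploaded again, so an
    # upload that failed last time heals itself the next time the user saves.
    for store in stores:
        if not store.has(revision_key):
            store.put(revision_key, body)

grid, gce_archive, amazon_copy = MemoryStore(), MemoryStore(), MemoryStore()
ensure_replicated("abc123", b'{"page": "..."}', [grid, gce_archive, amazon_copy])
```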

Modeling Data With No Database Transactions

  • Don’t want a situation where a user edit two pages and only one page is saved in the database, which is an inconsistent state.

  • Take all the JSON files and stick them in the database one after the other. When all the files are saved, another save command is issued containing a manifest of all the IDs (each ID is a hash of the content, which is also the file name on the static server) of the saved pages that were uploaded to the static servers.
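
A toy sketch of this “logical transaction”: pages are written one by one as content-addressed, immutable records, and only the final manifest write makes the new revision visible. The names and the in-memory storage are illustrative stand-ins:

```python
import hashlib
import json

pages = {}      # page_id -> immutable JSON body (page_id doubles as the static file name)
manifests = {}  # site_id -> list of page_ids; writing this is the "commit"

def save_site(site_id, edited_pages):
    # Step 1: store each page individually. A crash here leaves only
    # unreferenced (orphan) pages, never a half-updated site.
    ids = []
    for page in edited_pages:
        body = json.dumps(page, sort_keys=True)
        page_id = hashlib.sha256(body.encode("utf-8")).hexdigest()
        pages[page_id] = body
        ids.append(page_id)
    # Step 2: a single manifest write makes the whole revision visible at once.
    manifests[site_id] = ids
    return ids

save_site("site-1", [{"title": "Home"}, {"title": "About"}])
```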

Media Segment

  • Stores lots of files. 800TB of user media files, 3M files uploaded daily, and 500M metadata records.

  • Images are modified. They are resized for different devices and sharpened. Watermarks can be inserted and there’s also audio format conversion.

  • Built an eventually consistent distributed file system that is multi-datacenter aware with automatic fallback across DCs. This was before Amazon.

  • A pain to run. 32 servers, doubling the number every 9 months.

  • Plan to push stuff to the cloud to help scale.

  • Vendor lock-in is a myth. It’s all APIs. Just change the implementation and you can move to different clouds in weeks.

  • What really locks you down is data. Moving 800TB of data to a different cloud is really hard.

  • They broke Google Compute Engine when they moved all their data into GCE. They reached the limits of the Google cloud. After some changes by Google it now works.

  • Files are immutable so they are highly cacheable.

  • Image requests first go to a CDN. If the image isn’t in the CDN the request goes to their primary datacenter in Austin. If the image isn’t in Austin the request then goes to Google Cloud. If it’s not in Google cloud it goes to a datacenter in Tampa.

Public Segment

  • Resolve URLs (45 million of them), dispatch to the appropriate renderer, and then render into HTML, sitemap XML, robots.txt, etc.

  • Public SLA is that response time is < 100ms at peak traffic. Websites have to be available, but also fast. Remember, no caching.

  • When a user clicks publish after editing a page, the manifest, which contains references to pages, is pushed to Public. The routing table is also published.

  • Minimize out-of-process hops. Requires 1 database call to resolve the route, 1 RPC call to dispatch the request to the renderer, and 1 database call to get the site manifest (see the sketch after this list).

  • Lookup tables are cached in memory and are updated every 5 minutes.

  • Data is not stored in the same format as it is for the editor. It is stored in a denormalized format, optimized for read by primary key. Everything that is needed is returned in a single request.

  • Minimize business logic. The data is denormalized and precalculated. At large scale, every operation, every millisecond you add, is multiplied by 45 million, so every operation that happens on the public server has to be justified.

  • Page rendering.

    • The HTML returned by the public server is bootstrap HTML. It’s a shell with JavaScript imports and JSON data with references to the site manifest and dynamic data.

    • Rendering is offloaded to the client. Laptops and mobile devices are very fast and can handle the rendering.

    • JSON was chosen because it’s easy to parse and compressible.

    • It’s easier to fix bugs on the client. Just redeploy new client code. When rendering is done on the server the html will be cached, so fixing a bug requires re-rendering millions of websites again.

High Availability For Public Segment

  • Goal is to be always available, but stuff happens.

  • On a good day: a browser makes a request, the request goes to a datacenter, through a load balancer, to a public server, which resolves the route and goes to the renderer; the HTML goes back to the browser, and the browser runs the JavaScript. The JavaScript fetches all the media files and the JSON data and renders a very beautiful website. The browser then makes a request to the Archive service. The Archive service replays the request in the same way the browser does and stores the data in a cache.

  • On a bad day a datacenter is lost, which did happen. All the UPSs died and the datacenter was down. The DNS was changed and then all the requests went to the secondary datacenter.

  • On a bad day Public is lost. This happened once when a load balancer got half of a configuration, so all the Public servers were gone. Or a bad version can be deployed that starts returning errors. Custom code in the load balancer handles this problem by routing to the Archive service to fetch the cached version if the Public servers are not available. This approach meant customers were not affected when Public went down, even though the system was reverberating with alarms at the time.

  • On a bad day the Internet sucks. The browser makes a request, goes to the datacenter, goes to the load balancer, and gets the HTML back. Now the JavaScript code has to fetch all the pages and JSON data. It goes to the CDN, goes to the static grid, and fetches all the JSON files to render the site. In these processes, Internet problems can prevent files from being returned. The JavaScript code says: if you can’t get a file from the primary location, try to get it from the Archive service; if that fails, try the editor database.
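
That fallback chain amounts to something like the following (written in Python rather than the browser’s JavaScript, purely as an illustration; the source objects are hypothetical clients for the static grid, the Archive service, and the editor database):

```python
def fetch_with_fallback(key, primary, archive, editor_db):
    # Try each source in order; because files are immutable, any copy is as good as another.
    for source in (primary, archive, editor_db):
        try:
            return source.get(key)
        except Exception:
            continue  # network trouble or a down tier: fall through to the next copy
    raise RuntimeError(f"all sources failed for {key}")
```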

Lessons Learned

  • Identify your critical path and concerns. Think through how your product works. Develop usage scenarios. Focus your efforts on these as they give the biggest bang for the buck.

  • Go multi-datacenter and multi-cloud. Build redundancy on the critical path (for availability).

  • De-normalize data and minimize out-of-process hops (for performance). Precalculate and do everything possible to minimize network chatter.

  • Take advantage of client’s CPU power. It saves on your server count and it’s also easier to fix bugs in the client.

  • Start small, get it done, then figure out where to go next. Wix did what they needed to do to get their product working. Then they methodically moved to a sophisticated services architecture.

  • The long tail requires a different approach. Rather than cache everything, Wix chose to optimize the heck out of the render path and keep data in both active and archive databases.

  • Go immutable. Immutability has far reaching consequences for an architecture. It affects everything from the client through the back-end. It’s an elegant solution to a lot of problems.

  • Vendor lock-in is a myth. It’s all APIs. Just change the implementation and you can move to different clouds in weeks.

  • What really locks you down is data. Moving lots of data to a different cloud is really hard.

( Via HighScalability.com )

For Understanding Microservices

“Microservices” – yet another new term on the crowded streets of software architecture. Although our natural inclination is to pass such things by with a contemptuous glance, this bit of terminology describes a style of software systems that we are finding more and more appealing. We’ve seen many projects use this style in the last few years, and results so far have been positive, so much so that for many of our colleagues this is becoming the default style for building enterprise applications. Sadly, however, there’s not much information that outlines what the microservice style is and how to do it.

In short, the microservice architectural style [1] is an approach to developing a single application as a suite of small services, each running in its own process and communicating with lightweight mechanisms, often an HTTP resource API. These services are built around business capabilities and independently deployable by fully automated deployment machinery. There is a bare minimum of centralized management of these services, which may be written in different programming languages and use different data storage technologies.

To start explaining the microservice style it’s useful to compare it to the monolithic style: a monolithic application built as a single unit. Enterprise Applications are often built in three main parts: a client-side user interface (consisting of HTML pages and javascript running in a browser on the user’s machine), a database (consisting of many tables inserted into a common, and usually relational, database management system), and a server-side application. The server-side application will handle HTTP requests, execute domain logic, retrieve and update data from the database, and select and populate HTML views to be sent to the browser. This server-side application is a monolith – a single logical executable [2]. Any changes to the system involve building and deploying a new version of the server-side application.

Such a monolithic server is a natural way to approach building such a system. All your logic for handling a request runs in a single process, allowing you to use the basic features of your language to divide up the application into classes, functions, and namespaces. With some care, you can run and test the application on a developer’s laptop, and use a deployment pipeline to ensure that changes are properly tested and deployed into production. You can horizontally scale the monolith by running many instances behind a load-balancer.

Monolithic applications can be successful, but increasingly people are feeling frustrations with them – especially as more applications are being deployed to the cloud. Change cycles are tied together – a change made to a small part of the application requires the entire monolith to be rebuilt and deployed. Over time it’s often hard to keep a good modular structure, making it harder to keep changes that ought to only affect one module within that module. Scaling requires scaling of the entire application rather than parts of it that require greater resource.

sketch

Figure 1: Monoliths and Microservices

These frustrations have led to the microservice architectural style: building applications as suites of services. As well as the fact that services are independently deployable and scalable, each service also provides a firm module boundary, even allowing for different services to be written in different programming languages. They can also be managed by different teams.

We do not claim that the microservice style is novel or innovative, its roots go back at least to the design principles of Unix. But we do think that not enough people consider a microservice architecture and that many software developments would be better off if they used it.


Characteristics of a Microservice Architecture

We cannot say there is a formal definition of the microservices architectural style, but we can attempt to describe what we see as common characteristics for architectures that fit the label. As with any definition that outlines common characteristics, not all microservice architectures have all the characteristics, but we do expect that most microservice architectures exhibit most characteristics. While we authors have been active members of this rather loose community, our intention is to attempt a description of what we see in our own work and in similar efforts by teams we know of. In particular we are not laying down some definition to conform to.

Componentization via Services

For as long as we’ve been involved in the software industry, there’s been a desire to build systems by plugging together components, much in the way we see things are made in the physical world. During the last couple of decades we’ve seen considerable progress with large compendiums of common libraries that are part of most language platforms.

When talking about components we run into the difficult definition of what makes a component. Our definition is that a component is a unit of software that is independently replaceable and upgradeable.

Microservice architectures will use libraries, but their primary way of componentizing their own software is by breaking down into services. We define libraries as components that are linked into a program and called using in-memory function calls, while services are out-of-process components who communicate with a mechanism such as a web service request, or remote procedure call. (This is a different concept to that of a service object in many OO programs [3].)

One main reason for using services as components (rather than libraries) is that services are independently deployable. If you have an application [4] that consists of multiple libraries in a single process, a change to any single component results in having to redeploy the entire application. But if that application is decomposed into multiple services, you can expect many single service changes to only require that service to be redeployed. That’s not an absolute: some changes will change service interfaces, resulting in some coordination, but the aim of a good microservice architecture is to minimize these through cohesive service boundaries and evolution mechanisms in the service contracts.

Another consequence of using services as components is a more explicit component interface. Most languages do not have a good mechanism for defining an explicit Published Interface. Often it’s only documentation and discipline that prevents clients breaking a component’s encapsulation, leading to overly-tight coupling between components. Services make it easier to avoid this by using explicit remote call mechanisms.

Using services like this does have downsides. Remote calls are more expensive than in-process calls, and thus remote APIs need to be coarser-grained, which is often more awkward to use. If you need to change the allocation of responsibilities between components, such movements of behavior are harder to do when you’re crossing process boundaries.

At a first approximation, we can observe that services map to runtime processes, but that is only a first approximation. A service may consist of multiple processes that will always be developed and deployed together, such as an application process and a database that’s only used by that service.

Organized around Business Capabilities

When looking to split a large application into parts, often management focuses on the technology layer, leading to UI teams, server-side logic teams, and database teams. When teams are separated along these lines, even simple changes can lead to a cross-team project taking time and budgetary approval. A smart team will optimise around this and plump for the lesser of two evils – just force the logic into whichever application they have access to. Logic everywhere, in other words. This is an example of Conway’s Law [5] in action.

Any organization that designs a system (defined broadly) will produce a design whose structure is a copy of the organization’s communication structure.

– Melvin Conway, 1967

conways-law

Figure 2: Conway’s Law in action

The microservice approach to division is different, splitting up into services organized around business capability. Such services take a broad-stack implementation of software for that business area, including user-interface, persistent storage, and any external collaborations. Consequently the teams are cross-functional, including the full range of skills required for the development: user-experience, database, and project management.

PreferFunctionalStaffOrganization

Figure 3: Service boundaries reinforced by team boundaries

One company organised in this way is www.comparethemarket.com. Cross functional teams are responsible for building and operating each product and each product is split out into a number of individual services communicating via a message bus.

Large monolithic applications can always be modularized around business capabilities too, although that’s not the common case. Certainly we would urge a large team building a monolithic application to divide itself along business lines. The main issue we have seen here is that they tend to be organised around too many contexts. If the monolith spans many of these modular boundaries it can be difficult for individual members of a team to fit them into their short-term memory. Additionally we see that the modular lines require a great deal of discipline to enforce. The necessarily more explicit separation required by service components makes it easier to keep the team boundaries clear.

Products not Projects

Most application development efforts that we see use a project model: where the aim is to deliver some piece of software which is then considered to be completed. On completion the software is handed over to a maintenance organization and the project team that built it is disbanded.

Microservice proponents tend to avoid this model, preferring instead the notion that a team should own a product over its full lifetime. A common inspiration for this is Amazon’s notion of “you build, you run it” where a development team takes full responsibility for the software in production. This brings developers into day-to-day contact with how their software behaves in production and increases contact with their users, as they have to take on at least some of the support burden.

The product mentality ties in with the linkage to business capabilities. Rather than looking at the software as a set of functionality to be completed, there is an ongoing relationship where the question is how the software can assist its users to enhance the business capability.

There’s no reason why this same approach can’t be taken with monolithic applications, but the smaller granularity of services can make it easier to create the personal relationships between service developers and their users.

Smart endpoints and dumb pipes

When building communication structures between different processes, we’ve seen many products and approaches that stress putting significant smarts into the communication mechanism itself. A good example of this is the Enterprise Service Bus (ESB), where ESB products often include sophisticated facilities for message routing, choreography, transformation, and applying business rules.

The microservice community favours an alternative approach: smart endpoints and dumb pipes. Applications built from microservices aim to be as decoupled and as cohesive as possible – they own their own domain logic and act more as filters in the classical Unix sense – receiving a request, applying logic as appropriate and producing a response. These are choreographed using simple RESTish protocols rather than complex protocols such as WS-Choreography or BPEL or orchestration by a central tool.

The two protocols used most commonly are HTTP request-response with resource APIs and lightweight messaging [6]. The best expression of the first is

Be of the web, not behind the web

– Ian Robinson

Microservice teams use the principles and protocols that the world wide web (and to a large extent, Unix) is built on. Often used resources can be cached with very little effort on the part of developers or operations folk.

The second approach in common use is messaging over a lightweight message bus. The infrastructure chosen is typically dumb (dumb as in acts as a message router only) – simple implementations such as RabbitMQ or ZeroMQ don’t do much more than provide a reliable asynchronous fabric – the smarts still live in the end points that are producing and consuming messages; in the services.

In a monolith, the components are executing in-process and communication between them is via either method invocation or function call. The biggest issue in changing a monolith into microservices lies in changing the communication pattern. A naive conversion from in-memory method calls to RPC leads to chatty communications which don’t perform well. Instead you need to replace the fine-grained communication with a coarser-grained approach.
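
As a hedged illustration of that last point, compare a chatty, field-at-a-time client with a single coarser-grained call returning a whole resource representation. The CustomerService facade here is invented for the example:

```python
class CustomerService:
    """Hypothetical remote facade; imagine each method call is one network round trip."""
    def __init__(self, records):
        self._records = records
    def get_name(self, cid):
        return self._records[cid]["name"]       # round trip 1
    def get_email(self, cid):
        return self._records[cid]["email"]      # round trip 2
    def get_customer(self, cid):
        return dict(self._records[cid])         # one coarse-grained round trip

svc = CustomerService({42: {"name": "Ada", "email": "ada@example.com"}})

# Chatty style: harmless in-process, too many round trips once this is a remote service.
name, email = svc.get_name(42), svc.get_email(42)

# Coarser-grained style: one request returns a whole resource representation.
customer = svc.get_customer(42)
```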

Decentralized Governance

One of the consequences of centralised governance is the tendency to standardise on single technology platforms. Experience shows that this approach is constricting – not every problem is a nail and not every solution a hammer. We prefer using the right tool for the job and while monolithic applications can take advantage of different languages to a certain extent, it isn’t that common.

Splitting the monolith’s components out into services we have a choice when building each of them. You want to use Node.js to standup a simple reports page? Go for it. C++ for a particularly gnarly near-real-time component? Fine. You want to swap in a different flavour of database that better suits the read behaviour of one component? We have the technology to rebuild him.

Of course, just because you can do something, doesn’t mean you should – but partitioning your system in this way means you have the option.

Teams building microservices prefer a different approach to standards too. Rather than use a set of defined standards written down somewhere on paper, they prefer the idea of producing useful tools that other developers can use to solve similar problems to the ones they are facing. These tools are usually harvested from implementations and shared with a wider group, sometimes, but not exclusively, using an internal open source model. Now that git and github have become the de facto version control system of choice, open source practices are becoming more and more common in-house.

Netflix is a good example of an organisation that follows this philosophy. Sharing useful and, above all, battle-tested code as libraries encourages other developers to solve similar problems in similar ways yet leaves the door open to picking a different approach if required. Shared libraries tend to be focused on common problems of data storage, inter-process communication and as we discuss further below, infrastructure automation.

For the microservice community, overheads are particularly unattractive. That isn’t to say that the community doesn’t value service contracts. Quite the opposite, since there tend to be many more of them. It’s just that they are looking at different ways of managing those contracts. Patterns like Tolerant Reader and Consumer-Driven Contracts are often applied to microservices. These aid service contracts in evolving independently. Executing consumer driven contracts as part of your build increases confidence and provides fast feedback on whether your services are functioning. Indeed we know of a team in Australia who drive the build of new services with consumer driven contracts. They use simple tools that allow them to define the contract for a service. This becomes part of the automated build before code for the new service is even written. The service is then built out only to the point where it satisfies the contract – an elegant approach to avoid the ‘YAGNI’[9] dilemma when building new software. These techniques and the tooling growing up around them, limit the need for central contract management by decreasing the temporal coupling between services.
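
A bare-bones illustration of a consumer-driven contract check, written as a plain function rather than with a dedicated tool such as Pact; the contract fields are invented:

```python
# The consumer writes down the minimal shape of the response it relies on.
CONSUMER_CONTRACT = {
    "order_id": str,
    "status": str,
    "total_cents": int,
}

def satisfies_contract(response: dict, contract: dict) -> bool:
    # Tolerant reader: extra fields are ignored, missing or mistyped ones fail.
    return all(
        field in response and isinstance(response[field], expected_type)
        for field, expected_type in contract.items()
    )

# Run against the provider's real (or stubbed) response as part of the provider's build.
sample_response = {"order_id": "A-17", "status": "shipped", "total_cents": 1999, "extra": True}
assert satisfies_contract(sample_response, CONSUMER_CONTRACT)
```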

Perhaps the apogee of decentralised governance is the build it / run it ethos popularised by Amazon. Teams are responsible for all aspects of the software they build including operating the software 24/7. Devolution of this level of responsibility is definitely not the norm but we do see more and more companies pushing responsibility to the development teams. Netflix is another organisation that has adopted this ethos[11]. Being woken up at 3am every night by your pager is certainly a powerful incentive to focus on quality when writing your code. These ideas are about as far away from the traditional centralized governance model as it is possible to be.

Decentralized Data Management

Decentralization of data management presents in a number of different ways. At the most abstract level, it means that the conceptual model of the world will differ between systems. This is a common issue when integrating across a large enterprise, the sales view of a customer will differ from the support view. Some things that are called customers in the sales view may not appear at all in the support view. Those that do may have different attributes and (worse) common attributes with subtly different semantics.

This issue is common between applications, but can also occur within applications, particularly when that application is divided into separate components. A useful way of thinking about this is the Domain-Driven Design notion of Bounded Context. DDD divides a complex domain up into multiple bounded contexts and maps out the relationships between them. This process is useful for both monolithic and microservice architectures, but there is a natural correlation between service and context boundaries that helps clarify, and as we describe in the section on business capabilities, reinforce the separations.

As well as decentralizing decisions about conceptual models, microservices also decentralize data storage decisions. While monolithic applications prefer a single logical database for persistent data, enterprises often prefer a single database across a range of applications – many of these decisions driven through vendors’ commercial models around licensing. Microservices prefer letting each service manage its own database, either different instances of the same database technology, or entirely different database systems – an approach called Polyglot Persistence. You can use polyglot persistence in a monolith, but it appears more frequently with microservices.

decentralised-data

Decentralizing responsibility for data across microservices has implications for managing updates. The common approach to dealing with updates has been to use transactions to guarantee consistency when updating multiple resources. This approach is often used within monoliths.

Using transactions like this helps with consistency, but imposes significant temporal coupling, which is problematic across multiple services. Distributed transactions are notoriously difficult to implement and as a consequence microservice architectures emphasize transactionless coordination between services, with explicit recognition that consistency may only be eventual consistency and problems are dealt with by compensating operations.

Choosing to manage inconsistencies in this way is a new challenge for many development teams, but it is one that often matches business practice. Often businesses handle a degree of inconsistency in order to respond quickly to demand, while having some kind of reversal process to deal with mistakes. The trade-off is worth it as long as the cost of fixing mistakes is less than the cost of lost business under greater consistency.
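
A toy sketch of transactionless coordination with a compensating operation, standing in for what a real system would do with durable messages and retries; the payments and shipping services are hypothetical:

```python
def place_order(payments, shipping, order):
    # No distributed transaction: charge first, and if shipping cannot be
    # scheduled, issue a compensating refund rather than rolling back.
    charge_id = payments.charge(order["customer"], order["amount"])
    try:
        shipping.schedule(order["items"], order["address"])
    except Exception:
        payments.refund(charge_id)   # compensating operation; consistency is eventual
        raise
```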

Infrastructure Automation

Infrastructure automation techniques have evolved enormously over the last few years – the evolution of the cloud and AWS in particular has reduced the operational complexity of building, deploying and operating microservices.

Many of the products or systems being built with microservices are being built by teams with extensive experience of Continuous Delivery and its precursor, Continuous Integration. Teams building software this way make extensive use of infrastructure automation techniques. This is illustrated in the build pipeline shown below.

basic-pipeline

Figure 5: basic build pipeline

Since this isn’t an article on Continuous Delivery we will call attention to just a couple of key features here. We want as much confidence as possible that our software is working, so we run lots of automated tests. Promotion of working software ‘up’ the pipeline means we automate deployment to each new environment.

A monolithic application will be built, tested and pushed through these environments quite happily. It turns out that once you have invested in automating the path to production for a monolith, then deploying more applications doesn’t seem so scary any more. Remember, one of the aims of CD is to make deployment boring, so whether it’s one or three applications, as long as it’s still boring it doesn’t matter[12].

Another area where we see teams using extensive infrastructure automation is when managing microservices in production. In contrast to our assertion above that as long as deployment is boring there isn’t that much difference between monoliths and microservices, the operational landscape for each can be strikingly different.

micro-deployment

Figure 6: Module deployment often differs

Design for failure

A consequence of using services as components is that applications need to be designed so that they can tolerate the failure of services. Any service call could fail due to unavailability of the supplier; the client has to respond to this as gracefully as possible. This is a disadvantage compared to a monolithic design as it introduces additional complexity to handle it. The consequence is that microservice teams constantly reflect on how service failures affect the user experience. Netflix’s Simian Army induces failures of services and even datacenters during the working day to test both the application’s resilience and monitoring.

This kind of automated testing in production would be enough to give most operation groups the kind of shivers usually preceding a week off work. This isn’t to say that monolithic architectural styles aren’t capable of sophisticated monitoring setups – it’s just less common in our experience.

Since services can fail at any time, it’s important to be able to detect the failures quickly and, if possible, automatically restore service. Microservice applications put a lot of emphasis on real-time monitoring of the application, checking both architectural elements (how many requests per second is the database getting) and business relevant metrics (such as how many orders per minute are received). Semantic monitoring can provide an early warning system of something going wrong that triggers development teams to follow up and investigate.

This is particularly important to a microservices architecture because the microservice preference towards choreography and event collaboration leads to emergent behavior. While many pundits praise the value of serendipitous emergence, the truth is that emergent behavior can sometimes be a bad thing. Monitoring is vital to spot bad emergent behavior quickly so it can be fixed.

Monoliths can be built to be as transparent as a microservice – in fact, they should be. The difference is that you absolutely need to know when services running in different processes are disconnected. With libraries within the same process this kind of transparency is less likely to be useful.

Microservice teams would expect to see sophisticated monitoring and logging setups for each individual service such as dashboards showing up/down status and a variety of operational and business relevant metrics. Details on circuit breaker status, current throughput and latency are other examples we often encounter in the wild.
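
For reference, a minimal circuit breaker of the kind those dashboards report on – a simplified sketch, not any particular library’s implementation:

```python
import time

class CircuitBreaker:
    """Open after repeated failures; let a trial call through after a cool-down."""
    def __init__(self, max_failures=5, reset_after=30.0):
        self.max_failures, self.reset_after = max_failures, reset_after
        self.failures, self.opened_at = 0, None

    def call(self, fn, *args, **kwargs):
        if self.opened_at is not None:
            if time.time() - self.opened_at < self.reset_after:
                raise RuntimeError("circuit open: failing fast")
            self.opened_at = None            # half-open: allow one trial call
        try:
            result = fn(*args, **kwargs)
        except Exception:
            self.failures += 1
            if self.failures >= self.max_failures:
                self.opened_at = time.time() # trip the breaker
            raise
        self.failures = 0                    # success closes the circuit again
        return result
```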

Evolutionary Design

Microservice practitioners usually have come from an evolutionary design background and see service decomposition as a further tool to enable application developers to control changes in their application without slowing down change. Change control doesn’t necessarily mean change reduction – with the right attitudes and tools you can make frequent, fast, and well-controlled changes to software.

Whenever you try to break a software system into components, you’re faced with the decision of how to divide up the pieces – what are the principles on which we decide to slice up our application? The key property of a component is the notion of independent replacement and upgradeability[13] – which implies we look for points where we can imagine rewriting a component without affecting its collaborators. Indeed many microservice groups take this further by explicitly expecting many services to be scrapped rather than evolved in the longer term.

The Guardian website is a good example of an application that was designed and built as a monolith, but has been evolving in a microservice direction. The monolith still is the core of the website, but they prefer to add new features by building microservices that use the monolith’s API. This approach is particularly handy for features that are inherently temporary, such as specialized pages to handle a sporting event. Such a part of the website can quickly be put together using rapid development languages, and removed once the event is over. We’ve seen similar approaches at a financial institution where new services are added for a market opportunity and discarded after a few months or even weeks.

This emphasis on replaceability is a special case of a more general principle of modular design, which is to drive modularity through the pattern of change [14]. You want to keep things that change at the same time in the same module. Parts of a system that change rarely should be in different services to those that are currently undergoing lots of churn. If you find yourself repeatedly changing two services together, that’s a sign that they should be merged.

Putting components into services adds an opportunity for more granular release planning. With a monolith any changes require a full build and deployment of the entire application. With microservices, however, you only need to redeploy the service(s) you modified. This can simplify and speed up the release process. The downside is that you have to worry about changes to one service breaking its consumers. The traditional integration approach is to try to deal with this problem using versioning, but the preference in the microservice world is to only use versioning as a last resort. We can avoid a lot of versioning by designing services to be as tolerant as possible to changes in their suppliers.


Are Microservices the Future?

Our main aim in writing this article is to explain the major ideas and principles of microservices. By taking the time to do this we clearly think that the microservices architectural style is an important idea – one worth serious consideration for enterprise applications. We have recently built several systems using the style and know of others who have used and favor this approach.

Those we know about who are in some way pioneering the architectural style include Amazon, Netflix, The Guardian, the UK Government Digital Service, realestate.com.au, Forward and comparethemarket.com. The conference circuit in 2013 was full of examples of companies that are moving to something that would class as microservices – including Travis CI. In addition there are plenty of organizations that have long been doing what we would class as microservices, but without ever using the name. (Often this is labelled as SOA – although, as we’ve said, SOA comes in many contradictory forms. [15])

Despite these positive experiences, however, we aren’t arguing that we are certain that microservices are the future direction for software architectures. While our experiences so far are positive compared to monolithic applications, we’re conscious of the fact that not enough time has passed for us to make a full judgement.

Often the true consequences of your architectural decisions are only evident several years after you made them. We have seen projects where a good team, with a strong desire for modularity, has built a monolithic architecture that has decayed over the years. Many people believe that such decay is less likely with microservices, since the service boundaries are explicit and hard to patch around. Yet until we see enough systems with enough age, we can’t truly assess how microservice architectures mature.

There are certainly reasons why one might expect microservices to mature poorly. In any effort at componentization, success depends on how well the software fits into components. It’s hard to figure out exactly where the component boundaries should lie. Evolutionary design recognizes the difficulties of getting boundaries right and thus the importance of it being easy to refactor them. But when your components are services with remote communications, then refactoring is much harder than with in-process libraries. Moving code is difficult across service boundaries, any interface changes need to be coordinated between participants, layers of backwards compatibility need to be added, and testing is made more complicated.

Another issue is that if the components do not compose cleanly, then all you are doing is shifting complexity from inside a component to the connections between components. Not only does this move complexity around, it moves it to a place that’s less explicit and harder to control. It’s easy to think things are better when you are looking at the inside of a small, simple component, while missing messy connections between services.

Finally, there is the factor of team skill. New techniques tend to be adopted by more skillful teams. But a technique that is more effective for a more skillful team isn’t necessarily going to work for less skillful teams. We’ve seen plenty of cases of less skillful teams building messy monolithic architectures, but it takes time to see what happens when this kind of mess occurs with microservices. A poor team will always create a poor system – it’s very hard to tell if microservices reduce the mess in this case or make it worse.

One reasonable argument we’ve heard is that you shouldn’t start with a microservices architecture. Instead begin with a monolith, keep it modular, and split it into microservices once the monolith becomes a problem. (Although this advice isn’t ideal, since a good in-process interface is usually not a good service interface.)

So we write this with cautious optimism. So far, we’ve seen enough about the microservice style to feel that it can be a worthwhile road to tread. We can’t say for sure where we’ll end up, but one of the challenges of software development is that you can only make decisions based on the imperfect information that you currently have to hand.

( Via martinfowler.com )

 

Announcing Kylin: Extreme OLAP Engine for Big Data

We (from eBay) are very excited to announce that eBay has released to the open-source community our distributed analytics engine: Kylin (http://kylin.io). Designed to accelerate analytics on Hadoop and allow the use of SQL-compatible tools, Kylin provides a SQL interface and multi-dimensional analysis (OLAP) on Hadoop to support extremely large datasets.

Kylin is currently used in production by various business units at eBay. In addition to open-sourcing Kylin, we are proposing Kylin as an Apache Incubator project.

Background

The challenge faced at eBay is that our data volume has become bigger while our user base has become more diverse. Our users – for example, in analytics and business units – consistently ask for minimal latency but want to continue using their favorite tools, such as Tableau and Excel.

So, we worked closely with our internal analytics community and outlined requirements for a successful product at eBay:

  1. Sub-second query latency on billions of rows
  2. ANSI-standard SQL availability for those using SQL-compatible tools
  3. Full OLAP capability to offer advanced functionality
  4. Support for high cardinality and very large dimensions
  5. High concurrency for thousands of users
  6. Distributed and scale-out architecture for analysis in the TB to PB size range

We quickly realized nothing met our exact requirements externally – especially in the open-source Hadoop community. To meet our emergent business needs, we decided to build a platform from scratch. With an excellent team and several pilot customers, we have been able to bring the Kylin platform into production as well as open-source it.

Feature highlights

Kylin is a platform offering the following features for big data analytics:

  • Extremely fast OLAP engine at scale: Kylin is designed to reduce query latency on Hadoop for 10+ billion rows of data.
  • ANSI SQL on Hadoop: Kylin supports most ANSI SQL query functions in its ANSI SQL on Hadoop interface.
  • Interactive query capability: Users can interact with Hadoop data via Kylin at sub-second latency – better than Hive queries for the same dataset.
  • MOLAP cube query serving on billions of rows: Users can define a data model and pre-build it in Kylin with more than 10 billion raw data records.
  • Seamless integration with BI Tools: Kylin currently offers integration with business intelligence tools such as Tableau and third-party applications.
  • Open-source ODBC driver: Kylin’s ODBC driver is built from scratch and works very well with Tableau. We have open-sourced the driver to the community as well.
  • Other highlights:
      • Job management and monitoring
      • Compression and encoding to reduce storage
      • Incremental refresh of cubes
      • Leveraging of the HBase coprocessor for query latency
      • Approximate query capability for distinct counts (HyperLogLog)
      • Easy-to-use Web interface to manage, build, monitor, and query cubes
      • Security capability to set ACL at the cube/project level
      • Support for LDAP integration

The fundamental idea

The idea of Kylin is not brand new. Many technologies over the past 30 years have used the same theory to accelerate analytics. These technologies include methods to store pre-calculated results to serve analysis queries, generate each level’s cuboids with all possible combinations of dimensions, and calculate all metrics at different levels.

For reference, here is the cuboid topology:

cuboid_topo

 

When data becomes bigger, the pre-calculation processing becomes impossible – even with powerful hardware. However, with the benefit of Hadoop’s distributed computing power, calculation jobs can leverage hundreds of thousands of nodes. This allows Kylin to perform these calculations in parallel and merge the final result, thereby significantly reducing the processing time.

From relational to key-value

As an example, suppose there are several records stored in Hive tables that represent a relational structure. When the data volume grows very large – 10+ or even 100+ billions of rows – a question like “how many units were sold in the technology category in 2010 on the US site?” will produce a query with a large table scan and a long delay to get the answer. Since the values are fixed every time the query is run, it makes sense to calculate and store those values for further usage. This technique is called Relational to Key-Value (K-V) processing. The process will generate all of the dimension combinations and measured values shown in the example below, at the right side of the diagram. The middle columns of the diagram, from left to right, show how data is calculated by leveraging MapReduce for the large-volume data processing.

rational_to_kv

 

Kylin is based on this theory and is leveraging the Hadoop ecosystem to do the job for huge volumes of data:

  1. Read data from Hive (which is stored on HDFS)
  2. Run MapReduce jobs to pre-calculate
  3. Store cube data in HBase
  4. Leverage Zookeeper for job coordination
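
To make the pre-calculation idea concrete, here is a tiny single-machine sketch that enumerates every cuboid (every combination of dimensions) and pre-aggregates a measure for each, producing the kind of key-value pairs that would be stored in HBase. The data and column names are invented, and Kylin itself does this with MapReduce over Hive tables:

```python
from itertools import combinations
from collections import defaultdict

rows = [  # toy fact table: site, category, year, units sold
    {"site": "US", "category": "Tech", "year": 2010, "units": 3},
    {"site": "US", "category": "Home", "year": 2010, "units": 5},
    {"site": "UK", "category": "Tech", "year": 2011, "units": 2},
]
dimensions = ("site", "category", "year")

cube = defaultdict(int)
for r in range(len(dimensions) + 1):                 # every cuboid, from 0-D to full-D
    for cuboid in combinations(dimensions, r):
        for row in rows:
            key = (cuboid, tuple(row[d] for d in cuboid))  # would become an HBase row key
            cube[key] += row["units"]                      # pre-aggregated measure

# "How many units were sold in Tech in 2010 on the US site?" is now one key lookup:
print(cube[(("site", "category", "year"), ("US", "Tech", 2010))])   # -> 3
```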

Architecture

The following diagram shows the high-level architecture of Kylin.

kylin_arch

 

This diagram illustrates how relational data becomes key-value data through the Cube Build Engine offline process. The yellow lines also illustrate the online analysis data flow. The data requests can originate from SQL submitted using a SQL-based tool, or even from third-party applications via Kylin’s RESTful services. The RESTful services call the Query Engine, which determines if the target dataset exists. If so, the engine directly accesses the target data and returns the result with sub-second latency. Otherwise, the engine routes non-matching dataset queries to a SQL-on-Hadoop engine on the Hadoop cluster, such as Hive.
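
That routing decision amounts to something like the following sketch, with cube_store and hive standing in for the real engines; the names and methods are illustrative, not Kylin’s actual API:

```python
def execute_query(sql, cube_store, hive):
    # Serve from a pre-built cube when one can answer the query; otherwise fall back to Hive.
    cube = cube_store.find_matching_cube(sql)
    if cube is not None:
        return cube.query(sql)   # key-value reads from HBase, sub-second latency
    return hive.query(sql)       # full SQL-on-Hadoop execution, much slower
```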

Following are descriptions of all of the components the Kylin platform includes.

  • Metadata Manager: Kylin is a metadata-driven application. The Metadata Manager is the key component that manages all metadata stored in Kylin, including the most important cube metadata. All other components rely on the Metadata Manager.
  • Job Engine: This engine is designed to handle all of the offline jobs including shell script, Java API, and MapReduce jobs. The Job Engine manages and coordinates all of the jobs in Kylin to make sure each job executes and handles failures.
  • Storage Engine: This engine manages the underlying storage – specifically the cuboids, which are stored as key-value pairs. The Storage Engine uses HBase – the best solution from the Hadoop ecosystem for leveraging an existing K-V system. Kylin can also be extended to support other K-V systems, such as Redis.
  • REST Server: The REST Server is an entry point for applications to develop against Kylin. Applications can submit queries, get results, trigger cube build jobs, get metadata, get user privileges, and so on.
  • ODBC Driver: To support third-party tools and applications – such as Tableau – we have built and open-sourced an ODBC Driver. The goal is to make it easy for users to onboard.
  • Query Engine: Once the cube is ready, the Query Engine can receive and parse user queries. It then interacts with other components to return the results to the user.

In Kylin, we are leveraging an open-source dynamic data management framework called Apache Calcite to parse SQL and plug in our code. The Calcite architecture is illustrated below. (Calcite was previously called Optiq, which was written by Julian Hyde and is now an Apache Incubator project.)

calcite

 

Kylin usage at eBay

At the time of open-sourcing Kylin, we already had several eBay business units using it in production. Our largest use case is the analysis of 12+ billion source records generating 14+ TB cubes. Its 90th-percentile query latency is less than 5 seconds. Now, our use cases target analysts and business users, who can access analytics and get results through the Tableau dashboard very easily – no more Hive queries, shell commands, and so on.

What’s next

  • Support TopN on high-cardinality dimension: The current MOLAP technology is not perfect when it comes to querying on a high-cardinality dimension – such as TopN on millions of distinct values in one column. Similar to search engines (as many researchers have pointed out), the inverted index is the reasonable mechanism to use to pre-build such results.
  • Support Hybrid OLAP (HOLAP): MOLAP is great to serve queries on historical data, but as more and more data needs to be processed in real time, there is a growing requirement to combine real-time/near-real-time and historical results for business decisions. Many in-memory technologies already work on Relational OLAP (ROLAP) to offer such capability. Kylin’s next generation will be a Hybrid OLAP (HOLAP) to combine MOLAP and ROLAP together and offer a single entry point for front-end queries.

Open source

Kylin has already been open-sourced to the community. To develop and grow an even stronger ecosystem around Kylin, we are currently working on proposing Kylin as an Apache Incubator project. With distinguished sponsors from the Hadoop developer community supporting Kylin, such as Owen O’Malley (Hortonworks co-founder and Apache member) and Julian Hyde (original author of Apache Calcite, also with Hortonworks), we believe that the greater open-source community can take Kylin to the next level.

We welcome everyone to contribute to Kylin. Please visit the Kylin web site for more details: http://kylin.io.

To begin with, we are looking for open-source contributions not only in the core code base, but also in the following areas:

  1. Shell Client
  2. RPC Server
  3. Job Scheduler
  4. Tools

For more details and to discuss these topics further, please follow us on twitter @KylinOLAP and join our Google group: https://groups.google.com/forum/#!forum/kylin-olap

Summary

Kylin has been deployed in production at eBay and is processing extremely large datasets. The platform has demonstrated great performance benefits and has proved to be a better way for analysts to leverage data on Hadoop with a more convenient approach using their favorite tool. We are pleased to open-source Kylin. We welcome feedback and suggestions, and we look forward to the involvement of the open-source community.

( Via Ebaytechblog.com )