Designing For Scale – Three Principles And Three Practices From Tapad Engineering

This is a guest post by Toby Matejovsky, Director of Engineering at Tapad (@TapadEng).

Here at Tapad, scaling our technology strategically has been crucial to our immense growth. Over the last four years we’ve scaled our real-time bidding system to handle hundreds of thousands of queries per second. We’ve learned a number of lessons about scalability along that journey.

Here are a few concrete principles and practices we’ve distilled from those experiences:

  • Principle 1: Design for Many
  • Principle 2: Service-Oriented Architecture Beats Monolithic Application
  • Principle 3: Monitor Everything
  • Practice 1: Canary Deployments
  • Practice 2: Distributed Clock
  • Practice 3: Automate To Assist, Not To Control

Principle 1: Design For Many

There are three amounts that matter in software design: none, one, and many. We’ve learned to always design for the “many” case. This makes scaling more of a simple mechanical process, rather than a complicated project requiring re-architecting the entire codebase. The work to get there might not be as easy, but front-loading the effort pays dividends later when the application needs to scale suddenly.

For us, a guiding principle is to always consider the hypothetical ‘10x use case.’ How would our applications respond if they had to suddenly handle 10 times the current traffic? The system is only as good as the sum of its parts. The application layer might scale out easily, but if it fails because of interacting with a single database node then we haven’t achieved true scalability.

Principle 2: Service-Oriented Architecture Beats Monolithic Application

Here at Tapad we leverage a service-based architecture. The main advantages are the ability to allocate resources efficiently, and to make upgrades easier.

Imagine two systems:

  • One requires a lot of compute, not much memory.

  • One requires a lot of memory, not much compute.

If they were combined into a single system, but only the memory-intensive one needed to scale, every additional node would end up overcommitting on compute.

Virtualization solves the problem by making those overcommitted cores available to some other system, but the solution paints a misleading picture. It appears there are N systems available, but it is impossible to run all N at full capacity. If the cluster keeps enough compute available to run all N at full capacity, then money is being wasted – never a good thing.

Principle 3: Monitor Everything

Monitoring is an obvious requirement for any production system. We currently use Zabbix for alerting, and Graphite for tracking metrics over time. A typical Zabbix check looks like:

  • “Is process X running?”

  • “Is node N responding to a request within M milliseconds?”

  • “Is node N’ using more than 80% of its available storage?”

We recently switched our Graphite backend to use Cassandra instead of Whisper to better handle the volume of traffic (there are currently about half a million metrics tracked). We aggregate metrics in-memory with a customized version of Twitter’s Ostrich metrics library, and flush them to Graphite every 10 seconds.
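
As a rough illustration of that flush step, here is a minimal sketch in Python using Graphite’s plaintext protocol (one “name value timestamp” line per metric, sent to port 2003). The host and metric name are hypothetical, and the real pipeline goes through the Ostrich library rather than raw sockets.

import socket
import time

GRAPHITE_HOST = "graphite.example.com"   # hypothetical Graphite host
GRAPHITE_PORT = 2003                     # Graphite's plaintext listener

def flush(counters):
    """Send the in-memory counters to Graphite, then reset them."""
    now = int(time.time())
    lines = ["%s %d %d" % (name, value, now) for name, value in counters.items()]
    payload = "\n".join(lines) + "\n"
    with socket.create_connection((GRAPHITE_HOST, GRAPHITE_PORT)) as sock:
        sock.sendall(payload.encode("ascii"))
    counters.clear()

# Aggregate in memory; in practice flush() runs on a 10-second timer.
counters = {"foo01.pipeline.producer.avro_event_bar_count": 42}
flush(counters)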

An example path for a given metric might look like this:

foo01.pipeline.producer.avro_event_bar_count

We use Grafana for building real-time dashboards to track key metrics, and display those dashboards on big screens inside our office. When we switch our SOA to a more ephemeral container-based approach (e.g. running containers on Mesos with Marathon) we may have to re-think how these metrics are organized, as instance-specific names like foo01 will end up looking like foo.43ffbdc8-ef60-11e4-97ce-005056a272f9. Graphite supports wildcards in queries, so something like sumSeries(*.pipeline.producer.avro_event_bar_count) could work.

Principles lay the groundwork for our decisions, but executing them successfully is equally important. Here are three best practices for working with distributed systems.

Practice 1: Canary Deployments

Some things are very challenging to test rigorously prior to a full-scale production launch. To mitigate risk, we upgrade a single node first and monitor it manually. Assuming it behaves as expected, the rest of the nodes are automatically deployed by Rundeck. Rundeck is a tool that can upgrade systems automatically and in parallel, rolling several instances at a time and moving on to the next set as soon as the upgraded nodes report a healthy status. Monitoring the canary deploy involves more than this single health check, which is why it’s upgraded out-of-band.

Practice 2: Distributed Clock

Because of clock skew and lag, there is no good concept of “now” in a distributed system.

  • Clock skew occurs because clocks are not particularly precise, even with NTP (Network Time Protocol).

  • Lag is a factor when passing messages around. If one server is cut off from the network, buffers messages for a while, then sends them after re-joining, the receiving system will get a batch of messages with relatively old timestamps. A system consuming all messages from these producers cannot be assured it has read 100% of messages up to a given time until it sees that each producer has passed the mark. This assumes that each producer guarantees ordering within its own stream, much like Kafka’s model.

Our solution is to create a sort of distributed clock, where producers record their most recent timestamps as child nodes of a particular Zookeeper “clock” node. The time is resolved by taking the minimum timestamp in that set. We also track lag relative to the resolving node’s system clock.
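
A minimal sketch of that resolution step, using the kazoo ZooKeeper client for Python. The /clock path and the convention of each producer storing an epoch-millisecond timestamp as its child node’s payload are assumptions made for the illustration, not Tapad’s actual schema.

import time

from kazoo.client import KazooClient

CLOCK_PATH = "/clock"   # hypothetical path; each producer keeps a child node here

zk = KazooClient(hosts="zk1.example.com:2181")   # hypothetical ensemble
zk.start()

def resolve_clock():
    """Resolve the distributed 'now' as the minimum of all producer timestamps."""
    timestamps = []
    for child in zk.get_children(CLOCK_PATH):
        data, _stat = zk.get(CLOCK_PATH + "/" + child)
        timestamps.append(int(data.decode("ascii")))   # epoch millis written by the producer
    if not timestamps:
        return None, None
    resolved = min(timestamps)
    lag_ms = int(time.time() * 1000) - resolved        # lag relative to this node's system clock
    return resolved, lag_ms

print(resolve_clock())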

Practice 3: Automate To Assist, Not To Control

Our devops tools are designed to assist humans, rather than to automatically manage things, as human judgement is often required to respond to a system failure. There is risk in allowing a script to automatically fail over a database or spin up new nodes. We have a pager duty rotation with primary, secondary, and tertiary engineers. The on-call engineer can initiate the failover or spin up new nodes in response to an alert, which means they are fully aware of the context.

The more well-understood a task is, the more it can be automated. One basic level of automation is using Kickstart/PXE boot. When a new VM starts up, it does a PXE boot and registers itself with Spacewalk, and Puppet handles installation of all required packages. Nothing custom is ever required, which enables us to easily build/rebuild sets of systems just by starting new VMs with particular names.

As we gain better understanding of a given system’s performance we can automate more parts. For example, scaling a shared-nothing system up and down depending on some reasonable change in traffic. For exceptional circumstances, we want a human to make the decision, assisted by all the information at their disposal.


Elements Of Scale: Composing And Scaling Data Platforms


This is a guest repost of Ben Stopford‘s epic post on Elements of Scale: Composing and Scaling Data Platforms. A masterful tour through the evolutionary forces that shape how systems adapt to key challenges.

As software engineers we are inevitably affected by the tools we surround ourselves with. Languages, frameworks, even processes all act to shape the software we build.

Likewise databases, which have trodden a very specific path, inevitably affect the way we treat mutability and share state in our applications.

Over the last decade we’ve explored what the world might look like had we taken a different path. Small open source projects try out different ideas. These grow. They are composed with others. The platforms that result utilise suites of tools, with each component often leveraging some fundamental hardware or systemic efficiency. The result, platforms that solve problems too unwieldy or too specific to work within any single tool.

So today’s data platforms range greatly in complexity. From simple caching layers or polyglotic persistence right through to wholly integrated data pipelines. There are many paths. They go to many different places. In some of these places at least, nice things are found.

So the aim for this talk is to explain how and why some of these popular approaches work. We’ll do this by first considering the building blocks from which they are composed. These are the intuitions we’ll need to pull together the bigger stuff later on.


 In a somewhat abstract sense, when we’re dealing with data, we’re really just arranging locality. Locality to the CPU. Locality to the other data we need. Accessing data sequentially is an important component of this. Computers are just good at sequential operations. Sequential operations can be predicted.

If you’re taking data from disk sequentially it’ll be pre-fetched into the disk buffer, the page cache and the different levels of CPU caching. This has a significant effect on performance. But it does little to help the addressing of data at random, be it in main memory, on disk or over the network. In fact pre-fetching actually hinders random workloads as the various caches and frontside bus fill with data which is unlikely to be used.


Whilst disk is somewhat renowned for its slow performance, main memory is often assumed to simply be fast. This is not as ubiquitously true as people often think. There are one to two orders of magnitude between random and sequential main memory workloads. Use a language that manages memory for you and things generally get a whole lot worse.

Streaming data sequentially from disk can actually outperform randomly addressed main memory. So disk may not always be quite the tortoise we think it is, at least not if we can arrange sequential access. SSDs, particularly those that utilise PCIe, further complicate the picture as they demonstrate different tradeoffs, but the caching benefits of the two access patterns remain, regardless.


So let’s imagine, as a simple thought experiment, that we want to create a very simple database. We’ll start with the basics: a file.

We want to keep writes and reads sequential, as it works well with the hardware. We can append writes to the end of the file efficiently. We can read by scanning the file in its entirety. Any processing we wish to do can happen as the data streams through the CPU. We might filter, aggregate or even do something more complex. The world is our oyster!


So what about data that changes, updates etc?

We have two options. We could update the value in place. We’d need to use fixed width fields for this, but that’s ok for our little thought experiment. But update in place would mean random IO. We know that’s not good for performance.

Alternatively we could just append updates to the end of the file and deal with the superseded values when we read it back.

So we have our first tradeoff. Append to a ‘journal’ or ‘log’, and reap the benefits of sequential access. Alternatively if we use update in place we’ll be back to 300 or so writes per second, assuming we actually flush through to the underlying media.
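
As a toy illustration of the append-only side of that tradeoff, here is a sketch in Python: writes only ever append to the end of the file, reads scan it in its entirety, and later records simply supersede earlier ones. The one-key-value-per-line record format is invented for the example.

def append(path, key, value):
    """Append-only write: sequential IO, never update in place."""
    with open(path, "a") as journal:
        journal.write("%s=%s\n" % (key, value))

def read(path, key):
    """Scan the whole journal; the last value seen for a key wins."""
    result = None
    with open(path) as journal:
        for line in journal:
            k, v = line.rstrip("\n").split("=", 1)
            if k == key:
                result = v
    return result

append("journal.log", "customer:1", "bob")
append("journal.log", "customer:1", "robert")    # supersedes the earlier value
print(read("journal.log", "customer:1"))         # -> robert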


Now in practice, of course, reading the file in its entirety can be pretty slow. We only need to get into GBs of data before even the fastest disks will take seconds. This is what a database does when it ends up table scanning.

Also we often want something more specific, say customers named “bob”, so scanning the whole file would be overkill. We need an index.


Now there are lots of different types of indexes we could use. The simplest would be an ordered array of fixed-width values, in this case customer names, held with the corresponding offsets in the heap file. The ordered array could be searched with binary search. We could also of course use some form of tree, bitmap index, hash index, term index etc. Here we’re picturing a tree.
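
A sketch of the simplest of those options, assuming an in-memory ordered array of (customer name, heap-file offset) pairs searched with binary search; the names and offsets are made up for the example.

import bisect

# Ordered array of (customer name, byte offset into the heap file).
index = [("alice", 0), ("bob", 128), ("carol", 256)]

def lookup(name):
    """Binary-search the ordered array and return the heap-file offset."""
    pos = bisect.bisect_left(index, (name,))
    if pos < len(index) and index[pos][0] == name:
        return index[pos][1]
    return None

print(lookup("bob"))    # -> 128, i.e. where bob's record sits in the heap file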

The thing with indexes like this is that they impose an overarching structure. The values are deliberately ordered so we can access them quickly when we want to do a read. The problem with the overarching structure is that it necessitates random writes as data flows in. So our wonderful, write optimised, append only file must be augmented by writes that scatter-gun the filesystem. This is going to slow us down.


Anyone who has put lots of indexes on a database table will be familiar with this problem. If we are using a regular rotating hard drive, we might run 1,000s of times slower if we maintain disk integrity of an index in this way.

Luckily there are a few ways around this problem. Here we are going to discuss three. These represent three extremes, and they are in truth simplifications of the real world, but the concepts are useful when we consider larger compositions.


Our first option is simply to place the index in main memory. This will compartmentalise the problem of random writes to RAM. The heap file stays on disk.

This is a simple and effective solution to our random writes problem. It is also one used by many real databases. MongoDB, Cassandra, Riak and many others use this type of optimisation. Often memory mapped files are used.

However, this strategy breaks down if we have far more data than we have main memory. This is particularly noticeable where there are lots of small objects. Our index would get very large. Thus our storage becomes bounded by the amount of main memory we have available. For many tasks this is fine, but if we have very large quantities of data this can be a burden.

A popular solution is to move away from having a single ‘overarching’ index. Instead we use a collection of smaller ones.


This is a simple idea. We batch up writes in main memory, as they come in. Once we have sufficient – say a few MBs – we sort them and write them to disk as an individual mini-index. What we end up with is a chronology of small, immutable index files.

So what was the point of doing that? Our set of immutable files can be streamed sequentially. This brings us back to a world of fast writes, without us needing to keep the whole index in memory. Nice!

Of course there is a downside to this approach too. When we read, we have to consult the many small indexes individually. So all we have really done is shift the problem of random IO from writes onto reads. However this turns out to be a pretty good tradeoff in many cases. It’s easier to optimise random reads than it is to optimise random writes.

Keeping a small meta-index in memory or using a Bloom Filter provides a low-memory way of evaluating whether individual index files need to be consulted during a read operation. This gives us almost the same read performance as we’d get with a single overarching index whilst retaining fast, sequential writes.

In reality we will need to purge orphaned updates occasionally too, but that can be done with nice sequential reads and writes.
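
Pulling those pieces together, here is a compact sketch of the idea in Python: writes land in an in-memory batch, full batches are flushed to immutable sorted runs, and reads check the batch first and then each run from newest to oldest, using each run's key range as a stand-in for the meta-index (a real store would use a Bloom filter and compact the runs).

class TinyLSM:
    def __init__(self, batch_size=4):
        self.memtable = {}      # current in-memory write batch
        self.runs = []          # (min_key, max_key, sorted mapping) flushed to 'disk'
        self.batch_size = batch_size

    def put(self, key, value):
        self.memtable[key] = value
        if len(self.memtable) >= self.batch_size:
            self.flush()

    def flush(self):
        """Sort the batch and write it out as a small immutable index."""
        run = dict(sorted(self.memtable.items()))
        self.runs.append((min(run), max(run), run))
        self.memtable = {}

    def get(self, key):
        if key in self.memtable:                    # newest data first
            return self.memtable[key]
        for lo, hi, run in reversed(self.runs):     # then runs, newest to oldest
            if lo <= key <= hi and key in run:      # cheap range check, then lookup
                return run[key]
        return None

db = TinyLSM()
for i in range(10):
    db.put("key%02d" % i, i)
print(db.get("key03"))    # -> 3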


What we have created is termed a Log Structured Merge Tree. A storage approach used in a lot of big data tools such as HBase, Cassandra, Google’s BigTable and many others. It balances write and read performance with comparatively small memory overhead.


So we can get around the ‘random-write penalty’ by storing our indexes in memory or, alternatively, using a write-optimised index structure like LSM. There is a third approach though. Pure brute force.

Think back to our original example of the file. We could read it in its entirety. This gave us many options in terms of how we go about processing the data within it. The brute force approach is simply to hold data by column rather than by row and stream only the columns required for a query, in their entirety, through the CPU. This approach is termed Columnar or Column Oriented.

(It should be noted that there is an unfortunate nomenclature clash between true column stores and those that follow the Big Table pattern. Whilst they share some similarities, in practice they are quite different. It is wise to consider them as different things.)


Column Orientation is another simple idea. Instead of storing data as a set of rows, appended to a single file, we split each row by column. We then store each column in a separate file.

We keep the order of the files the same, so row N has the same position (offset) in each column file. This is important because we will need to read multiple columns to service a single query, all at the same time. This means ‘joining’ columns on the fly. If the columns are in the same order we can do this in a tight loop which is very cache- and cpu-efficient. Many implementations make heavy use of vectorisation to further optimise throughput for simple join and filter operations.

Writes can leverage the benefit of being append-only. The downside is that we now have many files to update, one for every column in every individual write to the database. The most common solution to this is to batch writes in a similar way to the one used in the LSM approach above. Many columnar databases also impose an overall order to the table as a whole to increase their read performance for one chosen key.
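
A small sketch of the column-per-file layout, with an in-memory list standing in for each column file; row N sits at position N in every column, so a query streams only the columns it needs and joins them by position. The table and query are invented for the example.

# Each column lives in its own 'file'; row N is at index N in every column.
columns = {
    "name":  ["alice", "bob", "carol", "dave"],
    "city":  ["NYC",   "SF",  "NYC",   "LA"],
    "spend": [120,     80,    200,     40],
}

def append_row(row):
    """A single write touches every column file, which is why writes are batched."""
    for col, values in columns.items():
        values.append(row[col])

def total_spend_in(city):
    """Stream just the two columns the query needs, joining them by position."""
    return sum(spend for spend, c in zip(columns["spend"], columns["city"]) if c == city)

append_row({"name": "erin", "city": "NYC", "spend": 60})
print(total_spend_in("NYC"))    # -> 380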


By splitting data by column we significantly reduce the amount of data that needs to be brought from disk, so long as our query operates on a subset of all columns.

In addition to this, data in a single column generally compresses well. We can take advantage of the data type of the column to do this, if we have knowledge of it. This means we can often use efficient, low cost encodings such as run-length, delta, bit-packed etc. For some encodings predicates can be used directly on the uncompressed stream too.

The result is a brute force approach that will work particularly well for operations that require large scans. Aggregate functions like average, max, min, group by etc are typical of this.

This is very different to using the ‘heap file & index’ approach we covered earlier. A good way to understand this is to ask yourself: what is the difference between a columnar approach like this vs a ‘heap & index’ where indexes are added to every field?


The answer to this lies in the ordering of the index files. BTrees etc will be ordered by the fields they index. Joining the data in two indexes involves a streaming operation on one side, but on the other side the index lookups have to read random positions in the second index. This is generally less efficient than joining two indexes (columns) that retain the same ordering. Again we’re leveraging sequential access.


So many of the best technologies which we may want to use as components in a data platform will leverage one of these core efficiencies to excel for a certain set of workloads.

Storing indexes in memory, over a heap file, is favoured by many NoSQL stores such as Riak, Couchbase or MongoDB as well as some relational databases. It’s a simple model that works well.

Tools designed to work with larger data sets tend to take the LSM approach. This gives them fast ingestion as well as good read performance using disk based structures. HBase, Cassandra, RocksDB, LevelDB and even Mongo now support this approach.

Column-per-file engines are used heavily in MPP databases like Redshift or Vertica as well as in the Hadoop stack using Parquet. These are engines for data crunching problems that require large traversals. Aggregation is the home ground for these tools.

Other products, like Kafka, apply a simple, hardware-efficient contract to messaging. Messaging, at its simplest, is just appending to a file, or reading from a predefined offset. You read messages from an offset. You go away. You come back. You read from the offset you previously finished at. All nice sequential IO.
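
That contract is easy to picture with a plain file standing in for the log: the consumer remembers only the offset it last reached, reads sequentially from there, and saves the new offset for next time. Kafka's real API deals in per-partition message offsets rather than byte offsets, but the shape is the same.

# A tiny stand-in for the log (in Kafka this would be a topic partition).
with open("events.log", "ab") as log:
    log.write(b"event-1\nevent-2\n")

def handle(message):
    print("got:", message)

def consume(log_path, offset):
    """Read every message appended after `offset`, return the new offset."""
    with open(log_path, "rb") as log:
        log.seek(offset)            # jump straight to where we left off
        for line in log:            # then read strictly sequentially
            handle(line.rstrip(b"\n"))
        return log.tell()           # save this for the next run

offset = consume("events.log", 0)        # first run starts at the beginning
offset = consume("events.log", offset)   # later runs resume from the saved offset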

This is different to most message oriented middleware. Specifications like JMS and AMQP require the addition of indexes like the ones discussed above, to manage selectors and session information. This means they often end up performing more like a database than a file. Jim Gray made this point famously back in his 1995 publication Queues Are Databases.

So all these approaches favour one tradeoff or other, often keeping things simple, and hardware sympathetic, as a means of scaling.


So we’ve covered some of the core approaches to storage engines. In truth we made some simplifications. The real world is a little more complex. But the concepts are useful nonetheless.

Scaling a data platform is more than just storage engines though. We need to consider parallelism.


When distributing data over many machines we have two core primitives to play with: partitioning and replication. Partitioning, sometimes called sharding, works well both for random access and brute force workloads.

If a hash-based partitioning model is used the data will be spread across a number of machines using a well-known hash function. This is similar to the way a hash table works, with each bucket being held on a different machine.

The result is that any value can be read by going directly to the machine that contains the data, via the hash function. This pattern is wonderfully scalable and is the only pattern that shows linear scalability as the number of client requests increases. Requests are isolated to a single machine. Each one will be served by just a single machine in the cluster.
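
A sketch of that routing step, assuming a stable hash of the key modulo the number of machines; real systems typically layer consistent hashing on top so that adding a node does not reshuffle every key. The node names are hypothetical.

import hashlib

NODES = ["node-a", "node-b", "node-c"]   # hypothetical cluster members

def owner(key):
    """Route a key to exactly one machine via a well-known hash function."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return NODES[int(digest, 16) % len(NODES)]

print(owner("customer:42"))   # every client computes the same answer...
print(owner("customer:43"))   # ...so each read touches a single machine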


We can also use partitioning to provide parallelism over batch computations, for example aggregate functions or more complex algorithms such as those we might use for clustering or machine learning. The key difference is that we exercise all machines at the same time, in a broadcast manner. This allows us to solve a large computational problem in a much shorter time, using a divide and conquer approach.

Batch systems work well for large problems, but provide little concurrency as they tend to exhaust the resources on the cluster when they execute.


So the two extremes are pretty simple: directed access at one end, broadcast, divide and conquer at the other. Where we need to be careful is in the middle ground that lies between the two. A good example of this is the use of secondary indexes in NoSQL stores that span many machines.

A secondary index is an index that isn’t on the primary key. This means the data will not be partitioned by the values in the index. Directed routing via a hash function is no longer an option. We have to broadcast requests to all machines. This limits concurrency. Every node must be involved in every query.
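
The contrast with the directed lookup sketched above comes down to a few lines: a primary-key read asks the one owning node, while a secondary-index query has to be scattered to every node and the partial results gathered. The toy partitioner and data below are invented for the illustration.

# Rows partitioned across nodes by primary key; each node can only index what it holds.
cluster = {
    "node-a": {"3": {"name": "carol", "city": "NYC"}},
    "node-b": {"1": {"name": "alice", "city": "NYC"}},
    "node-c": {"2": {"name": "bob",   "city": "SF"}},
}

def get_by_key(key):
    """Directed: the partitioner names the single node to ask."""
    node = "node-" + "abc"[int(key) % 3]
    return cluster[node].get(key)

def query_by_city(city):
    """Secondary index: broadcast to all nodes, gather the partial results."""
    matches = []
    for node, rows in cluster.items():           # every node does work for every query
        matches.extend(r for r in rows.values() if r["city"] == city)
    return matches

print(get_by_key("1"))        # one machine touched
print(query_by_city("NYC"))   # all machines touched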

For this reason many key value stores have resisted the temptation to add secondary indexes, despite their obvious use. HBase and Voldemort are examples of this. But many others do expose them, MongoDB, Cassandra, Riak etc. This is good as secondary indexes are useful. But it’s important to understand the effect they will have on the overall concurrency of the system.


The route out of this concurrency bottleneck is replication. You’ll probably be familiar with replication either from using async slave databases or from replicated NoSQL stores like Mongo or Cassandra.

In practice replicas can be invisible (used only for recovery), read only (adding read concurrency) or read write (adding partition tolerance). Which of these you choose will trade off against the consistency of the system. This is simply the application of the CAP theorem (although the CAP theorem may not be as simple as you think).


This tradeoff with consistency* brings us to an important question. When does consistency matter?

Consistency is expensive. In the database world ACID is guaranteed by serialisability. This is essentially ensuring that all operations appear to occur in sequential order. It turns out to be a pretty expensive thing. In fact it’s prohibitive enough that many databases don’t offer it as an isolation level at all. Those that do never set it as the default.

Suffice to say that if you apply strong consistency to a system that does distributed writes you’ll likely end up in tortoise territory.

(* note the term consistency has two common usages. The C in ACID and the C in CAP. They are unfortunately not the same. I’m using the CAP definition: all nodes see the same data at the same time)


The solution to this consistency problem is simple. Avoid it. If you can’t avoid it isolate it to as few writers and as few machines as possible.

Avoiding consistency issues is often quite easy, particularly if your data is an immutable stream of facts. A set of web logs is a good example. They have no consistency concerns as they are just facts that never change.

There are other use cases which do necessitate consistency though. Transferring money between accounts is an oft used example. Non-commutative actions such as applying discount codes is another.

But often things that appear to need consistency, in a traditional sense, may not. For example if an action can be changed from a mutation to a new set of associated facts we can avoid mutable state. Consider marking a transaction as being potentially fraudulent. We could update it directly with the new field. Alternatively we could simply use a separate stream of facts that links back to the original transaction.


So in a data platform it’s useful to either remove the consistency requirement altogether, or at least isolate it. One way to isolate it is to use the single writer principle; this gets you some of the way. Datomic is a good example of this. Another is to physically isolate the consistency requirement by splitting mutable and immutable worlds.

Approaches like Bloom/CALM extend this idea further by embracing the concept of disorder by default, imposing order only when necessary.

So those were some of the fundamental tradeoffs we need to consider. Now how do we pull these things together to build a data platform?


A typical application architecture might look something like the below. We have a set of processes which write data to a database and read it back again. This is fine for many simple workloads. Many successful applications have been built with this pattern. But we know it works less well as throughput grows. In the application space this is a problem we might tackle with message-passing, actors, load balancing etc.

The other problem is this approach treats the database as a black box. Databases are clever software. They provide a huge wealth of features. But they provide little mechanism for scaling out of an ACID world. This is a good thing in many ways. We default to safety. But it can become an annoyance when scaling is inhibited by general guarantees which may be overkill for the requirements we have.


The simplest route out of this is CQRS (Command Query Responsibility Segregation).

Another very simple idea. We separate read and write workloads. Writes go into something write-optimised. Something closer to a simple journal file. Reads come from something read-optimised. There are many ways to do this, be it tools like Goldengate for relational technologies or products that integrate replication internally such as Replica Sets in MongoDB.
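
A minimal sketch of the separation, with an append-only journal as the write-optimised side, a dictionary as the read-optimised side, and a small replication step moving changes between them. In a real deployment the replication would be a tool like Goldengate or a built-in mechanism such as MongoDB's replica sets, not an in-process loop.

journal = []     # write side: an append-only command log
read_view = {}   # read side: a structure optimised for queries
applied = 0      # how far the read side has caught up with the journal

def write(key, value):
    """Commands only ever append; nothing is updated in place."""
    journal.append((key, value))

def replicate():
    """Asynchronously apply any new journal entries to the read view."""
    global applied
    while applied < len(journal):
        key, value = journal[applied]
        read_view[key] = value
        applied += 1

def read(key):
    """Queries never touch the journal, only the read-optimised view."""
    return read_view.get(key)

write("customer:1", "bob")
replicate()                   # in practice this runs continuously in the background
print(read("customer:1"))     # -> bob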


Many databases do something like this under the hood. Druid is a nice example. Druid is an open source, distributed, time-series, columnar analytics engine. Columnar storage works best if we input data in large blocks, as the data must be spread across many files. To get good write performance Druid stores recent data in a write optimised store. This is gradually ported over to the read optimised store over time.

When Druid is queried the query routes to both the write optimised and read optimised components. The results are combined (‘reduced’) and returned to the user. Druid uses time, marked on each record, to determine ordering.

Composite approaches like this provide the benefits of CQRS behind a single abstraction.


Another similar approach is to use an Operational/Analytic Bridge. Read- and write-optimised views are separated using an event stream. The stream of state is retained indefinitely, so that the async views can be recomposed and augmented at a later date by replaying.

So the front section provides for synchronous reads and writes. This can be as simple as immediately reading data that was written or as complex as supporting ACID transactions.

The back end leverages asynchronicity, and the advantages of immutable state, to scale offline processing through replication, denormalisation or even completely different storage engines. The messaging-bridge, along with joining the two, allows applications to listen to the data flowing through the platform.

As a pattern this is well suited to mid-sized deployments where there is at least a partial, unavoidable requirement for a mutable view.


If we are designing for an immutable world, it’s easier to embrace larger data sets and more complex analytics. The batch pipeline, one almost ubiquitously implemented with the Hadoop stack, is typical of this.

The beauty of the Hadoop stack comes from its plethora of tools. Whether you want fast read-write access, cheap storage, batch processing, high throughput messaging or tools for extracting, processing and analysing data, the Hadoop ecosystem has it all.

The batch pipeline architecture pulls data from pretty much any source, push or pull, ingests it into HDFS, and then processes it to provide increasingly optimised versions of the original data. Data might be enriched, cleansed, denormalised, aggregated, moved to a read optimised format such as Parquet or loaded into a serving layer or data mart. Data can be queried and processed throughout this process.

This architecture works well for immutable data, ingested and processed in large volume. Think 100s of TBs plus. The evolution of this architecture will be slow though. Straight-through timings are often measured in hours.


The problem with the Batch Pipeline is that we often don’t want to wait hours to get a result. A common solution is to add a streaming layer alongside it. This is sometimes referred to as the Lambda Architecture.

The Lambda Architecture retains a batch pipeline, like the one above, but it circumvents it with a fast streaming layer. It’s a bit like building a bypass around a busy town. The streaming layer typically uses a streaming processing tool such as Storm or Samza.

The key insight of the Lambda Architecture is that we’re often happy to have an approximate answer quickly, but we would like an accurate answer in the end.

So the streaming layer bypasses the batch layer, providing the best answers it can within a streaming window. These are written to a serving layer. Later the batch pipeline computes an accurate answer and overwrites the approximation.
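
The merge can be sketched as a serving layer that prefers the batch layer's accurate figures where they exist and falls back to the streaming layer's running approximation for the most recent window. The daily counters here are invented for the illustration.

# Accurate results, recomputed by the batch pipeline every few hours.
batch_counts = {"2015-04-01": 10412, "2015-04-02": 9875}

# Approximate results, updated incrementally by the streaming layer.
stream_counts = {"2015-04-03": 4120}

def serve(day):
    """Prefer the accurate (batch) answer, fall back to the fast (stream) one."""
    if day in batch_counts:
        return batch_counts[day]
    return stream_counts.get(day, 0)

def batch_overwrite(day, accurate_count):
    """When the batch job catches up, its result replaces the approximation."""
    batch_counts[day] = accurate_count
    stream_counts.pop(day, None)

print(serve("2015-04-03"))              # approximate answer, available immediately
batch_overwrite("2015-04-03", 4188)
print(serve("2015-04-03"))              # accurate answer, available hours later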

This is a clever way to balance accuracy with responsiveness. Some implementations of this pattern suffer if the two branches end up being dual coded in stream and batch layers. But it is often possible to simply abstract this logic into common libraries that can be reused, particularly as much of this processing is often written in external libraries such as Python or R anyway. Alternatively systems like Spark provide both stream and batch functionality in one system (although the streams in Spark are really micro-batches).

So this pattern again suits high volume data platforms, say in the 100TB+ range, that want to combine streams with existing, rich, batch based analytic function.


There is another approach to this problem of slow data pipelines. It’s sometimes termed the Kappa architecture. I actually thought this name was ‘tongue in cheek’ but I’m now not so sure. Whichever it is, I’m going to use the term Stream Data Platform, which is also in use.

Stream Data Platforms flip the batch pattern on its head. Rather than storing data in HDFS, and refining it with incremental batch jobs, the data is stored in a scale out messaging system, or log, such as Kafka. This becomes the system of record and the stream of data is processed in real time to create a set of tertiary views, indexes, serving layers or data marts.

This is broadly similar to the streaming layer of the Lambda architecture but with the batch layer removed. Obviously the requirement for this is that the messaging layer can store and vend very large volumes of data and there is a sufficiently powerful stream processor to handle the processing.

There is no free lunch so, for hard problems, Stream Data Platforms will likely run no faster than an equivalent batch system, but switching the default approach from ‘store and process’ to ‘stream and process’ can provide greater opportunity for faster results.


Finally, the Stream Data Platform approach can be applied to the problem of ‘application integration’. This is a thorny and difficult problem that has seen focus from big vendors such as Informatica, Tibco and Oracle for many years. For the most part results have been beneficial, but not transformative. Application integration remains a topic looking for a real workable solution.

Stream Data Platforms provide an interesting potential solution to this problem. They take many of the benefits of an O/A bridge – the variety of asynchronous storage formats and the ability to recreate views – but leave the consistency requirement isolated in the (often pre-existing) source systems:


With the system of record being a log it’s easy to enforce immutability. Products like Kafka can retain enough volume internally to be used as a historic record. This means recovery can be a process of replaying and regenerating state, rather than constantly checkpointing.

Similarly styled approaches have been taken before in a number of large institutions with tools such as Goldengate, porting data to enterprise data warehouses or, more recently, data lakes. They were often thwarted by a lack of throughput in the replication layer and the complexity of managing changing schemas. It seems unlikely the first problem will continue. As for the latter problem, though, the jury is still out.


So we started with locality. With sequential addressing for both reads and writes. This dominates the tradeoffs inside the components we use. We looked at scaling these components out, leveraging primitives for both sharding and replication. Finally we rebranded consistency as a problem we should isolate in the platforms we build.

But data platforms themselves are really about balancing the sweet-spots of these individual components within a single, holistic form. Incrementally restructuring. Migrating the write-optimised to the read-optimised. Moving from the constraints of consistency to the open plains of streamed, asynchronous, immutable state.

This must be done with a few things in mind. Schemas are one. Time, the peril of the distributed, asynchronous world, is another. But these problems are manageable if carefully addressed. Certainly the future is likely to include more of these things, particularly as tooling, innovated in the big data space, percolates into platforms that address broader problems, both old and new.


How OpenCL Could Open the Gates for FPGAs

In this special guest feature from Scientific Computing World, Robert Roe explains how OpenCL may make FPGAs an attractive option.

Over the past few years, high-performance computing (HPC) has become used to heterogeneous hardware, principally mixing GPUs and CPUs, but now, with both major FPGA manufacturers in conformance with the OpenCL standard, the door is effectively open for the wider use of FPGAs in high-performance computing.

In January 2015, FPGAs took a step closer to the mainstream of high-performance computing with the announcement that Xilinx’s development environment for systems and software engineers, SDAccel, had been certified as conforming to the OpenCL standard for parallel programming of heterogeneous systems.

The changing landscape of HPC, with the move towards data-centric computing, could favour FPGAs with very high I/O throughput. However, it remains to be seen if FPGAs will be used as an accelerator or if supercomputers might be built using FPGA as the main processor technology.

One of the attractions of FPGAs is that they consume very little power but, as with GPUs initially, the barrier to adoption has been the difficulty of programming them. Manufacturers and vendors are now releasing compilers that will optimise code written in C and C++ to make use of the flexible nature of FPGA architecture.

Easier to program

Mike Strickland, director of the computer and storage business unit at Altera said: “The problem was that we did not have the ease of use, we did not have a software-friendly interface back in 2008. The huge enabler here has been OpenCL.”

Larry Getman, VP of strategic marketing and planning at Xilinx said: ‘When FPGAs first started they could do very basic things such as Boolean algebra and it was really used for glue logic. Over the years, FPGAs have really advanced and evolved with more hardened structures which are much more specialised.’

Getman continued: ‘Over the years FPGAs have gone from being glue logic to harder things like radio head systems, that do a lot of DSP processing; very high-performance vision applications; wireless radio; medical equipment; and radar systems. So they are used in high-performance computing, but for applications that use very specialised algorithms.’

Getman concluded: ‘The reason people use FPGAs for these applications is simple, they offer a much higher level of performance per Watt than trying to run the same application in pure software code.’

FPGAs are programmable semiconductor devices that are based around a matrix of configurable logic blocks (CLBs) connected through programmable interconnects. This is where the FPGA gets the term ‘field programmable’ as an FPGA can be programmed and optimised for a specific application. Efficient programming can take advantage of the inherent parallelism of the FPGA architecture delivering a higher level of performance than accelerators that have a less flexible architecture.

Millions of threads running at the same time

Devadas Varma, senior director of software research and development at Xilinx, said: ‘A CPU, if it is a single-core CPU, executes one instruction at a time and if you have four cores, eight cores, that are multithreaded then you can do eight or sixteen sets of instructions, for example. If you compare this to an FPGA, which is a blank set of millions of components that you decide to interconnect, theoretically speaking you could have thousands or even millions of threads running at the same time.’

Reuven Weintraub, founder and chief technology officer at Gidel, highlighted the differences between FPGAs and the processors used in CPUs today. He said: ‘They are the same and they are different. They are the same from the perspective that both of them are programmable. The difference is coming from the fact that in the FPGA all the instructions would run in parallel. Actually the FPGA is not a processor; it is compiled to be a dedicated set of hardware components according to the requirements of the algorithm – that is what gives it the efficiency, power savings and so on.’

Traditionally this power efficiency, scalability, and flexible architecture came at the price of more complex programming: code needed to address the hardware and the flow of data across the various components, in addition to providing the basic instruction set to be computed in the logic blocks. However, major FPGA manufacturers Altera and Xilinx have both been working on their own OpenCL based solutions which have the potential to make FPGA acceleration a real possibility for more general HPC workloads.

Development toolkits

Xilinx has recently released SDAccel, a development environment that includes software tools including its own compiler, tools for code development, profiling, and debugging, and provides a GPU-like work environment. Getman said: ‘Our goal is to make an FPGA as easy to program as a GPU. SDAccel, which is OpenCL based, does allow people to program in OpenCL and C or C++ and they can now target the FPGA at a very high level.’

In addition, SDAccel provides functionality to swap multiple kernels in and out of the FPGA without disrupting the interface between the server CPU and the FPGA. This could be a key enabler of FPGAs in real-world data centres where turning off some of your resources while you re-optimise them for the next application is not an economically viable strategy at present.

Altera has been working closely with the Khronos group, which oversees a number of open computing standards including OpenCL, OpenGL, and WebGL. Altera released a development toolkit, Altera’s SDK for OpenCL, in May 2013. Strickland said: ‘In May 2013 we achieved a very important conformance test with the standards body – the Khronos group – that manages OpenCL. We had to pass 8,000 tests and that really strengthened the credibility of what we are doing with the FPGA.’

Strickland continued: ‘In the past, there were a lot of FPGA compiler tools that took care of the logic but not the data management. They could take lines of C and automatically generate lines of RTL but they did not take care of how that data would come from the CPU, the optimisation of external memory bandwidth off the FPGA, and that is a large amount of the work.’

Traditionally optimising algorithms to utilise fully the parallel architectures of FPGA technology involved significant experience using HDLs (hardware description languages) because they allowed programmers to write code that would address the FPGA at register-transfer level (RTL).

RTL enables programmers to describe the flow of data between hardware registers, and the logical operations performed on that data. This is typically what creates the difference in performance between more general processors and FPGAs, which can be optimised much more efficiently for a specific algorithm.

The difficulty is that that kind of coding requires expertise and can be very time consuming. Hand-coded RTL may go through several iterations as programmers test the most efficient ways to parallelise the instruction set to take advantage of the programmable hardware on the FPGA.

Strickland said: “With OpenCL or the OpenCL compiler, you still write something that is like C code that targets the FPGA. The big difference I would say is the instruction set. The big innovation has been the back end of our complier which can now take that C code and efficiently use the FPGA.”

Strickland noted that Altera’s compiler ‘does more than 200 optimisations when you write some C code. It is doing things like seeing the order in which you access memory so that it can group memory addresses together, improving the efficiency of that memory interface.’

Converting code from different languages into an RTL description has been possible for some time, but these developments in OpenCL make it much easier for programmers without extensive knowledge of HDLs, such as VHDL and Verilog, to make use of FPGAs.

However OpenCL is not the final piece of the puzzle for FPGA programming. Strickland said: ‘Over time you may want to have other high-level interfaces. There is a standard called SPIR (Standard Portable Intermediate Representation). The idea is that this allows you to kind of split up your compiler between the front end and the back end, enabling people to use different high-level language interfaces on the front end.’

Strickland continued: ‘In universities now there is research into domain-specific languages, so people trying to accomplish a certain class of algorithms may benefit from having a higher-level interface than even C. The idea behind exposing this intermediate compiler interface is that you can now start working with the ecosystem to have front ends with higher-level interfaces.’

Over the past few years, there have been two ideas behind the best way to program FPGAs: high-level synthesis (HLS) or OpenCL. As OpenCL has matured, Xilinx decided to adopt the standard but to keep the work it had done developing HLS technology and integrate that into the development environment conforming to the OpenCL standard.

Getman said: “The main problem is that C is very much designed to go cycle to cycle, step by step. Unfortunately hardware doesn’t. Hardware has a lot of things running at the same time.” This aspect was what made HLS attractive as a compiler that can take OpenCL, C or C++ and architecturally optimise it for the FPGA hardware.

Xilinx acquired AutoESL and its HLS tool AutoPilot in 2011 and began integrating it into its own development tools for FPGAs. Getman said: ‘That was really the big switching point. For many years, people had been promising really great results with HLS but in reality the results were a lot bigger and a lot slower than what could have been done by hand.’

Getman continued: ‘We have integrated this technology into our tools and added a lot to it. This is really one of the big differentiators from our competition, even though we both have OpenCL support. This technology allows our users the opportunity to create their own libraries in real-time using C, C++ or OpenCL, rather than have to wait for the vendor to create specific libraries or specific algorithms for them.’

Varma said: “The silver bullet in HLS is the ability to take a sequential description that has been written in C and then find this parallelism, the concurrencies, without the user having to think. That was a necessary technology before we could do anything. It has been adopted by thousands of users already as a standalone technology, but what we do is embed that technology inside OpenCL compilers so that now it can be utilised in full software mode and it is fully compatible with OpenCL.”

Getman said: “We consciously made a switch over the last few years to expand our customer base by both continuing technology development for our traditional users as well as expand our tool flow to cater to software coders.”

A key facet of this technology is that Xilinx is letting programmers take the work they have done in C and port it over to OpenCL using the technology from HLS that is now integrated into its compilers. Varma said: ‘One thing that changes when you go from software to hardware programming is that C programmers, OpenCL programmers, are used to dealing with a lot of libraries. They do not have to write matrix multiplications or filters or those kinds of things, because they are always available as library elements. Now hardware languages often have libraries, but they are very specific implementations that you cannot just change for your use.’

Varma concluded: “By writing in C, our HLS technology can re-compile that very efficiently and immediately. This gives you a tremendous capability.”

Coprocessor or something bigger?

FPGA manufacturers like Altera and Xilinx have been focusing their attention on using FPGAs in HPC as coprocessors or accelerators that would be used in much the same way as GPUs.

Getman said: “The biggest use model is really processor plus FPGA. The reason for that is there are still things that you want to run on a processor. You really want a processor to do what it is good at. Typically an FPGA will be used through something like a PCIE slot and it will be used as an acceleration engine for the things that are really difficult for the processor.”

This view was shared by Devadas Varma who highlighted some of the functionality in an earlier release of OpenCL that increased the potential for CPU/GPU/FPGA synergy.

Varma said: ‘The tool we have developed supports OpenCL 1.2 and importantly it can co-exist with CPUs and GPUs. In fact in our upcoming release we will support partitioning workloads into GPUs, we already support this feature regarding CPUs. That is definitely where we are heading.’

However this was not a view shared by Reuven Weintraub, at Gidel, who felt that to regard an FPGA simply as a coprocessor was to miss much of the point and many of the advantages that FPGAs could offer to computing. Weintraub said: “For me a coprocessor is like the 8087 was: you run certain lines of code in the processor and then you say ‘there’s a line of code for you’ and it returns, and this goes back and forth. The big advantage of running with the FPGA is that the FPGA can have a lot of pipelining inside of it, solve a lot of things and have a lot of memory.”

He explained that an FPGA contains a ‘huge array of registers that are immediately available’ by taking advantage of the on-board memory and high-throughput that FPGAs can handle, meaning that ‘you do not necessarily have to use the cache because the data is being moved in and out in the correct order.’

Weintraub concluded: “Therefore it is better to give a task to the FPGA rather than giving it just a few opcodes and going back and forth. It is more task oriented. Computing is a balance between the processing, memory access, networking and storage, but everything has to be balanced. If you want to utilize a good FPGA then you need to give it a task that makes use of its internal memory so that it can move things from one job to another.”

Gidel has considerable experience in this field. Gidel provided the FPGAs for the Novo-G supercomputer, housed at the University of Florida, the largest re-configurable supercomputer available for research.

The university is a lead partner in the ‘Center for High-Performance Reconfigurable Computing’ (CHREC), a US national research centre funded by the National Science Foundation.

In development at the UF site since 2009, Novo-G features 192 40nm FPGAs (Altera Stratix-IV E530) and 192 65nm FPGAs (Stratix-III E260).

These 384 FPGAs are housed in 96 quad-FPGA boards (Gidel ProcStar-IV and ProcStar-III) and supported by quad-core Nehalem Xeon processors, GTX-480 GPUs, 20Gb/s non-blocking InfiniBand, GigE, and approximately 3TB of total RAM, most of it directly attached to the FPGAs. An upgrade is underway to add 32 top-end, 28nm FPGAs (Stratix-V GSD8) to the system.

According to the article ‘Novo-G: At the Forefront of Scalable Reconfigurable Supercomputing’ written by Alan George, Herman Lam, and Greg Stitt, three researchers from the university, Novo-G achieved speeds rivaling the largest conventional supercomputers in existence – yet at a fraction of their size, energy, and cost.

But although processing speed and energy efficiency were important, they concluded that the principal impact of a reconfigurable supercomputer like Novo-G was the freedom that its innovative design can give to scientists to conduct more types of analysis, and examine larger datasets.

The potential is there.


HTTP 2.0 is coming!

Why do we need another version of the HTTP protocol?

HTTP has been in use by the World-Wide Web global information initiative since 1990. However, it is December 2014 and we no longer have simple pages of cross-linked HTML documents, as we used to. Instead, we have Web applications, some of them very heavy and requiring a lot of resources. And unfortunately, the version of the HTTP protocol currently used – 1.1 – has issues.

HTTP is actually very simple – the browser sends a request to the server, the server provides the response, and that is it. Very simple, but if you check the chart below you’ll see that a typical page involves not just one request and one response, but around 80–100 requests and 1.8MB of data:


Data provided by HTTP Archive.

Now, imagine we have a server in Los Angeles and our client is in Berlin, Germany. All those 80–100 requests have to travel from Berlin to L.A. and back. That is not fast – for example, the round-trip time between London and New York is about 56 ms, and from Berlin to Los Angeles it is even more. And as we know, first page load is latency bound; latency is the constraining factor for today’s applications.

In order to speed up downloading the needed resources, browsers currently open multiple connections to the server (typically 6 per domain). However, opening a connection is expensive – there is domain name resolution, the socket connect, more round trips if TLS has to be established, and so on. From the browser’s point of view, this also means more memory consumed, connection management, and heuristics for deciding when to open a new connection and when to reuse an existing one.

Web engineers have also tried to make sites load as fast as possible. They invented many different workarounds (aka “optimizations”) such as image sprites, domain sharding, resource inlining, file concatenation, combo services and so on. Inventing more and more tricks works up to a point, but what if we could fix the issues at the protocol level and avoid these workarounds?

HTTP/2 in a nutshell

HTTP/2 is a binary protocol in which the browser and server exchange frames. Each frame belongs to a stream via an identifier. The key point is that these streams are multiplexed: they have priorities, the priorities are specified by the client (browser), and they can be changed at runtime. A stream can depend on another stream.

In contrast to HTTP 1.1, in HTTP/2 the headers are compressed. A special algorithm, called HPACK, was invented for that purpose.

Server push is a feature of HTTP/2 which deserves special attention. Web developers have actually implemented the same idea for years – the resource inlining mentioned above is an example of that. Since this now happens at the protocol level, instead of embedding CSS and JavaScript files or images directly in the page, the server can explicitly push these resources to the browser in relation to a previously made request.

What does an HTTP/2 connection look like?


Image by Ilya Grigorik

In this example we have three streams:

  1. A client-initiated request for page.html
  2. A stream carrying script.js – the server initiated this stream, since it already knows the content of page.html and that script.js will be requested by the browser as soon as it parses the HTML.
  3. A stream carrying style.css – initiated by the server, since this CSS is considered critical and will be requested by the browser as soon as it parses the HTML file.

HTTP/2 is a huge step forward. It is currently at Draft 16 and the final specification will be ready very soon.

Optimizing the Web stack for HTTP/2 era

Does the above mean that we should discard all the optimizations we have been doing for years to make our Web applications as fast as possible? Of course not! It just means we have to forget about some of them and start applying others. For example, we should still send as little data as possible from the server to the client, deal with caching, and store files offline. In general, we can split the needed optimizations into two parts:

Optimize the content being served to the browser:

  • Minifying JavaScript, CSS and HTML files
  • Removing redundant data from images
  • Optimizing critical-path CSS
  • Removing CSS that is not needed on the page, using tools like UnCSS, before deploying the page to the server
  • Properly specifying ETags for files and setting far-future Expires headers
  • Using HTML5 offline storage to keep already downloaded files and minimize traffic on the next page load

Optimize the server and TCP stack:

  • Check your server and make sure the value of TCP’s Initial Congestion Window (initial cwnd) is 10 segments (IW10). If you use GNU/Linux, just upgrade to kernel 3.2+ to get this feature and another important update – Proportional Rate Reduction for TCP
  • Disable Slow-Start Restart after idle
  • Check that window scaling is enabled
  • Consider using TCP Fast Open (TFO)

(for more information check the wonderful book “High Performance Browser Networking” by Ilya Grigorik)

We may consider removing the following “optimizations”:

  • Joining files – nowadays many companies are striving for continuous deployment, which makes this challenging – a single line of code changed invalidates the whole bundle. It also forces the browser to wait until the whole file arrives before it can start processing it
  • Domain sharding – loading resources from different domains in order to avoid the browser’s limit of connections per domain (usually 6) is the first “optimization” to be removed. It causes retransmissions and unnecessary latency
  • Resource inlining – prevents caching and inflates the document in which the resources are stored. Instead, consider leaving CSS, JavaScript and images as external resources
  • Image sprites – the problem with cache invalidation is present here too. Apart from that, image sprites force the browser to consume more CPU and memory while decoding the entire sprite
  • Using cookie-free domains

The new modules API from ES6 and HTTP/2

For years we have been splitting our JavaScript code into modules, stored in different files. Since JavaScript did not provide a module system prior to version 6, the community invented two main formats – AMD and CommonJS. Of course, custom formats, like the one used by YUI Loader, existed too. In ECMAScript 6, however, we have a brand new way of creating modules. The API for loading them looks like this:

Declarative syntax:

import {foo, bar} from 'file.js';

Programmatic loader API:

System.import('my_module')
  .then(my_module => {
    // ...
  })
  .catch(error => {
    // ...
  });

Imagine this module has 10 different dependencies, each of them stored in a separate file.
If we don’t change anything at build time, the browser will request the file that contains the main module, and then it will make many additional requests in order to download all the dependencies.
Since we now have HTTP/2, the browser won’t open multiple connections. However, in order to process the main module, the browser still has to wait for all the dependencies to be downloaded over the network. This means: download one file, parse it, discover that it requires another module, download that file, and so on, until all the dependencies are resolved.

One possible fix for this issue is to change things at build time: concatenate all modules into one file and then rewrite the originally specified import statements to look for modules in that joined file. However, this has drawbacks, and they are the same as when joining files for the HTTP 1.1 era.

Another possible fix is to leverage HTTP/2 push promises. In the example above, this means you may try to push the dependencies when the main module is being requested. If the browser already has them, it may abort (reset) the stream by sending an RST_STREAM frame.

Patrick Meenan, however, pointed me to a very interesting issue – in practice the browser may not be able to abort the stream quickly enough. By the time the pushed resources hit the client and are validated against the cache, the entire resource will already be in buffers (on the network and in the server), so the whole file will be downloaded anyway. Pushing works only if we can be sure that the resources aren’t in the browser cache; otherwise we end up sending them anyway. This is an interesting point for further research.

HTTP/2 implementations

You may start playing with HTTP/2 today. There are many server implementations – grab one and start experimenting.

The main browser vendors already support HTTP/2:

  • Internet Explorer supports HTTP/2 from IE 11 on the Windows 10 Technical Preview,
  • Firefox has HTTP/2 enabled by default since version 34, and
  • the current version of Chrome supports HTTP/2, but it is not enabled by default. It can be enabled by launching Chrome with the command-line flag --enable-spdy4 or via chrome://flags.

Currently, all browsers implement HTTP/2 only over TLS.

Other interesting protocols to keep an eye on

QUIC is another protocol, started by Google as a natural extension of its research on protocols like SPDY and HTTP/2. I won’t go into many details here; it is enough to mention that it has all the features of SPDY and HTTP/2, but it is built on top of UDP. The goal is to avoid the head-of-line blocking that affects SPDY and HTTP/2, and to establish a connection much faster than TCP+TLS is capable of.

For more information about HTTP/2 and QUIC, please watch my JSConfEU talk.


Introducing Proxygen, Facebook’s C++ HTTP framework

We are excited to announce the release of Proxygen, a collection of C++ HTTP libraries, including an easy-to-use HTTP server. In addition to HTTP/1.1, Proxygen (rhymes with “oxygen”) supports SPDY/3 and SPDY/3.1. We are also iterating and developing support for HTTP/2.

Proxygen is not designed to replace Apache or nginx — those projects focus on building extremely flexible HTTP servers written in C that offer good performance but almost overwhelming amounts of configurability. Instead, we focused on building a high performance C++ HTTP framework with sensible defaults that includes both server and client code and that’s easy to integrate into existing applications. We want to help more people build and deploy high performance C++ HTTP services, and we believe that Proxygen is a great framework to do so. You can read the documentation and send pull requests on our GitHub site.


Proxygen began as a project to write a customizable, high-performance HTTP(S) reverse-proxy load balancer nearly four years ago. We initially planned for Proxygen to be a software library for generating proxies, hence the name. But Proxygen has evolved considerably since the early days of the project. While there were a variety of software stacks that provided similar functionality to Proxygen at the time (Apache, nginx, HAProxy, Varnish, etc.), we opted to go in a different direction.

Why build our own HTTP stack?

  • Integration. The ability to deeply integrate with existing Facebook infrastructure and tools was a driving factor. For instance, being able to administer our HTTP infrastructure with tools such as Thrift simplifies integration with existing systems. Being able to easily track and measure Proxygen with systems like ODS, our internal monitoring tool, makes rapid data correlation possible and reduces operational overhead. Building an in-house HTTP stack also meant that we could leverage improvements made to these Facebook technologies.
  • Code reuse. We wanted a platform for building and delivering event-driven networking libraries to other projects. Currently, more than a dozen internal systems are built on top of the Proxygen core code, including parts of systems like Haystack, HHVM, our HTTP load balancers, and some of our mobile infrastructure. Proxygen provides a platform where we can develop support for a protocol like HTTP/2 and have it available anywhere that leverages Proxygen’s core code.
  • Scale. We tried to make some of the existing software stacks work, and some of them did for a long time. Operating a huge HTTP infrastructure built on top of lots of workarounds eventually stopped scaling well for us.
  • Features. There were a number of features that didn’t exist in other software (some still don’t) that seemed quite difficult to integrate in existing projects but would be immediately useful for Facebook. Examples of features that we were interested in included SPDY, WebSockets, HTTP/1.1 (keep-alive), TLS false start, and Facebook-specific load-scheduling algorithms. Building an in-house HTTP stack gave us the flexibility to iterate quickly on these features.

Initially kicked off in 2011 by a few engineers who were passionate about seeing HTTP usage at Facebook evolve, Proxygen has been supported since then by a team of three to four people. Outside of the core development team, dozens of internal contributors have also added to the project. Since the project started, the major milestones have included:

  • 2011 – Proxygen development began and started taking production traffic
  • 2012 – Added SPDY/2 support and started initial public testing
  • 2013 – SPDY/3 development/testing/rollout, SPDY/3.1 development started
  • 2014 – Completed SPDY/3.1 rollout, began HTTP/2 development

There are a number of other milestones that we are excited about, but we think the code tells a better story than we can.

We now have a few years of experience managing a large Proxygen deployment. The library has been battle-tested with many, many trillions of HTTP(S) and SPDY requests. Now we’ve reached a point where we are ready to share this code more widely.


The central abstractions to understand in proxygen/lib are the session, codec, transaction, and handler. These are the lowest level abstractions, and we don’t generally recommend building off of these directly.

When bytes are read off the wire, the HTTPCodec stored inside HTTPSession parses these into higher level objects and associates with them a transaction identifier. The codec then calls into HTTPSession, which is responsible for maintaining the mapping between transaction identifiers and HTTPTransaction objects. Each HTTP request/response pair has a separate HTTPTransaction object. Finally, HTTPTransaction forwards the call to a handler object which implements HTTPTransaction::Handler. The handler is responsible for implementing business logic for the request or response.

The handler then calls back into the transaction to generate egress (whether the egress is a request or response). The call flows from the transaction back to the session, which uses the codec to convert the higher level semantics of the particular call into the appropriate bytes to send on the wire.

The same handler and transaction interfaces are used to both create requests and handle responses. The API is generic enough to allow both. HTTPSession is specialized slightly differently depending on whether you are using the connection to issue or respond to HTTP requests.



Moving into higher levels of abstraction, proxygen/httpserver has a simpler set of APIs and is the recommended way to interface with proxygen when acting as a server if you don’t need the full control of the lower level abstractions.

The basic components here are HTTPServer, RequestHandlerFactory, and RequestHandler. An HTTPServer takes some configuration and is given a RequestHandlerFactory. Once the server is started, the installed RequestHandlerFactory spawns a RequestHandler for each HTTP request. RequestHandler is a simple interface that users of the library implement. Subclasses of RequestHandler should use the inherited protected member ResponseHandler* downstream_ to send the response.

HTTP Server

The server framework we’ve included with the release is a great choice if you want to get set up quickly with a simple, fast, out-of-the-box event-driven server. With just a few options, you are ready to go. Check out the echo server example to get a sense of what it is like to work with. For fun, we benchmarked the echo server on a 32-logical-core Intel(R) Xeon(R) CPU E5-2670 @ 2.60GHz with 16 GiB of RAM, varying the number of worker threads from one to eight. We ran the client on the same machine to eliminate network effects, and achieved the following performance numbers.

    # Load test client parameters:
    # Twice as many worker threads as server
    # 400 connections open simultaneously
    # 100 requests per connection
    # 60 second test
    # Results reported are the average of 10 runs for each test
    # simple GETs, 245 bytes of request headers, 600 byte response (in memory)
    # SPDY/3.1 allows 10 concurrent requests per connection

While the echo server is quite simple compared to a real web server, this benchmark is an indicator of the parsing efficiency of binary protocols like SPDY and HTTP/2. The HTTP server framework included is easy to work with and gives good performance, but focuses more on usability than the absolute best possible performance. For instance, the filter model in the HTTP server allows you to easily compose common behaviors defined in small chunks of code that are easily unit testable. On the other hand, the allocations associated with these filters may not be ideal for the highest-performance applications.


Proxygen has allowed us to rapidly build out new features, get them into production, and see the results very quickly. As an example, we were interested in evaluating the compression format HPACK, however we didn’t have any HTTP/2 clients deployed yet and the HTTP/2 spec itself was still under heavy development. Proxygen allowed us to implement HPACK, use it on top of SPDY, and deploy that to mobile clients and our servers simultaneously. The ability to rapidly experiment with a real HPACK deployment enabled us to quickly understand performance and data efficiency characteristics in a production environment. As another example, the internal configuration systems at Facebook continue to evolve and get better. As these systems are built out, Proxygen is able to quickly take advantage of them as soon as they’re available. Proxygen has made a significant positive impact on our ability to rapidly iterate with our HTTP infrastructure.

Open Source

The Proxygen codebase is under active development and will continue to evolve. If you are passionate about HTTP, high performance networking code, and modern C++, we would be excited to work with you! Please send us pull requests on GitHub.

We’re committed to open source and are always looking for new opportunities to share our learnings and software. The traffic team has now open sourced Thrift as well as Proxygen, two important components of the network software infrastructure at Facebook. We hope that these software components will be building blocks for other systems that can be open sourced in the future.

( Via Facebook)

Netflix open sources its data traffic cop, Suro

Netflix is open sourcing a tool called Suro that the company uses to direct data from its source to its destination in real time. More than just serving a key role in the Netflix data pipeline, though, it’s also a great example of the impressive — if sometimes redundant — ecosystem of open source data-analysis tools making their way out of large web properties.

Netflix’s various applications generate tens of billions of events per day, and Suro collects them all before sending them on their way. Most head to Hadoop (via Amazon S3) for batch processing, while others head to Druid and ElasticSearch (via Apache Kafka) for real-time analysis. According to the Netflix blog post explaining Suro (which goes into much more depth), the company is also looking at how it might use real-time processing engines such as Storm or Samza to perform machine learning on event data.

An example Suro workflow. Source: Netflix


To anyone familiar with the big data space, the names of the various technologies at play here are probably associated to some degree with the companies that developed them. Netflix created Suro, LinkedIn created Kafka and Samza, Twitter (actually, Backtype, which Twitter acquired) created Storm, and Metamarkets (see disclosure) created Druid. Suro, the blog post’s authors acknowledged, is based on the Apache Chukwa project and is similar to Apache Flume (created by Hadoop vendor Cloudera) and Facebook’s Scribe. Hadoop, of course, was all but created at Yahoo and has since seen notable ecosystem contributions from all sorts of web companies.

I sometimes wonder whether all these companies really need to be creating their own technologies all the time, or if they could often get by using the stuff their peers have already created. Like most things in life, though, the answer to that question is probably best decided on a case-by-case basis. Storm, for example, is becoming a very popular tool for stream processing, but LinkedIn felt like it needed something different and thus built Samza. Netflix decided it needed Suro as opposed to just using some pre-existing technologies (largely because of its cloud-heavy infrastructure running largely in Amazon Web Services), but also clearly uses lots of tools built elsewhere (including the Apache Cassandra database).


A diagram of LinkedIn’s data architecture as of February 2013, including everything from Kafka to Teradata.

Hopefully, the big winners in all this innovation will be mainstream technology users that don’t have the in-house talent (or, necessarily, the need) to develop these advanced systems in-house but could benefit from their capabilities. We’re already seeing Hadoop vendors, for example, trying to make projects such as Storm and the Spark processing framework usable by their enterprise customers, and it seems unlikely they’ll be the last. There are a whole lot of AWS users, after all, and they might want the capabilities Suro can provide without having to rely on Amazon to deliver them. (We’ll likely hear a lot more about the future of Hadoop as a data platform at our Structure Data conference, which takes place in March.)



Next-generation search and analytics with Apache Lucene and Solr 4

Use search engine technology to build fast, efficient, and scalable data-driven applications

Apache Lucene™ and Solr™ are highly capable open source search technologies that make it easy for organizations to enhance data access dramatically. With the 4.x line of Lucene and Solr, it’s easier than ever to add scalable search capabilities to your data-driven applications. Lucene and Solr committer Grant Ingersoll walks you through the latest Lucene and Solr features that relate to relevance, distributed search, and faceting. Learn how to leverage these capabilities to build fast, efficient, and scalable next-generation data-driven applications.

I began writing about Solr and Lucene for developerWorks six years ago (see Resources). Over those years, Lucene and Solr established themselves as rock-solid technologies (Lucene as a foundation for Java™ APIs, and Solr as a search service). For instance, they power search-based applications for Apple iTunes, Netflix, Wikipedia, and a host of others, and they help to enable the IBM Watson question-answering system.

Over the years, most people’s use of Lucene and Solr focused primarily on text-based search. Meanwhile, the new and interesting trend of big data emerged along with a (re)new(ed) focus on distributed computation and large-scale analytics. Big data often also demands real-time, large-scale information access. In light of this shift, the Lucene and Solr communities found themselves at a crossroads: the core underpinnings of Lucene began to show their age under the stressors of big data applications such as indexing all of the Twittersphere (see Resources). Furthermore, Solr’s lack of native distributed indexing support made it increasingly hard for IT organizations to scale their search infrastructure cost-effectively.

The community set to work overhauling the Lucene and Solr underpinnings (and in some cases the public APIs). Our focus shifted to enabling easy scalability, near-real-time indexing and search, and many NoSQL features — all while leveraging the core engine capabilities. This overhaul culminated in the Apache Lucene and Solr 4.x releases. These versions aim directly at solving next-generation, large-scale, data-driven access and analytics problems.

This article walks you through the 4.x highlights and shows you some code examples. First, though, you’ll go hands-on with a working application that demonstrates the concept of leveraging a search engine to go beyond search. To get the most from the article, you should be familiar with the basics of Solr and Lucene, especially Solr requests. If you’re not, see Resources for links that will get you started with Solr and Lucene.

Quick start: Search and analytics in action

Search engines are only for searching text, right? Wrong! At their heart, search engines are all about quickly and efficiently filtering and then ranking data according to some notion of similarity (a notion that’s flexibly defined in Lucene and Solr). Search engines also deal effectively with both sparse data and ambiguous data, which are hallmarks of modern data applications. Lucene and Solr are capable of crunching numbers, answering complex geospatial questions (as you’ll see shortly), and much more. These capabilities blur the line between search applications and traditional database applications (and even NoSQL applications).

For example, Lucene and Solr now:

  • Support several types of joins and grouping options
  • Have optional column-oriented storage
  • Provide several ways to deal with text and with enumerated and numerical data types
  • Enable you to define your own complex data types and storage, ranking, and analytics functions

A search engine isn’t a silver bullet for all data problems. But the fact that text search was the primary use of Lucene and Solr in the past shouldn’t prevent you from using them to solve your data needs now or in the future. I encourage you to think about using search engines in ways that go well outside the proverbial (search) box.

To demonstrate how a search engine can go beyond search, the rest of this section shows you an application that ingests aviation-related data into Solr. The application queries the data — most of which isn’t textual — and processes it with the D3 JavaScript library (see Resources) before displaying it. The data sets are from the Research and Innovative Technology Administration (RITA) of the U.S. Department of Transportation’s Bureau of Transportation Statistics and from OpenFlights. The data includes details such as originating airport, destination airport, time delays, causes of delays, and airline information for all flights in a particular time period. By using the application to query this data, you can analyze delays between particular airports, traffic growth at specific airports, and much more.

Start by getting the application up and running, and then look at some of its interfaces. Keep in mind as you go along that the application interacts with the data by interrogating Solr in various ways.


To get started, you need the following prerequisites:

  • Lucene and Solr.
  • Java 6 or higher.
  • A modern web browser. (I tested on Google Chrome and Firefox.)
  • 4GB of disk space — less if you don’t want to use all of the flight data.
  • Terminal access with a bash (or similar) shell on *nix. For Windows, you need Cygwin. I only tested on OS X with the bash shell.
  • wget if you choose to download the data by using the download script that’s in the sample code package. You can also download the flight data manually.
  • Apache Ant 1.8+ for compilation and packaging purposes, if you want to run any of the Java code examples.

See Resources for links to the Lucene, Solr, wget, and Ant download sites.

With the prerequisites in place, follow these steps to get the application up and running:

  1. Download this article’s sample code ZIP file and unzip it to a directory of your choice. I’ll refer to this directory as $SOLR_AIR.
  2. At the command line, change to the $SOLR_AIR directory:
    cd $SOLR_AIR
  3. Start Solr:
  4. Run the script that creates the necessary fields to model the data:
  5. Point your browser at http://localhost:8983/solr/#/ to display the new Solr Admin UI. Figure 1 shows an example:
    Figure 1. Solr UI

    Screen capture of the new Solr UI

  6. At the terminal, view the contents of the bin/ script for details on what to download from RITA and OpenFlights. Download the data sets either manually or by running the script:

    The download might take significant time, depending on your bandwidth.

  7. After the download is complete, index some or all of the data.

    To index all data:


    To index data from a single year, use any value between 1987 and 2008 for the year. For example:

    bin/ 1987
  8. After indexing is complete (which might take significant time, depending on your machine), point your browser at http://localhost:8983/solr/collection1/travel. You’ll see a UI similar to the one in Figure 2:
    Figure 2. The Solr Air UI

    Screen capture of an example Solr AIR screen

Exploring the data

With the Solr Air application up and running, you can look through the data and the UI to get a sense of the kinds of questions you can ask. In the browser, you should see two main interface points: the map and the search box. For the map, I started with D3’s excellent Airport example (see Resources). I modified and extended the code to load all of the airport information directly from Solr instead of from the example CSV file that comes with the D3 example. I also did some initial statistical calculations about each airport, which you can see by mousing over a particular airport.

I’ll use the search box to showcase a few key pieces of functionality that help you build sophisticated search and analytics applications. To follow along in the code, see the solr/collection1/conf/velocity/map.vm file.

The key focus areas are:

  • Pivot facets
  • Statistical functionality
  • Grouping
  • Lucene and Solr’s expanded geospatial support

Each of these areas helps you get answers such as the average delay of arriving airplanes at a specific airport, or the most common delay times for an aircraft that’s flying between two airports (per airline, or between a certain starting airport and all of the nearby airports). The application uses Solr’s statistical functionality, combined with Solr’s longstanding faceting capabilities, to draw the initial map of airport “dots” — and to generate basic information such as total flights and average, minimum, and maximum delay times. (This capability alone is a fantastic way to find bad data or at least extreme outliers.) To demonstrate these areas (and to show how easy it is to integrate Solr with D3), I’ve implemented a bit of lightweight JavaScript code that:

  1. Parses the query. (A production-quality application would likely do most of the query parsing on the server side or even as a Solr query-parser plugin.)
  2. Creates various Solr requests.
  3. Displays the results.

The request types are:

  • Lookup per three-letter airport code, such as RDU or SFO.
  • Lookup per route, such as SFO TO ATL or RDU TO ATL. (Multiple hops are not supported.)
  • Clicking the search button when the search box is empty to show various statistics for all flights.
  • Finding nearby airports by using the near operator, as in near:SFO or near:SFO TO ATL.
  • Finding likely delays at various distances of travel (less than 500 miles, 500 to 1000, 1000 to 2000, 2000 and beyond), as in likely:SFO.
  • Any arbitrary Solr query to feed to Solr’s /travel request handler, such as &q=AirportCity:Francisco.

The first three request types in the preceding list are all variations of the same type. These variants highlight Solr’s pivot faceting capabilities to show, for instance, the most common arrival delay times per route (such as SFO TO ATL) per airline per flight number. The near option leverages the new Lucene and Solr spatial capabilities to perform significantly enhanced spatial calculations such as complex polygon intersections. The likely option showcases Solr’s grouping capabilities to show airports at a range of distances from an originating airport that had arrival delays of more than 30 minutes. All of these request types augment the map with display information through a small amount of D3 JavaScript. For the last request type in the list, I simply return the associated JSON. This request type enables you to explore the data on your own. If you use this request type in your own applications, you naturally would want to use the response in an application-specific way.

Now try out some queries on your own. For instance, if you search for SFO TO ATL, you should see results similar to those in Figure 3:

Figure 3. Example SFO TO ATL screen

Screen capture from Solr Air showing SFO TO ATL results

In Figure 3, the two airports are highlighted on the map on the left. The Route Stats list on the right shows the most common arrival delay times per flight per airline. (I loaded the data for 1987 only.) For instance, it tells you that Delta flight 156 was five minutes late arriving in Atlanta on five occasions and six minutes early on four occasions.

You can see the underlying Solr request in your browser’s console (for example, in Chrome on the Mac, choose View -> Developer -> Javascript Console) and in the Solr logs. The SFO-TO-ATL request that I used (broken into three lines here solely for formatting purposes) is:

AND Dest:ATL&q=*:*&facet.pivot=UniqueCarrier,FlightNum,ArrDelay&

The facet.pivot parameter provides the key functionality in this request. facet.pivot pivots from the airline (called UniqueCarrier) to FlightNum through to ArrDelay, thereby providing the nested structure that’s displayed in Figure 3’s Route Stats.
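If you would rather issue this kind of request from Java than from the browser, a minimal SolrJ sketch might look like the following. It assumes the Solr Air collection is running locally as described above and uses SolrJ 4.x class names; the field names come from the example data, and the class name is mine.

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;

public class PivotFacetExample {
  public static void main(String[] args) throws Exception {
    HttpSolrServer solr = new HttpSolrServer("http://localhost:8983/solr/collection1");

    SolrQuery query = new SolrQuery("*:*");
    query.addFilterQuery("Origin:SFO AND Dest:ATL");
    query.setFacet(true);
    // Pivot from airline to flight number to arrival delay, as in the Route Stats display
    query.set("facet.pivot", "UniqueCarrier,FlightNum,ArrDelay");
    query.set("facet.limit", 5);

    QueryResponse response = solr.query(query);
    // The nested pivot counts come back as a NamedList keyed by the pivot spec
    System.out.println(response.getFacetPivot());

    solr.shutdown();
  }
}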

If you try a near query, as in near:JFK, your result should look similar to Figure 4:

Figure 4. Example screen showing airports near JFK

Screen capture from Solr Air showing JFK and nearby airports

The Solr request that underlies near queries takes advantage of Solr’s new spatial functionality, which I’ll detail later in this article. For now, you can likely discern some of the power of this new functionality by looking at the request itself (shortened here for formatting purposes):

&fq=source:Airports&q=AirportLocationJTS:"IsWithin(Circle(40.639751,-73.778925 d=3))"

As you might guess, the request looks for all airports that fall within a circle whose center is at 40.639751 degrees latitude and -73.778925 degrees longitude and that has a radius of 3 degrees, which is roughly 333 kilometers (one degree of latitude is about 111 kilometers).

By now you should have a strong sense that Lucene and Solr applications can slice and dice data — numerical, textual, or other — in interesting ways. And because Lucene and Solr are both open source, with a commercial-friendly license, you are free to add your own customizations. Better yet, the 4.x line of Lucene and Solr increases the number of places where you can insert your own ideas and functionality without needing to overhaul all of the code. Keep this capability in mind as you look next at some of the highlights of Lucene 4 (version 4.4 as of this writing) and then at the Solr 4 highlights.

Lucene 4: Foundations for next-generation search and analytics

A sea change

Lucene 4 is nearly a complete rewrite of the underpinnings of Lucene for better performance and flexibility. At the same time, this release represents a sea change in the way the community develops software, thanks to Lucene’s new randomized unit-testing framework and rigorous community standards that relate to performance. For instance, the randomized test framework (which is available as a packaged artifact for anyone to use) makes it easy for the project to test the interactions of variables such as JVMs, locales, input content and queries, storage formats, scoring formulas, and many more. (Even if you never use Lucene, you might find the test framework useful in your own projects.)

Some of the key additions and changes to Lucene are in the categories of speed and memory, flexibility, data structures, and faceting. (To see all of the details on the changes in Lucene, read the CHANGES.txt file that’s included within every Lucene distribution.)

Speed and memory

Although prior Lucene versions are generally considered to be fast enough — especially, relative to comparable general-purpose search libraries — enhancements in Lucene 4 make many operations significantly faster than in previous versions.

The graph in Figure 5 captures the performance of Lucene indexing as measured in gigabytes per hour. (Credit Lucene committer Mike McCandless for the nightly Lucene benchmarking graphs; see Resources.) Figure 5 shows that a huge performance improvement occurred in the first half of May [[year?]]:

Figure 5. Lucene indexing performance

Graph of Lucene indexing performance that shows an increase from 100GB per hour to approximately 270GB per hour in the first half of May [[year?]]

The improvement that Figure 5 shows comes from a series of changes that were made to how Lucene builds its index structures and how it handles concurrency when building them (along with a few other changes, including JVM changes and use of solid-state drives). The changes focused on removing synchronizations while Lucene writes the index to disk; for details (which are beyond this article’s scope), see Resources for links to Mike McCandless’s blog posts.

In addition to improving overall indexing performance, Lucene 4 can perform near real time (NRT) indexing operations. NRT operations can significantly reduce the time that it takes for the search engine to reflect changes to the index. To use NRT operations, you must do some coordination in your application between Lucene’s IndexWriter and IndexReader. Listing 1 (a snippet from the sample code in the download package’s src/main/java/ directory) illustrates this interplay:

Listing 1. Example of NRT search in Lucene
doc = new HashSet<IndexableField>();
index(writer, doc);
//Get a searcher
IndexSearcher searcher = new IndexSearcher(;
//Now, index one more doc
doc.add(new StringField("id", "id_" + 100, Field.Store.YES));
doc.add(new TextField("body", "This is document 100.", Field.Store.YES));
index(writer, doc);
//The results are still 100
//Don't commit; just open a new searcher directly from the writer
searcher = new IndexSearcher(, false));
//The results now reflect the new document that was added

In Listing 1, I first index and commit a set of documents to the Directory and then search the Directory — the traditional approach in Lucene. NRT comes in when I proceed to index one more document: Instead of doing a full commit, Lucene creates a new IndexSearcher from the IndexWriter and then does the search. You can run this example by changing to the $SOLR_AIR directory and executing this sequence of commands:

  1. ant compile
  2. cd build/classes
  3. java -cp ../../lib/*:. IndexingExamples

Note: I grouped several of this article’s code examples into the IndexingExamples class, so you can use the same command sequence to run the later examples in Listing 2 and Listing 4.

The output that prints to the screen is:

Num docs: 100
Num docs: 100
Num docs: 101

Lucene 4 also contains memory improvements that leverage some more-advanced data structures (which I cover in more detail in Finite state automata and other goodies). These improvements not only reduce Lucene’s memory footprint but also significantly speed up queries that are based on wildcards and regular expressions. Additionally, the code base moved away from working with Java String objects in favor of managing large allocations of byte arrays. (The BytesRef class is seemingly ubiquitous under the covers in Lucene now.) As a result, String overhead is reduced and the number of objects on the Java heap is under better control, which reduces the likelihood of stop-the-world garbage collections.

Some of the flexibility enhancements also yield performance and storage improvements because you can choose better data structures for the types of data that your application is using. For instance, as you’ll see next, you can choose to index/store unique keys (which are dense and don’t compress well) one way in Lucene and index/store text in a completely different way that better suits text’s sparseness.


What’s a segment? A Lucene segment is a subset of the overall index. In many ways a segment is a self-contained mini-index. Lucene builds its index by using segments to balance the availability of the index for searching with the speed of writing. Segments are write-once files during indexing, and a new one is created every time you commit during writing. In the background, by default, Lucene periodically merges smaller segments into larger segments to improve read performance and reduce system overhead. You can exercise complete control over this process.

The flexibility improvements in Lucene 4.x unlock a treasure-trove of opportunity for developers (and researchers) who want to squeeze every last bit of quality and performance out of Lucene. To enhance flexibility, Lucene offers two new well-defined plugin points. Both plugin points have already had a significant impact on the way Lucene is both developed and used.

The first new plugin point is designed to give you deep control over the encoding and decoding of a Lucene segment. The Codec class defines this capability. Codec gives you control over the format of the postings list (that is, the inverted index), Lucene storage, boost factors (also called norms), and much more.

In some applications you might want to implement your own Codec. But it’s much more likely that you’ll want to change the Codec that’s used for a subset of the document fields in the index. To understand this point, it helps to think about the kinds of data you are putting in your application. For instance, identifying fields (for example, your primary key) are usually unique. Because primary keys only ever occur in one document, you might want to encode them differently from how you encode the body of an article’s text. You don’t actually change the Codec in these cases. Instead, you change one of the lower-level classes that the Codec delegates to.
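As a rough sketch of that lower-level delegation, the code below overrides the per-field postings format on Lucene 4.x's default codec. The field name ("id") and the choice of the "Memory" postings format (from the lucene-codecs module) are assumptions made for illustration, not recommendations from the article.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.codecs.PostingsFormat;
import org.apache.lucene.codecs.lucene42.Lucene42Codec;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.util.Version;

public class PerFieldPostingsExample {
  public static IndexWriterConfig configWithPerFieldPostings() {
    IndexWriterConfig conf =
        new IndexWriterConfig(Version.LUCENE_44, new StandardAnalyzer(Version.LUCENE_44));
    // Keep the default codec, but delegate the "id" field to a different postings format
    conf.setCodec(new Lucene42Codec() {
      @Override
      public PostingsFormat getPostingsFormatForField(String field) {
        if ("id".equals(field)) {
          // Dense, unique keys: try the RAM-resident "Memory" postings format
          return PostingsFormat.forName("Memory");
        }
        return super.getPostingsFormatForField(field);
      }
    });
    return conf;
  }
}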

To demonstrate, I’ll show you a code example that uses my favorite Codec, the SimpleTextCodec. The SimpleTextCodec is what it sounds like: a Codec for encoding the index in simple text. (The fact that SimpleTextCodec was written and passes Lucene’s extensive test framework is a testament to Lucene’s enhanced flexibility.) SimpleTextCodec is too large and slow to use in production, but it’s a great way to see what a Lucene index looks like under the covers, which is why it is my favorite. The code in Listing 2 changes a Codec instance to SimpleTextCodec:

Listing 2. Example of changing Codec instances in Lucene
conf.setCodec(new SimpleTextCodec());
File simpleText = new File("simpletext");
directory = new SimpleFSDirectory(simpleText);
//Let's write to disk so that we can see what it looks like
writer = new IndexWriter(directory, conf);
index(writer, doc);//index the same docs as before

By running the Listing 2 code, you create a local build/classes/simpletext directory. To see the Codec in action, change to build/classes/simpletext and open the .cfs file in a text editor. You can see that the .cfs file truly is plain old text, like the snippet in Listing 3:

Listing 3. Portion of _0.cfs plain-text index file
  term id_97
    doc 97
  term id_98
    doc 98
  term id_99
    doc 99
doc 0
  numfields 4
  field 0
    name id
    type string
    value id_100
  field 1
    name body
    type string
    value This is document 100.

For the most part, changing the Codec isn’t useful until you are working with extremely large indexes and query volumes, or if you are a researcher or search-engine maven who loves to play with bare metal. Before changing Codecs in those cases, do extensive testing of the various available Codecs by using your actual data. Solr users can set and change these capabilities by modifying simple configuration items. Refer to the Solr Reference Guide for more details (see Resources).

The second significant new plugin point makes Lucene’s scoring model completely pluggable. You are no longer limited to using Lucene’s default scoring model, which some detractors claim is too simple. If you prefer, you can use alternative scoring models such as BM25 and Divergence from Randomness (see Resources), or you can write your own. Why write your own? Perhaps your “documents” represent molecules or genes; you want a fast way of ranking them, but term frequency and document frequency aren’t applicable. Or perhaps you want to try out a new scoring model that you read about in a research paper to see how it works on your content. Whatever your reason, changing the scoring model requires you to change the model at indexing time through the IndexWriterConfig.setSimilarity(Similarity) method, and at search time through the IndexSearcher.setSimilarity(Similarity) method. Listing 4 demonstrates changing the Similarity by first running a query that uses the default Similarity and then re-indexing and rerunning the query using Lucene’s BM25Similarity:

Listing 4. Changing Similarity in Lucene
conf = new IndexWriterConfig(Version.LUCENE_44, analyzer);
directory = new RAMDirectory();
writer = new IndexWriter(directory, conf);
index(writer, DOC_BODIES);
searcher = new IndexSearcher(;
System.out.println("Lucene default scoring:");
TermQuery query = new TermQuery(new Term("body", "snow"));
printResults(searcher, query, 10);

BM25Similarity bm25Similarity = new BM25Similarity();
conf = new IndexWriterConfig(Version.LUCENE_44, analyzer);
conf.setSimilarity(bm25Similarity);//set BM25 at indexing time
Directory bm25Directory = new RAMDirectory();
writer = new IndexWriter(bm25Directory, conf);
index(writer, DOC_BODIES);
searcher = new IndexSearcher(;
searcher.setSimilarity(bm25Similarity);//and at search time
System.out.println("Lucene BM25 scoring:");
printResults(searcher, query, 10);

Run the code in Listing 4 and examine the output. Notice that the scores are indeed different. Whether the results of the BM25 approach more accurately reflect a user’s desired set of results is ultimately up to you and your users to decide. I recommend that you set up your application in a way that makes it easy for you to run experiments. (A/B testing should help.) Then compare not only the Similarity results, but also the results of varying query construction, Analyzer, and many other items.

Finite state automata and other goodies

A complete overhaul of Lucene’s data structures and algorithms spawned two especially interesting advancements in Lucene 4:

  • DocValues (also known as column stride fields).
  • Finite State Automata (FSA) and Finite State Transducers (FST). I’ll refer to both as FSAs for the remainder of this article. (Technically, an FST outputs values as its nodes are visited, but that distinction isn’t important for the purposes of this article.)

Both DocValues and FSA provide significant new performance benefits for certain types of operations that can affect your application.

On the DocValues side, in many cases applications need to access all of the values of a single field very quickly, in sequence. Or applications need to do quick lookups of values for sorting or faceting, without incurring the cost of building an in-memory version from an index (a process that’s known as un-inverting). DocValues are designed to answer these kinds of needs.
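A small sketch of what that looks like with the Lucene 4 APIs follows; the field name and value are made up for illustration, and it assumes the Lucene core jars used elsewhere in this article's examples.

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
import org.apache.lucene.document.NumericDocValuesField;
import org.apache.lucene.document.StringField;
import org.apache.lucene.index.AtomicReaderContext;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.NumericDocValues;
import org.apache.lucene.util.Version;

public class DocValuesExample {
  public static void main(String[] args) throws Exception {
    Directory directory = new RAMDirectory();
    IndexWriter writer = new IndexWriter(directory,
        new IndexWriterConfig(Version.LUCENE_44, new StandardAnalyzer(Version.LUCENE_44)));

    Document doc = new Document();
    doc.add(new StringField("id", "id_1", Field.Store.YES));
    // Column-stride storage of the value, ready for sorting/faceting without un-inverting
    doc.add(new NumericDocValuesField("arrDelayMinutes", 12));
    writer.addDocument(doc);
    writer.commit();

    DirectoryReader reader =;
    for (AtomicReaderContext ctx : reader.leaves()) {
      NumericDocValues delays = ctx.reader().getNumericDocValues("arrDelayMinutes");
      for (int i = 0; i < ctx.reader().maxDoc(); i++) {
        System.out.println("doc " + i + " delay: " + delays.get(i));
      }
    }
    reader.close();
    writer.close();
  }
}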

An application that does a lot of wildcard or fuzzy queries should see a significant performance improvement due to the use of FSAs. Lucene and Solr now support query auto-suggest and spell-checking capabilities that leverage FSAs. And Lucene’s default Codec significantly reduces disk and memory footprint by using FSAs under the hood to store the term dictionary (the structure that Lucene uses to look up query terms during a search). FSAs have many uses in language processing, so you might also find Lucene’s FSA capabilities to be instructive for other applications.

Figure 6 shows an FSA that’s built from the words mop, pop, moth, star, stop, and top, along with associated weights. From the example, you can imagine starting with input such as moth, breaking it down into its characters (m-o-t-h), and then following the arcs in the FSA.

Figure 6. Example of an FSA

Illustration of an FSA

Listing 5 (excerpted from this article’s sample code download) shows a simple example of building your own FSA by using Lucene’s API:

Listing 5. Example of a simple Lucene automaton
String[] words = {"hockey", "hawk", "puck", "text", "textual", "anachronism", "anarchy"};
Collection<BytesRef> strings = new ArrayList<BytesRef>();
for (String word : words) {
  strings.add(new BytesRef(word));
}

//build up a simple automaton out of several words
Automaton automaton = BasicAutomata.makeStringUnion(strings);
CharacterRunAutomaton run = new CharacterRunAutomaton(automaton);
System.out.println("Match: " +"hockey"));
System.out.println("Match: " +"ha"));

In Listing 5, I build an Automaton out of various words and feed it into a RunAutomaton. As the name implies, a RunAutomaton runs input through the automaton, in this case to match the input strings that are captured in the print statements at the end of Listing 5. Although this example is trivial, it lays the groundwork for understanding much more advanced capabilities that I’ll leave to readers to explore (along with DocValues) in the Lucene APIs. (See Resources for relevant links.)


At its core, faceting generates a count of document attributes to give users an easy way to narrow down their search results without making them guess which keywords to add to the query. For example, if someone searches a shopping site for televisions, facets tell them how many TV models are made by each manufacturer. Increasingly, faceting is also used to power search-based business analytics and reporting tools. By using more-advanced faceting capabilities, you give users the ability to slice and dice facets in interesting ways.

Facets were long a hallmark of Solr (since version 1.1). Now Lucene has its own faceting module that stand-alone Lucene applications can leverage. Lucene’s faceting module isn’t as rich in functionality as Solr’s, but it does offer some interesting tradeoffs. Lucene’s faceting module isn’t dynamic, in that you must make some faceting decisions at indexing time. But it is hierarchical, and it doesn’t incur the cost of dynamically un-inverting fields into memory.

Listing 6 (part of the article’s sample code) showcases some of Lucene’s new faceting capabilities:

Listing 6. Lucene faceting examples
DirectoryTaxonomyWriter taxoWriter = 
     new DirectoryTaxonomyWriter(facetDir, IndexWriterConfig.OpenMode.CREATE);
FacetFields facetFields = new FacetFields(taxoWriter);
for (int i = 0; i < DOC_BODIES.length; i++) {
  String docBody = DOC_BODIES[i];
  String category = CATEGORIES[i];
  Document doc = new Document();
  CategoryPath path = new CategoryPath(category, '/');
  //Setup the fields
  facetFields.addFields(doc, Collections.singleton(path));//just do a single category path
  doc.add(new StringField("id", "id_" + i, Field.Store.YES));
  doc.add(new TextField("body", docBody, Field.Store.YES));
  writer.addDocument(doc);//writer is the IndexWriter used elsewhere in the sample code
}
writer.commit();
taxoWriter.commit();
DirectoryReader reader =, true);
IndexSearcher searcher = new IndexSearcher(reader);
DirectoryTaxonomyReader taxor = new DirectoryTaxonomyReader(taxoWriter);
ArrayList<FacetRequest> facetRequests = new ArrayList<FacetRequest>();
CountFacetRequest home = new CountFacetRequest(new CategoryPath("Home", '/'), 100);
facetRequests.add(home);
facetRequests.add(new CountFacetRequest(new CategoryPath("Home/Sports", '/'), 10));
facetRequests.add(new CountFacetRequest(new CategoryPath("Home/Weather", '/'), 10));
FacetSearchParams fsp = new FacetSearchParams(facetRequests);

FacetsCollector facetsCollector = FacetsCollector.create(fsp, reader, taxor); MatchAllDocsQuery(), facetsCollector);

for (FacetResult fres : facetsCollector.getFacetResults()) {
  FacetResultNode root = fres.getFacetResultNode();
  printFacet(root, 0);
}

The key pieces in Listing 6 that go beyond normal Lucene indexing and search are the use of the FacetFields, FacetsCollector, TaxonomyReader, and TaxonomyWriter classes. FacetFields creates the appropriate field entries in the document and works in concert with TaxonomyWriter at indexing time. At search time, TaxonomyReader works with FacetsCollector to get the appropriate counts for each category. Note, also, that Lucene’s faceting module creates a secondary index that, to be effective, must be kept in sync with the main index. Run the Listing 6 code by using the same command sequence you used for the earlier examples, except substitute FacetExamples for IndexingExamples in the java command. You should get:

Home (0.0)
 Home/Children (3.0)
  Home/Children/Nursery Rhymes (3.0)
 Home/Weather (2.0)

 Home/Sports (2.0)
  Home/Sports/Rock Climbing (1.0)
  Home/Sports/Hockey (1.0)
 Home/Writing (1.0)
 Home/Quotes (1.0)
  Home/Quotes/Yoda (1.0)
 Home/Music (1.0)
  Home/Music/Lyrics (1.0)

Notice that in this particular implementation I’m not including the counts for the Home facet, because including them can be expensive. That option is supported by setting up the appropriate FacetIndexingParams, which I’m not covering here. Lucene’s faceting module has additional capabilities that I’m not covering. I encourage you to explore them — and other new Lucene features that this article doesn’t touch on — by checking out the article Resources. And now, on to Solr 4.x.

Solr 4: Search and analytics at scale

From an API perspective, much of Solr 4.x looks and feels the same as previous versions. But 4.x contains numerous enhancements that make it easier to use, and more scalable, than ever. Solr also enables you to answer new types of questions, all while leveraging many of the Lucene enhancements that I just outlined. Other changes are geared toward the developer’s getting-started experience. For example, the all-new Solr Reference Guide (see Resources) provides book-quality documentation of every Solr release (starting with 4.4). And Solr’s new schemaless capabilities make it easy to add new data to the index quickly without first needing to define a schema. You’ll learn about Solr’s schemaless feature in a moment. First you’ll look at some of the new search, faceting, and relevance enhancements in Solr, some of which you saw in action in the Solr Air application.

Search, faceting, and relevance

Several new Solr 4 capabilities are designed to make it easier — on both the indexing side and the search-and-faceting side — to build next-generation data-driven applications. Table 1 summarizes the highlights and includes command and code examples when applicable:

Table 1. Indexing, searching, and faceting highlights in Solr 4

  • Pivot faceting – Gather counts for all of a facet’s subfacets, as filtered through the parent facet. See the Solr Air example for more details. Example (pivot on a variety of fields):
    http://localhost:8983/solr/collection1/travel?&wt=json&facet=true&facet.limit=5&fq=&q=*:*&facet.pivot=Origin,Dest,UniqueCarrier,FlightNum,ArrDelay&indent=true
  • New relevance function queries – Access various index-level statistics, such as document frequency and term frequency, as part of a function query. Example (add the document frequency for the term Origin:SFO to all returned documents):
    http://localhost:8983/solr/collection1/travel?&wt=json&q=*:*&fl=*,{!func}docfreq('Origin',%20'SFO')&indent=true
    Note that this command also uses the new DocTransformers capability.
  • Joins – Represent more-complex document relationships and then join them at search time. More-complex joins are slated for future releases of Solr. Example: return only flights whose originating airport codes appear in the Airport data set (and compare the results to those of a request without the join).
  • Codec support – Change the Codec for the index and the postings format for individual fields. Example (use the SimpleTextCodec for a field):
    <fieldType name="string_simpletext" postingsFormat="SimpleText" />
  • New update processors – Use Solr’s Update Processor framework to plug in code that changes documents after they are sent to Solr but before they are indexed. Examples:
      • Field mutating (for example, concatenating fields, parsing numerics, trimming)
      • Scripting – use JavaScript or other code that’s supported by the JavaScript engine to process documents. See the update-script.js file in the Solr Air example.
      • Language detection (technically available in 3.5, but worth mentioning here) for identifying the language (such as English or Japanese) that’s used in a document.
  • Atomic updates – Send in just the parts of a document that have changed, and let Solr take care of the rest. Example (from the command line, using cURL, change the origin of document 243551 to FOO):
    curl http://localhost:8983/solr/update -H 'Content-type:application/json' -d '[{"id": "243551","Origin": {"set":"FOO"}}]'
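The same atomic update can also be sent from Java; a minimal SolrJ 4.x sketch (the URL and field values simply mirror the cURL command above, and the class name is mine) might look like this:

import java.util.Collections;
import org.apache.solr.client.solrj.impl.HttpSolrServer;
import org.apache.solr.common.SolrInputDocument;

public class AtomicUpdateExample {
  public static void main(String[] args) throws Exception {
    HttpSolrServer solr = new HttpSolrServer("http://localhost:8983/solr/collection1");

    SolrInputDocument doc = new SolrInputDocument();
    doc.addField("id", "243551");
    // Wrapping the value in a map with a "set" key tells Solr to update just this field
    doc.addField("Origin", Collections.singletonMap("set", "FOO"));

    solr.add(doc);
    solr.commit();
    solr.shutdown();
  }
}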

You can run the first three example commands in Table 1 in your browser’s address field (not the Solr Air UI) against the Solr Air demo data.

For more details on relevance functions, joins, and Codecs — and other new Solr 4 features — see Resources for relevant links to the Solr Wiki and elsewhere.

Scaling, NoSQL, and NRT

Probably the single most significant change to Solr in recent years was that building a multinode scalable search solution became much simpler. With Solr 4.x, it’s easier than ever to scale Solr to be the authoritative storage and access mechanism for billions of records — all while enjoying the search and faceting capabilities that Solr has always been known for. Furthermore, you can rebalance your cluster as your capacity needs change, as well as take advantage of optimistic locking, atomic updates of content, and real-time retrieval of data even if it hasn’t been indexed yet. The new distributed capabilities in Solr are referred to collectively as SolrCloud.

How does SolrCloud work? Documents that are sent to Solr 4 when it’s running in (optional) distributed mode are routed according to a hashing mechanism to a node in the cluster (called the leader). The leader is responsible for indexing the document into a shard. A shard is a single index that is served by a leader and zero or more replicas. As an illustration, assume that you have four machines and two shards. When Solr starts, each of the four machines communicates with the other three. Two of the machines are elected leaders, one for each shard. The other two nodes automatically become replicas of one of the shards. If one of the leaders fails for some reason, a replica (in this case the only replica) becomes the leader, thereby guaranteeing that the system still functions properly. You can infer from this example that in a production system enough nodes must participate to ensure that you can handle system outages.

To see SolrCloud in action, you can launch a two-node, two-shard system by running the script that you used in the Solr Air example with a -z flag. From the *NIX command line, first shut down your old instance:

kill -9 PROCESS_ID

Then restart the system:

bin/ -c -z

The -c flag erases the old index. The -z flag tells Solr to start up with an embedded version of Apache Zookeeper.

Apache Zookeeper

Zookeeper is a distributed coordination system that’s designed to elect leaders, establish a quorum, and perform other tasks to coordinate the nodes in a cluster. Thanks to Zookeeper, a Solr cluster never suffers from “split-brain” syndrome, whereby part of the cluster behaves independently of the rest of the cluster as the result of a partitioning event. See Resources to learn more about Zookeeper.

Point your browser at the SolrCloud admin page, http://localhost:8983/solr/#/~cloud, to verify that two nodes are participating in the cluster. You can now re-index your content, and it will be spread across both nodes. All queries to the system are also automatically distributed. You should get the same number of hits for a match-all-documents search against two nodes that you got for one node.

The script launches Solr with the following command for the first node:

java -Dbootstrap_confdir=$SOLR_HOME/solr/collection1/conf 
-Dcollection.configName=myconf -DzkRun -DnumShards=2 -jar start.jar

The script tells the second node where Zookeeper is:

java -Djetty.port=7574 -DzkHost=localhost:9983 -jar start.jar

Embedded Zookeeper is great for getting started, but to ensure high availability and fault tolerance for production systems, set up a stand-alone set of Zookeeper instances in your cluster.
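On the client side, SolrJ can likewise discover the cluster through Zookeeper rather than pointing at a single node. A hedged sketch using SolrJ 4.x's CloudSolrServer, assuming the embedded Zookeeper from the example above on port 9983 (the class name is mine):

import org.apache.solr.client.solrj.SolrQuery;
import org.apache.solr.client.solrj.impl.CloudSolrServer;
import org.apache.solr.client.solrj.response.QueryResponse;

public class CloudQueryExample {
  public static void main(String[] args) throws Exception {
    // Connect via Zookeeper; the client learns about shards and leaders from the cluster state
    CloudSolrServer cloud = new CloudSolrServer("localhost:9983");
    cloud.setDefaultCollection("collection1");

    // A match-all query is automatically fanned out across both shards
    QueryResponse response = cloud.query(new SolrQuery("*:*"));
    System.out.println("Total hits across the cluster: " + response.getResults().getNumFound());

    cloud.shutdown();
  }
}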

Stacked on top of the SolrCloud capabilities are support for NRT and many NoSQL-like functions, such as:

  • Optimistic locking
  • Atomic updates
  • Real-time gets (retrieving a specific document before it is committed)
  • Transaction-log-backed durability

Many of the distributed and NoSQL functions in Solr — such as automatic versioning of documents and transaction logs — work out of the box. For a few other features, the descriptions and examples in Table 2 will be helpful:

Table 2. Summary of distributed and NoSQL features in Solr 4

  • Realtime get – Retrieve a document, by ID, regardless of its state of indexing or distribution. Example: get the document whose ID is 243551.
  • Shard splitting – Split your index into smaller shards so they can be migrated to new nodes in the cluster. Example: split shard1 into two shards.
  • NRT – Use NRT to search for new content much more quickly than in previous versions. Example: turn on <autoSoftCommit> in your solrconfig.xml file.
  • Document routing – Specify which documents live on which nodes. Example: ensure that all of a user’s data is on certain machines. Read Joel Bernstein’s blog post (see Resources).
  • Collections – Create, delete, or update collections as needed, programmatically, using Solr’s new collections API. Example: create a new collection named hockey.

Going schemaless

Schemaless: Marketing hype?

Data collections rarely lack a schema. Schemaless is a marketing term that’s derived from a data-ingestion engine’s ability to react appropriately to the data “telling” the engine what the schema is — instead of the engine specifying the form that the data must take. For instance, Solr can accept JSON input and can index content appropriately on the basis of the schema that’s implicitly defined in the JSON. As someone pointed out to me on Twitter, less schema is a better term than schemaless, because you define the schema in one place (such as a JSON document) instead of two (such as a JSON document and Solr).

Based on my experience, in the vast majority of cases you should not use schemaless in a production system unless you enjoy debugging errors at 2 a.m. when your system thinks it has one type of data and in reality has another.

Solr’s schemaless functionality enables clients to add content rapidly without the overhead of first defining a schema.xml file. Solr examines the incoming data and passes it through a cascading set of value parsers. The value parsers guess the data’s type and then automatically add the fields to the internal schema and add the content to the index.

A typical production system (with some exceptions) shouldn’t use schemaless, because the value guessing isn’t always perfect. For instance, the first time Solr sees a new field, it might identify the field as an integer and thus define an integer FieldType in the underlying schema. But you may discover three weeks later that the field is useless for searching because the rest of the content that Solr sees for that field consists of floating-point values.

However, schemaless is especially helpful for early-stage development or for indexing content whose format you have little to no control over. For instance, Table 2 includes an example of using the collections API in Solr to create a new collection:
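
A representative call (a sketch that assumes the default port, two shards, and that the new collection reuses the configuration already stored in Zookeeper) looks like this:

curl 'http://localhost:8983/solr/admin/collections?action=CREATE&name=hockey&numShards=2'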


After you create the collection, you can use schemaless to add content to it. First, though, take a look at the current schema. As part of implementing schemaless support, Solr also added Representational State Transfer (REST) APIs for accessing the schema. You can see all of the fields defined for the hockey collection by pointing your browser (or cURL on the command line) at http://localhost:8983/solr/hockey/schema/fields. You see all of the fields from the Solr Air example. The schema uses those fields because the create option used my default configuration as the basis for the new collection. You can override that configuration if you want. (A side note: The script that’s included in the sample code download uses the new schema APIs to create all of the field definitions automatically.)

To add to the collection by using schemaless, run:
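
A sketch of such a command, assuming you post a file named new_doc.json (a hypothetical name) to Solr 4’s content-type-aware update handler on the default port:

curl 'http://localhost:8983/solr/hockey/update?commit=true' -H 'Content-Type: application/json' --data-binary @new_doc.json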


The following JSON is added to the hockey collection that you created earlier:

        "id": "id1",
        "team": "Carolina Hurricanes",
        "description": "The NHL franchise located in Raleigh, NC",
        "cupWins": 1

As you know from examining the schema before you added this JSON to the collection, the team, description, and cupWins fields are new. When the script ran, Solr guessed their types automatically and created the fields in the schema. To verify, refresh the results at http://localhost:8983/solr/hockey/schema/fields. You should now see team, description, and cupWins all defined in the list of fields.

Spatial (not just geospatial) improvements

Solr’s longstanding support for point-based spatial searching enables you to find all documents that are within some distance of a point. Although Solr supports this approach in an n-dimensional space, most people use it for geospatial search (for example, find all restaurants near my location). But until now, Solr didn’t support more-involved spatial capabilities such as indexing polygons or performing searches within indexed polygons. Some of the highlights of the new spatial package are:

  • Support through the Spatial4J library (see Resources) for many new spatial types — such as rectangles, circles, lines, and arbitrary polygons — and support for the Well Known Text (WKT) format
  • Multivalued indexed fields, which you can use to encode multiple points into the same field
  • Configurable precision that gives the developer more control over accuracy versus computation speed
  • Fast filtering of content
  • Query support for IsWithin, Contains, and IsDisjointTo
  • Optional support for the Java Topology Suite (JTS) (see Resources)
  • Lucene APIs and artifacts

The schema for the Solr Air application has several field types that are set up to take advantage of this new spatial functionality. I defined two field types for working with the latitude and longitude of the airport data:

<fieldType name="location_jts" 
distErrPct="0.025" spatialContextFactory=
maxDistErr="0.000009" units="degrees"/>

<fieldType name="location_rpt" 
distErrPct="0.025" geo="true" maxDistErr="0.000009" units="degrees"/>

The location_jts field type explicitly uses the optional JTS integration to define a point, and the location_rpt field type doesn’t. If you want to index anything more complex than simple rectangles, you need to use the JTS version. The fields’ attributes help to define the system’s accuracy. These attributes are required at indexing time because Solr, via Lucene and Spatial4j, encodes the data in multiple ways to ensure that the data can be used efficiently at search time. For your applications, you’ll likely want to run some tests with your data to determine the tradeoffs to make in terms of index size, precision, and query-time performance.

In addition, the near query that’s used in the Solr Air application uses the new spatial-query syntax (IsWithin on a Circle) for finding airports near the specified origin and destination airports.
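
A sketch of that syntax, using a hypothetical point field named location_rpt and illustrative coordinates (with units="degrees", the d value is a radius in degrees):

fq=location_rpt:"IsWithin(Circle(41.9786,-87.9048 d=0.5))"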

New administration UI

In wrapping up this section on Solr, I would be remiss if I didn’t showcase the much more user-friendly and modern Solr admin UI. The new UI not only cleans up the look and feel but also adds new functionality for SolrCloud, document additions, and much more.

For starters, when you first point your browser at http://localhost:8983/solr/#/, you should see a dashboard that succinctly captures much of the current state of Solr: memory usage, working directories, and more, as in Figure 7:

Figure 7. Example Solr dashboard

If you select Cloud in the left side of the dashboard, the UI displays details about SolrCloud. For example, you get in-depth information about the state of configuration, live nodes, and leaders, as well as visualizations of the cluster topology. Figure 8 shows an example. Take a moment to work your way through all of the cloud UI options. (You must be running in SolrCloud mode to see them.)

Figure 8. Example SolrCloud UI

The last area of the UI to cover that’s not tied to a specific core/collection/index is the Core Admin set of screens. These screens provide point-and-click control over the administration of cores, including adding, deleting, reloading, and swapping cores. Figure 9 shows the Core Admin UI:

Figure 9. Example of Core Admin UI

By selecting a core from the Core list, you access an overview of information and statistics that are specific to that core. Figure 10 shows an example:

Figure 10. Example core overview

Most of the per-core functionality is similar to the pre-4.x UI’s functionality (albeit in a much more pleasant way), with the exception of the Documents option. You can use the Documents option to add documents in various formats (JSON, CSV, XML, and others) to the collection directly from the UI, as Figure 11 shows:

Figure 11. Example of adding a document from the UI

You can even upload rich document types such as PDF and Word. Take a moment to add some documents into your index or browse the other per-collection capabilities, such as the Query interface or the revamped Analysis screen.

The road ahead

Next-generation search-engine technology gives users the power to decide what to do with their data. This article gave you a good taste of what Lucene and Solr 4 are capable of, and, I hope, a broader sense of how search engines solve non-text-based search problems that involve analytics and recommendations.

Lucene and Solr are in constant motion, thanks to a large sustaining community that’s backed by more than 30 committers and hundreds of contributors. The community is actively developing two main branches: the current officially released 4.x branch and the trunk branch, which represents the next major (5.x) release. On the official release branch, the community is committed to backward compatibility and an incremental approach to development that focuses on easy upgrades of current applications. On the trunk branch, the community is a bit less restricted in terms of ensuring compatibility with previous releases. If you want to try out the cutting edge in Lucene or Solr, check out the trunk branch of the code from Subversion or Git (see Resources). Whichever path you choose, you can take advantage of Lucene and Solr for powerful search-based analytics that go well beyond plain text search.


Thanks to David Smiley, Erik Hatcher, Yonik Seeley, and Mike McCandless for their help.