S4: Distributed Stream Computing Platform

A few weeks ago I mentioned that Yahoo! Labs was working on something called S4 for real-time data analysis. Yesterday they released an 8-page paper with a detailed description of how and why they built it. The abstract from the paper is quoted below.

It’s interesting to note that the authors compared S4 with MapReduce and explained that MapReduce is heavily optimized for batch processing and isn’t the best fit for real-time computation. They also made an architectural decision not to build a single system for both offline (batch) and real-time processing, since they feared such a system would end up being good at neither.

S4 is a general-purpose, distributed, scalable, partially fault-tolerant, pluggable platform that allows programmers to easily develop applications for processing continuous unbounded streams of data. Keyed data events are routed with affinity to Processing Elements (PEs), which consume the events and do one or both of the following: (1) emit one or more events which may be consumed by other PEs, (2) publish results. The architecture resembles the Actors model [1], providing semantics of encapsulation and location transparency, thus allowing applications to be massively concurrent while exposing a simple programming interface to application developers. In this paper, we outline the S4 architecture in detail, describe various applications, including real-life deployments. Our design is primarily driven by large scale applications for data mining and machine learning in a production environment. We show that the S4 design is surprisingly flexible and lends itself to run in large clusters built with commodity hardware.
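To make the "keyed events routed with affinity to PEs" idea concrete, here is a minimal single-process sketch in Python. It is not S4's API: the names `WordCountPE`, `Node`, `route` and `run` are made up for illustration, and the hash-based routing is just one assumed way to get key affinity.

```python
import zlib


class WordCountPE:
    """Hypothetical processing element: one instance per distinct key."""

    def __init__(self, key):
        self.key = key
        self.count = 0

    def process(self, event):
        # Consume the event, update local state, and emit a derived event
        # that a downstream PE could consume.
        self.count += 1
        return [("count", self.key, self.count)]


class Node:
    """Stands in for one machine; holds the PEs for the keys routed to it."""

    def __init__(self):
        self.pes = {}

    def dispatch(self, key, event):
        # Lazily create the PE for a key the first time that key is seen.
        if key not in self.pes:
            self.pes[key] = WordCountPE(key)
        return self.pes[key].process(event)


def route(key, nodes):
    # A stable hash sends the same key to the same node every time (affinity).
    return nodes[zlib.crc32(key.encode("utf-8")) % len(nodes)]


def run(words, num_nodes=3):
    nodes = [Node() for _ in range(num_nodes)]
    emitted = []
    for word in words:
        emitted.extend(route(word, nodes).dispatch(word, {"word": word}))
    return nodes, emitted


if __name__ == "__main__":
    _, events = run(["s4", "kafka", "s4", "hadoop", "s4"])
    for event in events:
        print(event)
```

Because every event for a given key lands on the same PE, each PE can keep its state locally without any coordination, which is what makes the model resemble actors.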

Code: http://s4.io/

Authors: Neumeyer L, Robbins B, Nair A, Kesari A
Source: Yahoo! Labs

Kafka: A high-throughput distributed messaging system

I found an interesting new open source project which I hadn’t heard about before. Kafka is a messaging system used by LinkedIn as the foundation of their activity stream processing.

Kafka is a distributed publish-subscribe messaging system. It is designed to support the following:

  • Persistent messaging with O(1) disk structures that provide constant time performance even with many TB of stored messages.
  • High-throughput: even with very modest hardware Kafka can support hundreds of thousands of messages per second.
  • Explicit support for partitioning messages over Kafka servers and distributing consumption over a cluster of consumer machines while maintaining per-partition ordering semantics.
  • Support for parallel data load into Hadoop.

Kafka is aimed at providing a publish-subscribe solution that can handle all activity stream data and processing on a consumer-scale web site. This kind of activity (page views, searches, and other user actions) is a key ingredient in many of the social features on the modern web. This data is typically handled by “logging” and ad hoc log aggregation solutions due to the throughput requirements. This kind of ad hoc solution is a viable way of providing logging data to an offline analysis system like Hadoop, but is very limiting for building real-time processing. Kafka aims to unify offline and online processing by providing a mechanism for parallel load into Hadoop as well as the ability to partition real-time consumption over a cluster of machines.
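The partitioning and per-partition ordering described above can be illustrated with a small in-memory sketch. This is not Kafka's API or its on-disk format: `Topic`, `publish`, `read` and `assign` are hypothetical names, and hashing the key to pick a partition is an assumption made for the example.

```python
import zlib


class Topic:
    """Toy topic: each partition is an append-only list of messages."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def publish(self, key, message):
        # Messages with the same key always go to the same partition,
        # so their relative order is preserved within that partition.
        index = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        log = self.partitions[index]
        log.append(message)
        return index, len(log) - 1  # (partition, offset of the new message)

    def read(self, partition, offset, max_messages=100):
        # Consumers track their own offset and read sequentially from it.
        return self.partitions[partition][offset:offset + max_messages]


def assign(num_partitions, consumers):
    """Spread partitions over consumers; each partition has exactly one owner."""
    owners = {consumer: [] for consumer in consumers}
    for partition in range(num_partitions):
        owners[consumers[partition % len(consumers)]].append(partition)
    return owners


if __name__ == "__main__":
    topic = Topic(num_partitions=4)
    for action in ("page_view", "search", "click"):
        print(topic.publish("user42", action))
    print(assign(4, ["consumer-a", "consumer-b"]))
```

Since each partition is consumed by a single owner reading in offset order, consumption can be spread over a cluster without losing ordering inside a partition. There is no ordering guarantee across partitions.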

The use for activity stream processing makes Kafka comparable to Facebook’s Scribe or Cloudera’s Flume, though the architecture and primitives are very different for these systems and make Kafka more comparable to a traditional messaging system. See our design page for more details.

OpenTSDB – Distributed time series database

Ever since I saw a demo of this tool, I’ve been on edge, waiting for it to be open sourced so that I could use it. The problem it’s trying to solve is a real pain point which most webops folks would understand.

Yesterday the folks at StumbleUpon finally opened it up. It’s released under the LGPLv3 license. You can find the source here and the documentation here.

At StumbleUpon, we have found this system tremendously helpful to:

  • Get real-time state information about our infrastructure and services.
  • Understand outages or how complex systems interact together.
  • Measure SLAs (availability, latency, etc.)
  • Tune our applications and databases for maximum performance.
  • Do capacity planning.

Cfmap: Publishing, discovering and dashboarding infrastructure state

Dynamic infrastructure can be challenging if apps and scripts can’t keep up with it. At Ingenuity we observed this problem when we started moving towards virtualization and SOA (service oriented architecture). Remembering server names became impractical, and error-free manual configuration changes became impossible.

While there are some tools which solve parts of this specific problem, we couldn’t find any open source tool which could be used to both publish and discover the state of a system in a distributed, scalable and fault-tolerant way. Zookeeper, which comes pretty close to what we needed, is a fully consistent system which was not designed to be used across multiple data centers over high-latency, unstable network connections. We wanted a system which could not only stay up during network outages, but also sync up the state from different data centers once they are connected again.

We built a few different tools to solve our scalability problems. One of them is a tool called Cfmap, which we are open sourcing today to help others facing the same problem.

So what is cfmap?

Built on Cassandra, cfmap is designed to be a scalable, eventually consistent and fault-tolerant repository of state information. It provides a set of REST APIs and UIs to both publish and discover the state of an entity or a group of entities with great ease. The APIs are so simple that you would most probably end up writing your own custom agents for your various servers and processes rather than using the agent which comes bundled with the tool.

We have been using cfmap internally for a few months and the results are promising. Here is an example of what cfmap’s dashboard looks like on our network (I’ve changed some names to protect the actual resource names). Here is another dashboard which is running out in the public which you can use today as a demo.

[Image: cfmap dashboard]

Cfmap provides the ability to quickly drill down to a filtered set of servers or apps, and the ability to export them quickly into JSON or a shell-greppable format. The two export formats available today make dashboarding and scripting a trivial task.

The image above shows a small set of applications from our dev cluster, sorted by the time when the apps were deployed. In addition to showing the host names, status of the apps, and version information, it also lists the time when each app sent its last heartbeat. What is not visible here is that it also keeps track of certain changes in a “log” which could be used to understand how a particular record changed over time.

While the REST interface is easy to use, you could choose to use the command-line tool “cfquery”, which comes with Cfmap, to interact with cfmap. Cfquery could be used to both publish and search results. Let’s look at some examples.

Here is an example of how one could extract a list of all the hosts in cfmap.

rkt@torque:~/cc/cfmap/bin$ ./cfquery.pl -c view | grep ":host=" | cut -d':' -f2
host=team50
host=ip-10-205-15-124
host=torque
host=anorien

Here is a more elaborate example which shows how cfmap output could be used as part of other scripts. In this case, the query just specifies the host “anorien”. The result is a dump of all the properties set by that host. A few extra commands can quickly help you extract specific properties which can then be used as a data source for other tools (like monitoring).

rkt@torque:~/cc/cfmap/bin$ ./cfquery.pl -c view -p "host=anorien"

52cb892bc339f286bacbcfe9a8c8b4a6:port=0
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_freeswap=1999
52cb892bc339f286bacbcfe9a8c8b4a6:host=anorien
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_loadavg5m=0
52cb892bc339f286bacbcfe9a8c8b4a6:cfqversion=1.1
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_estconn=4
52cb892bc339f286bacbcfe9a8c8b4a6:type=host
52cb892bc339f286bacbcfe9a8c8b4a6:deployed_date=1286217400
52cb892bc339f286bacbcfe9a8c8b4a6:version=2.6.32-00007-g56678ec
52cb892bc339f286bacbcfe9a8c8b4a6:ip=127.0.0.1
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_pscount=101
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_loadavg15m=0
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_loadavgentities=0
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_freemem=3
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_loadavg1m=0
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_totalswap=1999
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_totalmem=501
52cb892bc339f286bacbcfe9a8c8b4a6:appname=os
52cb892bc339f286bacbcfe9a8c8b4a6:checked=1286331427

rkt@torque:~/cc/cfmap/bin$ ./cfquery.pl -c view -p "host=anorien" | grep stats_host_totalmem
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_totalmem=501
rkt@torque:~/cc/cfmap/bin$ ./cfquery.pl -c view -p "host=anorien" | grep stats_host_totalmem | cut -d'=' -f2
501
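If you would rather consume this output from a script than from a grep/cut pipeline, the `id:key=value` lines shown above are easy to parse. Here is a minimal Python sketch. `parse_cfquery` is a hypothetical helper (it is not part of cfmap), and it assumes the input contains only output lines in the format shown above, without the shell prompt lines.

```python
from collections import defaultdict

# A few lines copied from the cfquery output shown above.
SAMPLE = """\
52cb892bc339f286bacbcfe9a8c8b4a6:host=anorien
52cb892bc339f286bacbcfe9a8c8b4a6:type=host
52cb892bc339f286bacbcfe9a8c8b4a6:stats_host_totalmem=501
52cb892bc339f286bacbcfe9a8c8b4a6:checked=1286331427
"""


def parse_cfquery(text):
    """Group 'record_id:key=value' lines into {record_id: {key: value}}."""
    records = defaultdict(dict)
    for line in text.splitlines():
        line = line.strip()
        if not line:
            continue
        # Split on the first ':' only, since values may contain colons.
        record_id, colon, pair = line.partition(":")
        key, equals, value = pair.partition("=")
        if colon and equals:
            records[record_id][key] = value
    return dict(records)


if __name__ == "__main__":
    for record_id, props in parse_cfquery(SAMPLE).items():
        if props.get("host") == "anorien":
            print(props["stats_host_totalmem"])  # prints 501 for the sample above
```

All values come back as strings, so a monitoring script would convert the ones it needs (for example `int(props["stats_host_totalmem"])`).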

A few other interesting features

  • Schema-less design – cfmap provides a simple schema-less datastore which could be used for other purposes as well. Please note that since it was designed to maintain “state” (rather than to be a simple datastore API), it has a few reserved keywords which have a special meaning.
  • Low overhead to add/delete cfmap nodes – Since it’s built on Cassandra, adding new nodes is as simple as adding new Cassandra servers.
  • Configurable – The recommended way of setting up cfmap for production use is to host cfmap (which comes with a bundled version of Cassandra) on 3 or more servers, put them all under a single DNS entry (round robin), and let DNS load balancing take care of the rest.
    • If you want even more redundancy, set up something like haproxy on each of the nodes, which could also monitor and redirect traffic to alternate cfmap nodes when failures (or GCs) happen.
    • The default setup doesn’t enforce consistency during reads or writes, to facilitate smooth operation even during massive network or system failures. But if you wish, you could tweak the consistency and replication requirements based on your needs.

Cfmap is still a very early prototype, but we welcome others to play with it.

Distributed systems and Unique IDs: Snowflake

Most of us who deal with traditional databases take auto-increments for granted. While auto-increments are simple on consistent clusters, they can become a challenge in a cluster of independent nodes which don’t use the same source for the unique ids. An even bigger challenge is to generate them in such a way that they are roughly in sequence.

While this may be an old problem, I realized the importance of such a sequence only after using Cassandra in my own environment. Twitter, which has been using Cassandra in many interesting ways, has proposed a solution for it which they are releasing as open source today.

Here are some interesting sections from their post announcing “Snowflake”.

The Problem

We currently use MySQL to store most of our online data. In the beginning, the data was in one small database instance which in turn became one large database instance and eventually many large database clusters. For various reasons, the details of which merit a whole blog post, we’re working to replace many of these systems with the Cassandra distributed database or horizontally sharded MySQL (using gizzard).

Unlike MySQL, Cassandra has no built-in way of generating unique ids – nor should it, since at the scale where Cassandra becomes interesting, it would be difficult to provide a one-size-fits-all solution for ids. Same goes for sharded MySQL.

Our requirements for this system were pretty simple, yet demanding:

We needed something that could generate tens of thousands of ids per second in a highly available manner. This naturally led us to choose an uncoordinated approach.

These ids need to be roughly sortable, meaning that if tweets A and B are posted around the same time, they should have ids in close proximity to one another since this is how we and most Twitter clients sort tweets.[1]

Additionally, these numbers have to fit into 64 bits. We’ve been through the painful process of growing the number of bits used to store tweet ids before. It’s unsurprisingly hard to do when you have over 100,000 different codebases involved.

 

Solution

To generate the roughly-sorted 64 bit ids in an uncoordinated manner, we settled on a composition of: timestamp, worker number and sequence number.

Sequence numbers are per-thread and worker numbers are chosen at startup via zookeeper (though that’s overridable via a config file).

We encourage you to peruse and play with the code: you’ll find it on github. Please remember, however, that it is currently alpha-quality software that we aren’t yet running in production and is very likely to change.
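To show how a timestamp, worker number and sequence number can be packed into one roughly sortable 64-bit id, here is a minimal Python sketch. It is not Twitter's code. The post does not give bit widths, so the 41/10/12 split, the `EPOCH_MS` value and the `IdGenerator` name are assumptions made for illustration. The sketch also uses one lock-protected sequence per generator instead of the per-thread sequences mentioned in the post, and takes the worker id as a plain argument instead of getting it from zookeeper.

```python
import threading
import time

# Assumed layout (not stated in the post): 41 bits of milliseconds since a
# custom epoch, 10 bits of worker number, 12 bits of sequence number.
WORKER_BITS = 10
SEQUENCE_BITS = 12
MAX_WORKER = (1 << WORKER_BITS) - 1
MAX_SEQUENCE = (1 << SEQUENCE_BITS) - 1
EPOCH_MS = 1262304000000  # 2010-01-01 00:00:00 UTC, an arbitrary choice


def _now_ms():
    return int(time.time() * 1000)


class IdGenerator:
    def __init__(self, worker_id, clock=_now_ms):
        if not 0 <= worker_id <= MAX_WORKER:
            raise ValueError("worker_id out of range")
        self.worker_id = worker_id
        self.clock = clock
        self.last_ms = -1
        self.sequence = 0
        self.lock = threading.Lock()

    def next_id(self):
        with self.lock:
            now = self.clock()
            if now < self.last_ms:
                # Refuse to hand out ids that could collide with earlier ones.
                raise RuntimeError("clock moved backwards")
            if now == self.last_ms:
                self.sequence = (self.sequence + 1) & MAX_SEQUENCE
                if self.sequence == 0:
                    # Sequence exhausted for this millisecond; wait for the next.
                    while now <= self.last_ms:
                        now = self.clock()
            else:
                self.sequence = 0
            self.last_ms = now
            return (
                ((now - EPOCH_MS) << (WORKER_BITS + SEQUENCE_BITS))
                | (self.worker_id << SEQUENCE_BITS)
                | self.sequence
            )


if __name__ == "__main__":
    generator = IdGenerator(worker_id=5)
    print([generator.next_id() for _ in range(3)])
```

Because the timestamp occupies the high bits, ids from different workers sort by time to within clock skew, and no worker needs to talk to any other worker to generate one.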