Sawzall and the PIG

When I heard about interesting use cases of “Sawzall” being used to hack through huge amounts of log data within Google, I thought of two things.

  • Apache PIG, which is “a platform for analyzing large data sets that consists of a high-level language Pig for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turns enables them to handle very large data sets.”
  • CEP (Complex Event Processing), which consists of processing many events happening across all the layers of an organization, identifying the most meaningful events within the event cloud, analyzing their impact, and taking subsequent action in real time. [Also look at Esper.]

Google has open-sourced parts of this framework in a project called “Szl”.

Sawzall is a procedural language developed for parallel analysis of very large data sets (such as logs). It provides protocol buffer handling, regular expression support, string and array manipulation, associative arrays (maps), structured data (tuples), data fingerprinting (64-bit hash values), time values, various utility operations and the usual library functions operating on floating-point and string values. For years Sawzall has been Google’s logs processing language of choice and is used for various other data analysis tasks across the company.

Instead of specifying how to process the entire data set, a Sawzall program describes the processing steps for a single data record independent of others. It also provides statements for emitting extracted intermediate results to predefined containers that aggregate data over all records. The separation between per-record processing and aggregation enables parallelization. Multiple records can be processed in parallel by different runs of the same program, possibly distributed across many machines. The language does not specify a particular implementation for aggregation, but a number of aggregators are supplied. Aggregation within a single execution is automatic. Aggregation of results from multiple executions is not automatic but an example program is supplied.

Here is a quick example of how it could be used…

  topwords: table top(3) of word: string weight count: int;
  fields: array of bytes = splitcsvline(input);
  w: string = string(fields[0]);
  c: int = int(string(fields[1]), 10);
  if (c != 0) {
    emit topwords <- w weight c;
  }

Given the input:

  abc,1
  def,2
  ghi,3
  def,4
  jkl,5

The program (using --table_output) emits:

  topwords[] = def, 6, 0
  topwords[] = jkl, 5, 0
  topwords[] = ghi, 3, 0
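
To make the split between per-record processing and aggregation concrete, here is a rough Python imitation of the example above. It is only a sketch and not how Szl is implemented: the names process_record and top_table are made up for illustration, and the aggregator here sums weights exactly, whereas a real top table may use an approximation.

```python
import csv
from collections import defaultdict


def process_record(line):
    """Per-record step: parse one CSV line and yield (word, weight) emits."""
    fields = next(csv.reader([line]))
    word, count = fields[0], int(fields[1], 10)
    if count != 0:
        yield word, count


def top_table(emits, n):
    """Aggregation step: sum the weights per word and keep the n heaviest."""
    totals = defaultdict(int)
    for word, weight in emits:
        totals[word] += weight
    return sorted(totals.items(), key=lambda kv: kv[1], reverse=True)[:n]


records = ["abc,1", "def,2", "ghi,3", "def,4", "jkl,5"]

# Each record is handled independently, so this loop could run on many machines.
emits = [emit for line in records for emit in process_record(line)]

topwords = top_table(emits, 3)
for word, weight in topwords:
    print(word, weight)
```

For the five input records above, this prints def 6, jkl 5 and ghi 3, matching the word and weight columns of the Szl output.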

Real-Time MapReduce using S4

While trying to figure out how to do real-time log analysis in my own organization, I realized that most map-reduce frameworks are designed to run as time-delayed batch jobs rather than to respond instantly like a SQL query against a MySQL database. Some frameworks are bucking the trend. Yahoo! Labs recently announced that its “Advertising Sciences” group has built a general-purpose, real-time, distributed, fault-tolerant, scalable, event-driven, expandable platform called “S4”, which allows programmers to easily implement applications for processing continuous unbounded streams of data.

S4 clusters are built using low-cost commoditized hardware, and leverage many technologies from Yahoo!’s Hadoop project. S4 is written in Java and uses the Spring Framework to build a software component architecture. Over a dozen pluggable modules have been created so far.

Why do we need a real-time map-reduce framework?
Applications such as personalization, user feedback, malicious traffic detection, and real-time search require both very fast response and scalability. In S4 we abstract the input data as streams of key-value pairs that arrive asynchronously and are dispatched intelligently to processing nodes that produce data sets of output key-value pairs. In search, for example, the output data sets are made available to the serving system before a user executes her next search query. We use this rapid feedback to adapt the search models based on user intent.
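
The idea of key-value events being dispatched to processing nodes can be sketched in a few lines of Python. This is my own toy illustration and does not use the S4 API: CountingPE, Node and dispatch are hypothetical names, and routing by a CRC32 hash of the key is an assumption about what “dispatched intelligently” could look like in the simplest case.

```python
import zlib


class CountingPE:
    """Hypothetical processing element: counts the events seen for one key."""

    def __init__(self, key):
        self.key = key
        self.count = 0

    def on_event(self, value):
        self.count += 1
        return (self.key, self.count)  # output key-value pair


class Node:
    """A processing node holding one processing element per key it has seen."""

    def __init__(self):
        self.pes = {}

    def handle(self, key, value):
        if key not in self.pes:
            self.pes[key] = CountingPE(key)
        return self.pes[key].on_event(value)


def dispatch(nodes, key, value):
    """Route an event by key so that equal keys always reach the same node."""
    index = zlib.crc32(key.encode("utf-8")) % len(nodes)
    return index, nodes[index].handle(key, value)


nodes = [Node() for _ in range(3)]
stream = [("query:shoes", "u1"), ("query:hats", "u2"), ("query:shoes", "u3")]

# Events are processed one at a time as they arrive, not as a batch job.
outputs = [dispatch(nodes, key, value) for key, value in stream]
for node_index, pair in outputs:
    print(node_index, pair)
```

Because routing depends only on the key, both “query:shoes” events land on the same node and the running count is available right after each event.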

Read more: Original post from Yahoo! Labs

Spanner: Google’s next Massive Storage and Computation infrastructure

MapReduce, Bigtable and Pregel all have their origins at Google, and they all deal with “large systems”. But all of them may be dwarfed in size and complexity by a new project Google is working on, which was mentioned briefly (maybe unintentionally) at an event last year.

Instead of caching data closer to the user, it looks like Google is trying to take “the data” to the user. If you use GMail or a Google Docs service, then with this framework Google could, auto-magically, “move” one of the master copies of your data to the nearest Google data center without really having to cache anything locally. And because they are building one single datastore cluster around the world, instead of building hundreds of smaller ones for different applications, it looks like they may no longer need dedicated clusters for specific projects.

Below is the gist of “Spanner” from a talk by Jeff Dean at a symposium held at Cornell. Take a look at the rest of the slides if you are interested in some impressive statistics on hardware performance and reliability.

  • Spanner: Storage & computation system that spans all our datacenters
    • Single global namespace
      • Names are independent of location(s) of data
      • Similarities to Bigtable: table, families, locality groups, coprocessors,…
      • Differences: hierarchical directories instead of rows, fine-grained replication
      • Fine-grained ACLs, replication configuration at the per-directory level
    • support mix of strong and weak consistency across datacenters
      • Strong consistency implemented with Paxos across tablet replicas
      • Full support for distributed transactions across directories/machines
    • much more automated operation
      • System automatically moves and adds replicas of data and computation based on constraints and usage patterns
      • Automated allocation of resources across entire fleet of machines.


Pregel: Google’s other data-processing infrastructure

Inside Google, MapReduce is used for 80% of all the data processing needs. That includes indexing web content, running the clustering engine for Google News, generating reports for popular queries (Google Trends), processing satellite imagery, language model processing for statistical machine translation and even mundane tasks like data backup and restore.

The other 20% is handled by a lesser-known infrastructure called “Pregel”, which is optimized to mine relationships from “graphs”.

According to Wikipedia, a “graph” is a collection of vertices or ‘nodes’ and a collection of ‘edges’ that connect pairs of ‘nodes’. Depending on the requirements, a ‘graph’ can be undirected, which means there is no distinction between the two ‘nodes’ joined by an edge, or it can be directed, with each edge pointing from one ‘node’ to another.
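
As a small illustration of the difference, here is one way to hold such a graph in Python as adjacency sets. The function name build_graph and the node labels are made up for this sketch.

```python
from collections import defaultdict


def build_graph(edges, directed):
    """Return a dict mapping each node to the set of nodes it points to."""
    adjacency = defaultdict(set)
    for a, b in edges:
        adjacency[a].add(b)
        adjacency[b]  # touching the key makes sure b exists as a node
        if not directed:
            adjacency[b].add(a)  # an undirected edge works in both directions
    return dict(adjacency)


edges = [("A", "B"), ("B", "C")]
directed_graph = build_graph(edges, directed=True)
undirected_graph = build_graph(edges, directed=False)

print(directed_graph)
print(undirected_graph)
```

In the directed version C has no outgoing edges, while in the undirected version every edge can be followed from either end.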

While you can calculate something like PageRank with MapReduce very quickly, you need more complex algorithms to mine some other kinds of relationships between pages or other sets of data.

This is from the Google Research blog post “Large-scale graph computing at Google”:

 

Despite differences in structure and origin, many graphs out there have two things in common: each of them keeps growing in size, and there is a seemingly endless number of facts and details people would like to know about each one. Take, for example, geographic locations. A relatively simple analysis of a standard map (a graph!) can provide the shortest route between two cities. But progressively more sophisticated analysis could be applied to richer information such as speed limits, expected traffic jams, roadworks and even weather conditions. In addition to the shortest route, measured as sheer distance, you could learn about the most scenic route, or the most fuel-efficient one, or the one which has the most rest areas. All these options, and more, can all be extracted from the graph and made useful — provided you have the right tools and inputs. The web graph is similar. The web contains billions of documents, and that number increases daily. To help you find what you need from that vast amount of information, Google extracts more than 200 signals from the web graph, ranging from the language of a webpage to the number and quality of other pages pointing to it.

In order to achieve that, we have created scalable infrastructure, named Pregel, to mine a wide range of graphs. In Pregel, programs are expressed as a sequence of iterations. In each iteration, a vertex can, independently of other vertices, receive messages sent to it in the previous iteration, send messages to other vertices, modify its own and its outgoing edges’ states, and mutate the graph’s topology (experts in parallel processing will recognize that the Bulk Synchronous Parallel Model inspired Pregel).

Currently, Pregel scales to billions of vertices and edges, but this limit will keep expanding. Pregel’s applicability is harder to quantify, but so far we haven’t come across a type of graph or a practical graph computing problem which is not solvable with Pregel. It computes over large graphs much faster than alternatives, and the application programming interface is easy to use. Implementing PageRank, for example, takes only about 15 lines of code. Developers of dozens of Pregel applications within Google have found that "thinking like a vertex," which is the essence of programming in Pregel, is intuitive.
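
To get a feel for “thinking like a vertex”, here is a single-machine Python sketch of PageRank written as a sequence of iterations in which every vertex sends messages along its outgoing edges and then updates its own value. This is not Pregel's API: the function name, the damping factor of 0.85 and the fixed 30 iterations are my assumptions, and the sketch assumes every vertex has at least one outgoing edge.

```python
def pregel_style_pagerank(out_edges, supersteps=30, damping=0.85):
    """out_edges maps each vertex to the list of vertices it links to."""
    n = len(out_edges)
    rank = {v: 1.0 / n for v in out_edges}
    for _ in range(supersteps):
        # Message passing: each vertex splits its rank among its out-neighbors.
        inbox = {v: [] for v in out_edges}
        for v, targets in out_edges.items():
            for t in targets:
                inbox[t].append(rank[v] / len(targets))
        # Vertex update: each vertex looks only at the messages it received.
        rank = {
            v: (1 - damping) / n + damping * sum(inbox[v])
            for v in out_edges
        }
    return rank


web = {"a": ["b", "c"], "b": ["c"], "c": ["a"]}
ranks = pregel_style_pagerank(web)
for page, score in sorted(ranks.items()):
    print(page, round(score, 4))
```

Each vertex only reads its own inbox and writes to its neighbors' inboxes, which is what lets a real system spread the vertices over many workers.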

This is from the paper:

We conducted an experiment solving single-source shortest paths using a simple Bellman-Ford algorithm expressed in Pregel. As input data, we chose a randomly generated graph with a log-normal distribution of outdegrees; the graph had 1B vertices and approximately 80B edges. Weights of all edges were set to 1 for simplicity. We used a cluster of 480 commodity multi-core PCs and 2000 workers. The graph was partitioned 8000 ways using the default partitioning function based on a random hash, to get a sense of the default performance of the Pregel system. Each worker was assigned 4 graph partitions and allowed up to 2 computational threads to execute the Compute() function over the partitions (not counting system threads that handle messaging, etc.). Running shortest paths took under 200 seconds, using 8 supersteps.
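
The shape of that computation can be imitated on one machine. The sketch below is my own approximation of a superstep loop for single-source shortest paths, not code from the paper: pregel_style_sssp and the tiny example graph are hypothetical, and all edge weights are set to 1 as in the experiment.

```python
import math


def pregel_style_sssp(edges, source):
    """edges maps every vertex to a list of (neighbor, weight) pairs."""
    distance = {v: math.inf for v in edges}
    inbox = {source: [0]}  # messages delivered at the start of a superstep
    supersteps = 0
    while inbox:
        outbox = {}
        for vertex, messages in inbox.items():
            candidate = min(messages)
            if candidate < distance[vertex]:
                # Found a shorter path: record it and tell the neighbors.
                distance[vertex] = candidate
                for neighbor, weight in edges[vertex]:
                    outbox.setdefault(neighbor, []).append(candidate + weight)
        inbox = outbox  # vertices with no messages stay idle next superstep
        supersteps += 1
    return distance, supersteps


graph = {
    "a": [("b", 1), ("c", 1)],
    "b": [("d", 1)],
    "c": [("d", 1)],
    "d": [("e", 1)],
    "e": [],
}
distances, steps = pregel_style_sssp(graph, "a")
print(distances, steps)
```

The loop stops when no vertex has messages left, so with unit weights the number of supersteps tracks how many hops the farthest vertex is from the source.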

 

A talk on Pregel is scheduled for SIGMOD 2010:

491 Pregel: A System for Large-Scale Graph Processing
Greg Malewicz, Google, Inc.; Matthew Austern, Google, Inc.; Aart Bik, Google, Inc.; James Dehnert, Google, Inc.; Ilan Horn, Google, Inc.; Naty Leiser, Google, Inc.; Grzegorz Czajkowski, Google, Inc.

 

References
  1. Pregel: A system for large-scale graph processing
  2. Inference Anatomy of the Google Pregel
  3. Large-scale graph computing at Google
  4. MapReduce, SIGMETRICS 09
  5. Bulk synchronous parallel
  6. Hamburg, Graph processing on Hadoop
  7. 12 Reasons to learn graph theory

Related databases and products
  1. Neo4j is an embedded, disk-based, fully transactional Java persistence engine that stores data structured in graphs rather than in tables. A graph (mathematical lingo for a network) is a flexible data structure that allows a more agile and rapid style of development. You can think of Neo4j as a high-performance graph engine with all the features of a mature and robust database. The programmer works with an object-oriented, flexible network structure rather than with strict and static tables — yet enjoys all the benefits of a fully transactional, enterprise-strength database.
  2. HyperGraphDB is a general-purpose, extensible, portable, distributed, embeddable, open-source data storage mechanism. It is a graph database designed specifically for artificial intelligence and semantic web projects; it can also be used as an embedded object-oriented database for projects of all sizes.
  3. InfoGrid is an Internet Graph Database with many additional software components that make the development of REST-ful web applications on a graph foundation easy.
  4. DEX is a high-performance library to manage very large graphs or networks. DEX is available for personal use. Take some time to play with it and develop your applications to manage the network-like data structures that represent your network of friends, or your network of information.
  5. Gremlin is a graph-based programming language used for graph query, analysis, and manipulation.

Google patents MapReduce: “System and method for efficient large-scale data processing”

After filing in 2004, Google finally got its patent on “System and method for efficient large-scale data processing” approved yesterday.

GigaOM pointed out that if Google really wants to enforce the patent, it would have to go after many different vendors that implement “mapreduce” in some form in their applications and databases.

Google’s intentions for the patent are not clear, but this is what one of its spokespersons said:

Like other responsible, innovative companies, Google files patent applications on a variety of technologies it develops. While we do not comment about the use of this or any part of our portfolio, we feel that our behavior to date has been in line with our corporate values and priorities.