Sawzall and the PIG

When I heard some interesting use cases of how “Sawzall” is used to crunch through huge amounts of log data within Google, two things came to mind.

  • Apache Pig, which is “a platform for analyzing large data sets that consists of a high-level language for expressing data analysis programs, coupled with infrastructure for evaluating these programs. The salient property of Pig programs is that their structure is amenable to substantial parallelization, which in turn enables them to handle very large data sets.”
  • CEP (Complex event processing) – consists of processing the many events happening across all the layers of an organization, identifying the most meaningful events within the event cloud, analyzing their impact, and taking subsequent action in real time. [Also look at Esper; a small sketch follows this list.]
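
Since Esper is the CEP engine mentioned above, here is a minimal sketch of the idea using Esper’s classic Java API (pre-8.x package names; the StockTick event class, the EPL statement and the sample prices are invented for the example). A continuous query is registered once, and the listener fires as matching events stream in:

  import com.espertech.esper.client.Configuration;
  import com.espertech.esper.client.EPServiceProvider;
  import com.espertech.esper.client.EPServiceProviderManager;
  import com.espertech.esper.client.EPStatement;

  public class CepSketch {
      // Hypothetical event class used only for this example.
      public static class StockTick {
          private final String symbol;
          private final double price;
          public StockTick(String symbol, double price) { this.symbol = symbol; this.price = price; }
          public String getSymbol() { return symbol; }
          public double getPrice() { return price; }
      }

      public static void main(String[] args) {
          Configuration config = new Configuration();
          config.addEventType("StockTick", StockTick.class);
          EPServiceProvider engine = EPServiceProviderManager.getDefaultProvider(config);

          // Continuous query: average price per symbol over a sliding 30-second window.
          EPStatement stmt = engine.getEPAdministrator().createEPL(
              "select symbol, avg(price) as avgPrice from StockTick.win:time(30 sec) group by symbol");

          // React to updated results as events flow in.
          stmt.addListener((newEvents, oldEvents) -> {
              if (newEvents != null) {
                  System.out.println(newEvents[0].get("symbol") + " -> " + newEvents[0].get("avgPrice"));
              }
          });

          // Push a couple of events into the engine.
          engine.getEPRuntime().sendEvent(new StockTick("GOOG", 530.0));
          engine.getEPRuntime().sendEvent(new StockTick("GOOG", 534.0));
      }
  }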

Google has open-sourced parts of this framework in a project called “Szl”.

Sawzall is a procedural language developed for parallel analysis of very large data sets (such as logs). It provides protocol buffer handling, regular expression support, string and array manipulation, associative arrays (maps), structured data (tuples), data fingerprinting (64-bit hash values), time values, various utility operations and the usual library functions operating on floating-point and string values. For years Sawzall has been Google’s logs processing language of choice and is used for various other data analysis tasks across the company.

Instead of specifying how to process the entire data set, a Sawzall program describes the processing steps for a single data record independent of others. It also provides statements for emitting extracted intermediate results to predefined containers that aggregate data over all records. The separation between per-record processing and aggregation enables parallelization. Multiple records can be processed in parallel by different runs of the same program, possibly distributed across many machines. The language does not specify a particular implementation for aggregation, but a number of aggregators are supplied. Aggregation within a single execution is automatic. Aggregation of results from multiple executions is not automatic but an example program is supplied.

Here is a quick example of how it could be used…

  # A top table keeps the three words with the largest total weight (count).
  topwords: table top(3) of word: string weight count: int;

  # Each input record is one CSV line of the form word,count.
  fields: array of bytes = splitcsvline(input);
  w: string = string(fields[0]);
  c: int = int(string(fields[1]), 10);
  if (c != 0) {
    emit topwords <- w weight c;
  }

Given the input:

  abc,1
  def,2
  ghi,3
  def,4
  jkl,5

The program (run with --table_output) emits:

  topwords[] = def, 6, 0
  topwords[] = jkl, 5, 0
  topwords[] = ghi, 3, 0

Note that the two “def” records have been aggregated by the top table: its weight is the sum 2 + 4 = 6.

Real-Time MapReduce using S4

While trying to figure out how to do real-time log analysis in my own organization, I realized that most map-reduce frameworks are designed to run as batch jobs in a time-delayed manner rather than being instantaneous like a SQL query against a MySQL database. There are a few frameworks bucking that trend. Yahoo! Labs recently announced that its “Advertising Sciences” group has built a general-purpose, real-time, distributed, fault-tolerant, scalable, event-driven, expandable platform called “S4” that allows programmers to easily implement applications for processing continuous, unbounded streams of data.

S4 clusters are built using low-cost commoditized hardware, and leverage many technologies from Yahoo!’s Hadoop project. S4 is written in Java and uses the Spring Framework to build a software component architecture. Over a dozen pluggable modules have been created so far.

Why do we need a real-time map-reduce framework?
Applications such as personalization, user feedback, malicious traffic detection, and real-time search require both very fast response and scalability. In S4 we abstract the input data as streams of key-value pairs that arrive asynchronously and are dispatched intelligently to processing nodes that produce data sets of output key-value pairs. In search, for example, the output data sets are made available to the serving system before a user executes her next search query. We use this rapid feedback to adapt the search models based on user intent.

Read more: Original post from Yahoo! Labs
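
The core abstraction in the quote above — streams of keyed events routed to processing elements that keep per-key state — is easy to sketch. The class and method names below are hypothetical (this is not the actual S4 API); the point is only to illustrate the programming model of updating per-key state as events arrive:

  import java.util.HashMap;
  import java.util.Map;

  // Hypothetical base class standing in for a stream-processing framework's
  // "processing element"; not the real S4 API.
  abstract class ProcessingElementSketch<V> {
      abstract void onEvent(String key, V value);
  }

  // Counts clicks per URL as (key, value) events arrive on the stream.
  class ClickCounterPE extends ProcessingElementSketch<Long> {
      private final Map<String, Long> counts = new HashMap<>();

      @Override
      void onEvent(String url, Long clicks) {
          counts.merge(url, clicks, Long::sum);
      }

      long countFor(String url) {
          return counts.getOrDefault(url, 0L);
      }
  }

  public class S4StyleSketch {
      public static void main(String[] args) {
          // In a real deployment the framework would route events to PE
          // instances by key across the cluster; here we just call it directly.
          ClickCounterPE pe = new ClickCounterPE();
          pe.onEvent("example.com/a", 1L);
          pe.onEvent("example.com/a", 2L);
          System.out.println(pe.countFor("example.com/a")); // prints 3
      }
  }

In a real S4 deployment the framework, not the application, instantiates one processing element per key and routes events to the right node in the cluster.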

Spanner: Google’s next Massive Storage and Computation infrastructure

MapReduce, Bigtable and Pregel have their origins in Google and they all deal with “large systems”. But all of them may be dwarfed in size and complexity by a new project Google is working on, which was mentioned briefly (maybe unintentionally) at an event last year.

Instead of caching data closer to the user, it looks like Google is trying to take “the data” to the user. If you use Gmail or Google Docs, then with this framework Google could, auto-magically, “move” one of the master copies of your data to the nearest Google data center without really having to cache anything locally. And because they are building one single datastore cluster around the world, instead of hundreds of smaller ones for different applications, it looks like they may not need dedicated clusters for specific projects anymore.

Below is the gist of “Spanner” from a talk by Jeff Dean at a symposium held at Cornell. Take a look at the rest of the slides if you are interested in some impressive statistics on hardware performance and reliability.

  • Spanner: Storage & computation system that spans all our datacenters
    • Single global namespace
      • Names are independent of location(s) of data
      • Similarities to Bigtable: table, families, locality groups, coprocessors,…
      • Differences: hierarchical directories instead of rows, fine-grained replication
      • Fine-grained ACLs, replication configuration at the per-directory level
    • support mix of strong and weak consistency across datacenters
      • Strong consistency implemented with Paxos across tablet replicas
      • Full support for distributed transactions across directories/machines
    • much more automated operation
      • System automatically moves and adds replicas of data and computation based on constraints and usage patterns
      • Automated allocation of resources across entire fleet of machines.


Pregel: Google’s other data-processing infrastructure

Inside Google, MapReduce is used for 80% of all the data processing needs. That includes indexing web content, running the clustering engine for Google News, generating reports for popular queries (Google Trends), processing satellite imagery, language model processing for statistical machine translation and even mundane tasks like data backup and restore.

The other 20% is handled by a lesser-known infrastructure called “Pregel”, which is optimized to mine relationships from “graphs”.

According to Wikipedia, a “graph” is a collection of vertices or ‘nodes’ and a collection of ‘edges’ that connect pairs of ‘nodes’. Depending on the requirements, a ‘graph’ can be undirected, which means there is no distinction between the two ‘nodes’ of an edge, or its edges can be directed from one ‘node’ to another.

While you can calculate something like ‘pagerank’ with MapReduce very quickly, you need more complex algorithms to mine some other kinds of relationships between pages or other sets of data.

 

Despite differences in structure and origin, many graphs out there have two things in common: each of them keeps growing in size, and there is a seemingly endless number of facts and details people would like to know about each one. Take, for example, geographic locations. A relatively simple analysis of a standard map (a graph!) can provide the shortest route between two cities. But progressively more sophisticated analysis could be applied to richer information such as speed limits, expected traffic jams, roadworks and even weather conditions. In addition to the shortest route, measured as sheer distance, you could learn about the most scenic route, or the most fuel-efficient one, or the one which has the most rest areas. All these options, and more, can all be extracted from the graph and made useful — provided you have the right tools and inputs. The web graph is similar. The web contains billions of documents, and that number increases daily. To help you find what you need from that vast amount of information, Google extracts more than 200 signals from the web graph, ranging from the language of a webpage to the number and quality of other pages pointing to it.

In order to achieve that, we have created scalable infrastructure, named Pregel, to mine a wide range of graphs. In Pregel, programs are expressed as a sequence of iterations. In each iteration, a vertex can, independently of other vertices, receive messages sent to it in the previous iteration, send messages to other vertices, modify its own and its outgoing edges’ states, and mutate the graph’s topology (experts in parallel processing will recognize that the Bulk Synchronous Parallel Model inspired Pregel).

Currently, Pregel scales to billions of vertices and edges, but this limit will keep expanding. Pregel’s applicability is harder to quantify, but so far we haven’t come across a type of graph or a practical graph computing problem which is not solvable with Pregel. It computes over large graphs much faster than alternatives, and the application programming interface is easy to use. Implementing PageRank, for example, takes only about 15 lines of code. Developers of dozens of Pregel applications within Google have found that "thinking like a vertex," which is the essence of programming in Pregel, is intuitive.
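
The Pregel API itself is C++ and internal to Google, so the sketch below uses a hypothetical, Java-flavored vertex API purely to illustrate the “think like a vertex” model; the compute method mirrors the PageRank example described in the Pregel paper (sum the incoming contributions, update the vertex’s value, then send value divided by out-degree to the neighbors):

  import java.util.ArrayList;
  import java.util.List;

  // Hypothetical, minimal vertex-centric API in the spirit of Pregel; none of
  // these class or method names are the real (internal, C++) Pregel API.
  class VertexSketch {
      double value;                  // this vertex's current PageRank estimate
      int superstepNum;              // set by the imaginary framework each superstep
      long totalVertices;            // graph size, provided by the framework
      int outDegree;                 // number of outgoing edges of this vertex
      final List<Double> outbox = new ArrayList<>();   // messages for the next superstep
      boolean halted;

      int superstep()                 { return superstepNum; }
      long numVertices()              { return totalVertices; }
      int numOutEdges()               { return outDegree; }
      void sendMessageToAllNeighbors(double msg) { outbox.add(msg); }
      void voteToHalt()               { halted = true; }

      // Called once per superstep with the messages sent in the previous one.
      void compute(Iterable<Double> messages) { }
  }

  // Mirrors the PageRank example from the Pregel paper, expressed per vertex.
  class PageRankVertex extends VertexSketch {
      @Override
      void compute(Iterable<Double> messages) {
          if (superstep() >= 1) {
              double sum = 0;
              for (double m : messages) sum += m;
              value = 0.15 / numVertices() + 0.85 * sum;
          }
          if (superstep() < 30) {
              // Spread this vertex's rank evenly across its outgoing edges.
              sendMessageToAllNeighbors(value / numOutEdges());
          } else {
              voteToHalt();   // go quiet until a new message arrives
          }
      }
  }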

This is from the paper:

We conducted an experiment solving single-source shortest paths using a simple Bellman-Ford algorithm expressed in Pregel. As input data, we chose a randomly generated graph with a log-normal distribution of outdegrees; the graph had 1B vertices and approximately 80B edges. Weights of all edges were set to 1 for simplicity. We used a cluster of 480 commodity multi-core PCs and 2000 workers. The graph was partitioned 8000 ways using the default partitioning function based on a random hash, to get a sense of the default performance of the Pregel system. Each worker was assigned 4 graph partitions and allowed up to 2 computational threads to execute the Compute() function over the partitions (not counting system threads that handle messaging, etc.). Running shortest paths took under 200 seconds, using 8 supersteps.

 

A talk is scheduled on Pregel at SIGMOD 2010

491 Pregel: A System for Large-Scale Graph Processing
Greg Malewicz, Google, Inc.; Matthew Austern, Google, Inc.; Aart Bik, Google, Inc.; James Dehnert, Google, Inc.; Ilan Horn, Google, Inc.; Naty Leiser, Google, Inc.; Grzegorz Czajkowski, Google, Inc.

 

References
  1. Pregel: A system for large-scale graph processing
  2. Inference: Anatomy of the Google Pregel
  3. Large-scale graph computing at Google
  4. MapReduce, SIGMETRICS ’09
  5. Bulk synchronous parallel
  6. Hamburg: Graph processing on Hadoop
  7. 12 Reasons to learn graph theory
Related databases and products
  1. Neo4j is an embedded, disk-based, fully transactional Java persistence engine that stores data structured in graphs rather than in tables. A graph (mathematical lingo for a network) is a flexible data structure that allows a more agile and rapid style of development. You can think of Neo4j as a high-performance graph engine with all the features of a mature and robust database. The programmer works with an object-oriented, flexible network structure rather than with strict and static tables — yet enjoys all the benefits of a fully transactional, enterprise-strength database. (A minimal embedded-API sketch follows this list.)
  2. HyperGraphDB is a general-purpose, extensible, portable, distributed, embeddable, open-source data storage mechanism. It is a graph database designed specifically for artificial intelligence and semantic web projects, but it can also be used as an embedded object-oriented database for projects of all sizes.
  3. InfoGrid is an Internet Graph Database with many additional software components that make the development of REST-ful web applications on a graph foundation easy.
  4. DEX is a high-performance library for managing very large graphs or networks. DEX is available for personal use on its web page. Take some time to play with it and develop applications that manage the network-like data structures representing your network of friends, or your network of information.
  5. Gremlin is a graph-based programming language. Its documentation provides all the information necessary to understand how to use it for graph query, analysis, and manipulation.
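
To give a flavor of working with one of the products above, here is a minimal sketch against Neo4j’s classic embedded Java API (the API has changed across major versions; this follows the 3.x style, and the store path, labels and property names are invented for the example):

  import java.io.File;
  import org.neo4j.graphdb.GraphDatabaseService;
  import org.neo4j.graphdb.Label;
  import org.neo4j.graphdb.Node;
  import org.neo4j.graphdb.RelationshipType;
  import org.neo4j.graphdb.Transaction;
  import org.neo4j.graphdb.factory.GraphDatabaseFactory;

  public class Neo4jSketch {
      public static void main(String[] args) {
          // Embedded, disk-based database; the path is just an example.
          GraphDatabaseService db =
              new GraphDatabaseFactory().newEmbeddedDatabase(new File("data/example-graph"));

          // All graph mutations happen inside a transaction.
          try (Transaction tx = db.beginTx()) {
              Node alice = db.createNode(Label.label("Person"));
              alice.setProperty("name", "Alice");

              Node bob = db.createNode(Label.label("Person"));
              bob.setProperty("name", "Bob");

              // Typed, directed relationship between the two nodes.
              alice.createRelationshipTo(bob, RelationshipType.withName("KNOWS"));

              tx.success();
          }

          db.shutdown();
      }
  }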

Hive @Facebook

Hive is a data warehouse infrastructure built over Hadoop. It provides tools for easy data ETL, a mechanism to put structure on the data, and the capability to query and analyze large data sets stored in Hadoop files. Hive defines a simple SQL-like query language, called QL, that enables users familiar with SQL to query the data. At the same time, this language also allows programmers who are familiar with the MapReduce framework to plug in their custom mappers and reducers to perform more sophisticated analysis that may not be supported by the built-in capabilities of the language.
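
Alongside pluggable map and reduce scripts, the other common way to extend QL from Java is a user-defined function. The base class below (org.apache.hadoop.hive.ql.exec.UDF) is Hive’s classic UDF hook, but the function itself and its registration name are invented purely for illustration:

  import java.net.URI;
  import org.apache.hadoop.hive.ql.exec.UDF;
  import org.apache.hadoop.io.Text;

  // Example UDF: extract the host name from a URL string.
  // Hypothetical registration in Hive:  CREATE TEMPORARY FUNCTION url_host AS 'UrlHost';
  public final class UrlHost extends UDF {
      public Text evaluate(Text url) {
          if (url == null) {
              return null;
          }
          try {
              String host = new URI(url.toString()).getHost();
              return host == null ? null : new Text(host);
          } catch (Exception e) {
              return null;   // malformed URLs simply yield NULL
          }
      }
  }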

At a user group meeting, Ashish Thusoo from the Facebook data team spoke about how Facebook uses Hive for its data processing needs.

Problem

Facebook is a free service and has been experiencing rapid growth in the last few years. The amount of data it collects grew from around 200 GB per day in March 2008 to about 15 TB per day today. Facebook realized early on that insights derived from simple algorithms on more data are better than insights from complex algorithms on smaller data sets.

But the traditional approach towards ETL on proprietary storage systems was not only getting expensive to maintain, it was also limited in the size it could scale to. This is when they started experimenting with Hadoop.

How Hadoop gave birth to Hive

Hadoop turned out to be superior in availability, scalability and manageability. Its efficiency wasn’t that great, but one could get more throughput by throwing more cheap hardware at it. Ashish pointed out that although, at that point, partial availability, resilience and scale were more important than ACID guarantees, they had a hard time finding Hadoop programmers within Facebook to make use of the cluster.

It was this that eventually pushed Facebook to build a new way of querying data from Hadoop that doesn’t require writing map-reduce jobs in Java. That quickly led to the development of Hive, which does exactly what it set out to do. Let’s look at a couple of examples of Hive queries.

  hive> FROM invites a INSERT OVERWRITE TABLE events SELECT a.bar, count(1) WHERE a.foo > 0 GROUP BY a.bar;
  hive> INSERT OVERWRITE TABLE events SELECT a.bar, count(1) FROM invites a WHERE a.foo > 0 GROUP BY a.bar;

Hive’s long-term goal was to develop a system for managing and querying structured data built on top of Hadoop. To do that, it uses MapReduce for execution and HDFS for storage. They modeled the language on SQL and designed it to be extensible, interoperable and able to outperform traditional processing mechanisms.

How it is used

Facebook has a production Hive cluster which is primarily used for log summarization, including aggregation of impressions, click counts and statistics around user engagement. They have a separate cluster for “Ad hoc analysis” which is free for all/most Facebook employees to use. And over time they figured out how to use it for spam detection, ad optimization and a host of other undocumented stuff.

Facebook Hive/Hadoop statistics

The Scribe/Hadoop cluster at Facebook has about 50 nodes today and handles about 25 TB of raw data. About 99% of its data is available for use within 20 seconds. The Hive/Hadoop cluster where most of the data processing happens has about 8,400 cores and roughly 12.5 PB of raw storage, which translates to 4 PB of usable storage after replication. Each node in that cluster is an 8-core server with 12 TB of storage.

All in all, Facebook takes in 12 TB of compressed new data and scans about 135 TB of compressed data per day. More than 7,500 Hive jobs run each day, using up about 80,000 compute hours.


Google patents MapReduce: “System and method for efficient large-scale data processing”

After filing in 2004, Google finally got its patent on “System and method for efficient large-scale data processing” approved yesterday.

GigaOM pointed out that if Google really wants to enforce it, it would have to go after the many different vendors who implement “MapReduce” in some form in their applications and databases.

Google’s intentions for how it will use the patent are not clear, but this is what one of its spokespeople said:

Like other responsible, innovative companies, Google files patent applications on a variety of technologies it develops. While we do not comment about the use of this or any part of our portfolio, we feel that our behavior to date has been in line with our corporate values and priorities.

Scaling Powerset using Amazon’s EC2 and S3

The first thing most dot-com companies do before going live is set up an infrastructure to provide the service. And though it might sound straightforward to most of you, it can be a very expensive affair. To come up with the right kind of infrastructure for any new service, a few key architectural decisions have to be made.

  1. Design the infrastructure and architecture to handle the traffic peaks (not average)
  2. Design with long term scaling in mind
  3. Design power and cooling infrastructure to support the servers
  4. Hire hardware, systems and network support staff (more if 24/7 operations are required)
  5. Add buffers to support failures and short-term growth requirements
  6. Take into account the lead times for ordering and procuring hardware (which can be weeks, if not months)
  7. And a few others, which I won’t bore you with here.

The point is that the initial capital investment can be in the millions even before the first customer starts using the service. And once the capital investment is made, it is very difficult to scale down operations if plans change.

Powerset Inc is a secretive search startup with ambitions of out-smarting Google on its own turf. Based in San Francisco, this startup is working on building a better search engine that uses natural language processing to understand the user’s question a little better before answering it. And just like any other search company, its technology is a CPU-hungry beast just waiting to be unleashed. Powerset could have gone the way most dot-com companies have gone, but instead they decided to try out Amazon’s EC2 (Elastic Compute Cloud) and S3 (Simple Storage Service) to augment their computational needs.

Powerset has been reported to be testing a 400-instance EC2 cluster with Hadoop running Map/Reduce and HDFS (the Hadoop Distributed File System). This gets a little tricky on EC2 because of the absence of persistent storage (the OS is re-initialized on every reboot), so they use a combination of HDFS and a remote copying process to sync the data back to their local network. Since there is no charge to move data from EC2 to S3, they have been thinking about implementing a native HDFS-to-S3 interface to move data around within Amazon’s network itself.

EC2 is charged on a per-instance, per-hour basis, which means Powerset can bring new nodes online during heavy demand and shut off unused nodes at the flick of a switch at night. The Powerset team also built their own EC2 image configured to automatically join the HDFS cluster after every boot. In the event of a node failure, Hadoop takes care of data replication, and EC2 takes care of replacing the failed node with a new one.

Amazon EC2 costs 10 cents an hour per instance, so running a 400-node cluster for a month comes to only about $29,000 (400 instances × 720 hours × $0.10). Based on published benchmarks, the actual CPU throughput of each EC2 instance is roughly equivalent to a 1 GHz Pentium III. $72 a month for that kind of server is not exactly cheap, but, just like leasing a car, at least you don’t have to pay upfront and manage it yourself.

So let’s do the math. A regular 64-bit, dual-core, two-CPU AMD server with about 8 GB of RAM costs about $10,000, which excludes the cost of hosting, power, cooling and maintenance. Based on some comments on the Amazon forums, such a box is about 2 to 3 times faster than EC2’s instances. If you replaced the CPU power of this hardware with 8 to 12 EC2 instances, you would spend about $700 to $800 a month. It would take close to 12 months before a company using EC2 paid $10,000 for the same amount of computation, and remember that the $10,000 figure doesn’t take into account colocation, power, cooling and general server administration, which can be significant as well. Also remember that those 12 months are 12 months of actual computational usage, which could stretch over 2 to 4 years depending on how often the instances are used.

However, I also have to point out a few things to look out for. The maximum physical memory available per instance is about 1.7 GB, which is relatively tiny if you are used to 8 to 16 GB of RAM on 64-bit hardware. And though CPU and memory might scale horizontally for some applications, cross-server communication can be extremely expensive for others. Unless your application is designed to scale horizontally within 1.7 GB of RAM per node, I would seriously advise against using EC2 until you figure out how to change that.

I’ve blogged about both S3 and EC2 before, and they continue to fascinate me. The success of companies like SlideShare, and the decision of companies like Powerset to use AWS, is something I’ll watch closely over time.


Hadoop and HBase

This may not be a surprise for a lot of people, but it was for me. Even though I have been using Lucene and Nutch for some time, I didn’t really know enough about Hadoop and HBase until recently.

Hadoop

  • Scalable: Hadoop can reliably store and process petabytes.
  • Economical: It distributes the data and processing across clusters of commonly available computers. These clusters can number into the thousands of nodes.
  • Efficient: By distributing the data, Hadoop can process it in parallel on the nodes where the data is located. This makes it extremely rapid.
  • Reliable: Hadoop automatically maintains multiple copies of data and automatically redeploys computing tasks based on failures.


Hadoop implements MapReduce using the Hadoop Distributed File System (HDFS). MapReduce divides applications into many small blocks of work. HDFS creates multiple replicas of data blocks for reliability, placing them on compute nodes around the cluster. MapReduce can then process the data where it is located.
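
The canonical word-count job makes that division of work concrete: mappers run next to the HDFS blocks they read and emit (word, 1) pairs, and reducers sum the counts for each word. Here is a minimal sketch against the org.apache.hadoop.mapreduce API (input and output paths are taken from the command line):

  import java.io.IOException;
  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.io.IntWritable;
  import org.apache.hadoop.io.LongWritable;
  import org.apache.hadoop.io.Text;
  import org.apache.hadoop.mapreduce.Job;
  import org.apache.hadoop.mapreduce.Mapper;
  import org.apache.hadoop.mapreduce.Reducer;
  import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

  public class WordCount {
      // Runs on the node holding each input block; emits (word, 1) pairs.
      public static class TokenizerMapper
              extends Mapper<LongWritable, Text, Text, IntWritable> {
          private static final IntWritable ONE = new IntWritable(1);
          private final Text word = new Text();

          @Override
          protected void map(LongWritable key, Text value, Context context)
                  throws IOException, InterruptedException {
              for (String token : value.toString().split("\\s+")) {
                  if (!token.isEmpty()) {
                      word.set(token);
                      context.write(word, ONE);
                  }
              }
          }
      }

      // Receives all counts emitted for a given word and sums them.
      public static class SumReducer
              extends Reducer<Text, IntWritable, Text, IntWritable> {
          @Override
          protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                  throws IOException, InterruptedException {
              int sum = 0;
              for (IntWritable v : values) {
                  sum += v.get();
              }
              context.write(key, new IntWritable(sum));
          }
      }

      public static void main(String[] args) throws Exception {
          Job job = Job.getInstance(new Configuration(), "word count");
          job.setJarByClass(WordCount.class);
          job.setMapperClass(TokenizerMapper.class);
          job.setCombinerClass(SumReducer.class);
          job.setReducerClass(SumReducer.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(IntWritable.class);
          FileInputFormat.addInputPath(job, new Path(args[0]));
          FileOutputFormat.setOutputPath(job, new Path(args[1]));
          System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
  }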

HBase

Google’s Bigtable, a distributed storage system for structured data, is a very effective mechanism for storing very large amounts of data in a distributed environment.

Just as Bigtable leverages the distributed data storage provided by the Google File System, HBase provides Bigtable-like capabilities on top of Hadoop.

Data is organized into tables, rows and columns, but a query language like SQL is not supported. Instead, an Iterator-like interface is available for scanning through a row range (and of course there is an ability to retrieve a column value for a specific key).

Any particular column may have multiple values for the same row key. A secondary key can be provided to select a particular value or an Iterator can be set up to scan through the key-value pairs for that column given a specific row key.
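
In the HBase Java client, the row/column model and the iterator-style scanning described above look roughly like this (the client API has evolved considerably over the years; this sketch follows the 1.x-era API, and the table name, column family and row keys are invented for the example):

  import org.apache.hadoop.conf.Configuration;
  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.TableName;
  import org.apache.hadoop.hbase.client.Connection;
  import org.apache.hadoop.hbase.client.ConnectionFactory;
  import org.apache.hadoop.hbase.client.Get;
  import org.apache.hadoop.hbase.client.Result;
  import org.apache.hadoop.hbase.client.ResultScanner;
  import org.apache.hadoop.hbase.client.Scan;
  import org.apache.hadoop.hbase.client.Table;
  import org.apache.hadoop.hbase.util.Bytes;

  public class HBaseSketch {
      public static void main(String[] args) throws Exception {
          Configuration conf = HBaseConfiguration.create();
          try (Connection connection = ConnectionFactory.createConnection(conf);
               Table table = connection.getTable(TableName.valueOf("webtable"))) {

              // Point lookup: fetch one column value for a specific row key.
              Get get = new Get(Bytes.toBytes("com.example/index.html"));
              get.addColumn(Bytes.toBytes("contents"), Bytes.toBytes("html"));
              Result row = table.get(get);
              byte[] html = row.getValue(Bytes.toBytes("contents"), Bytes.toBytes("html"));
              System.out.println(html == null ? "no value" : Bytes.toString(html));

              // Range scan: iterate over a row-key range; there is no SQL involved.
              Scan scan = new Scan();
              scan.setStartRow(Bytes.toBytes("com.example"));
              scan.setStopRow(Bytes.toBytes("com.examplf"));   // exclusive upper bound
              try (ResultScanner scanner = table.getScanner(scan)) {
                  for (Result r : scanner) {
                      System.out.println(Bytes.toString(r.getRow()));
                  }
              }
          }
      }
  }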