Thursday, April 28, 2011

MapReduce & Hive - An Overview

I touched upon MapReduce in my previous post "HDFS Overview". In this post I would like to discuss MapReduce in more detail. I would also discuss Hive and how it can be used to implement the use case that I talked about in the post "A Hadoop Use Case".

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them and re-executing the ones that fail.

Typically you would keep the storage nodes and the compute nodes on the same machines. This makes the data available to the compute nodes locally, which increases performance and reduces network traffic.

Like HDFS, the MapReduce framework also follows a master/slave architecture. There is one master node, the JobTracker, which manages and schedules the jobs, and multiple TaskTracker nodes which execute the individual tasks.

Now on to the implementation part. The MapReduce framework operates exclusively on <key, value> pairs. You provide the input to the job as a set of <key, value> pairs and the framework produces the output of the job as a set of <key, value> pairs as well.

When you execute a job using the MapReduce framework, it executes tasks that transform the input records into intermediate records. This phase is termed the Map phase. In this phase a given input pair may map to zero or many output pairs. All intermediate values associated with a given output key are subsequently grouped by the framework and passed to a Reducer to determine the final output. You may refer to the WordCount example in the Hadoop MapReduce tutorial for a better understanding of these two phases.

If we take the trade data analytics (as discussed in "A Hadoop Use Case") as a reference here, we could dump the trade files (flat files) to HDFS. In the Map phase we would then read these files line by line and do some analytics on those trades, for example count the trades in a region by outputting a <region, 1> pair for each trade. You can define the input specification for a job by providing an implementation of InputFormat; by default the input to the map phase is a <byte offset, line> pair for each line in a file.
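
As an illustration, here is a minimal sketch of what such a Mapper could look like. The class name is hypothetical and it assumes a comma-delimited trade record whose fourth field is the location (matching the column order of the trade table defined later in this post):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Hypothetical Mapper for the trade use case: emits <region, 1> for every trade record.
public class TradeRegionMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

    private static final IntWritable ONE = new IntWritable(1);
    private final Text region = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // Assumed record layout: trade_id,trade_version,trade_date,location
        String[] fields = line.toString().split(",");
        if (fields.length > 3) {
            region.set(fields[3].trim());
            context.write(region, ONE);
        }
    }
}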

The output from the Map phase can be passed through a local combiner (for local aggregation on each node) and then the Reducer can finally aggregate the results from all the TaskTracker nodes.
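
A matching Reducer sketch could simply sum the counts for each region; since the aggregation is associative, the same class can also be registered as the combiner (again, the class name is illustrative):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the per-trade counts emitted by the Mapper to produce <region, trade count>.
public class TradeRegionReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

    private final IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text region, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable count : counts) {
            sum += count.get();
        }
        total.set(sum);
        context.write(region, total);
    }
}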

In order to execute a job you need to follow these steps:

a. Implement a Mapper class by extending org.apache.hadoop.mapreduce.Mapper

b. Implement a Reducer class by extending org.apache.hadoop.mapreduce.Reducer

c. Implement a main method which creates an instance of org.apache.hadoop.mapreduce.Job and executes it. You also need to specify the input paths, the output paths, the mapper class, the reducer class and the input and output formats for the job, as shown in the sketch below.
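
Putting these steps together, a driver for the hypothetical trades-by-region job might look roughly like this (the class names match the sketches above and the input/output paths are taken from the command line):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TradesByRegionJob {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "trades-by-region");
        job.setJarByClass(TradesByRegionJob.class);

        // Mapper, combiner and reducer from the sketches above
        job.setMapperClass(TradeRegionMapper.class);
        job.setCombinerClass(TradeRegionReducer.class);
        job.setReducerClass(TradeRegionReducer.class);

        // Output key/value types and the input/output formats
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // HDFS paths for the trade files and the job output
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

You would then package these classes into a jar and submit it with the hadoop jar command, for example: hadoop jar trades.jar TradesByRegionJob /data/trades /data/trades_by_region (the jar name and paths here are just placeholders).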

To me the issue here is that we need to write Mapper and Reducer classes for our analytics jobs, and they might be very hard to build (or migrate to) given that we have been using SQL queries for analytics. This is where Hive comes to our rescue. The Hadoop subproject Hive provides a SQL interface and relational model for Hadoop. The queries are transformed into MapReduce jobs, so we do not have to worry about those implementations ourselves.

Using Hive you can create databases, create tables, populate tables and query them much as you would in any of the RDBMS we are used to, though the query language and the semantics differ a bit in Hive. For example, in Hive you can partition a table on a field that is not part of the data itself; it is then the developer's responsibility to maintain the relationship between the partition name and the data content.

You can create tables in Hive like this:
CREATE TABLE trade(trade_id INT, trade_version TINYINT,
trade_date STRING, location STRING)
COMMENT 'This is trade table'
PARTITIONED BY(date STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

You can load data from a file in local file systems directly into Hive table like this:
LOAD DATA LOCAL INPATH '/tmp/trade_2008-06-08_us.txt' INTO TABLE trade PARTITION(date='2011-04-28');


You can then query the table like this:
INSERT OVERWRITE TABLE trades_by_loc
SELECT trade.location, count(*)
FROM trade
GROUP BY trade.location;

Note that unlike SQL, we always insert the results into a table.

Hope this gives you some insight into how we can use Hadoop to build an application that does analytics on huge amounts of data.
