Thursday, April 28, 2011

MapReduce & Hive - An Overview

I touched upon MapReduce in my previous post "HDFS Overview". In this post I would like to discuss MapReduce in more detail. I will also discuss Hive and how it can be used to implement the use case that I talked about in the post "A Hadoop Use Case".

A MapReduce job usually splits the input data-set into independent chunks which are processed by the map tasks in a completely parallel manner. The framework sorts the outputs of the maps, which are then input to the reduce tasks. Typically both the input and the output of the job are stored in a file system. The framework takes care of scheduling tasks, monitoring them and re-executing failed tasks.

Typically you would keep the storage nodes and the compute nodes on the same machines. This makes the data available to the compute nodes locally, which increases performance and reduces network traffic.

Like HDFS, the MapReduce framework also works on a master/slave architecture. There is one master node, the JobTracker, which manages and schedules the jobs, and multiple TaskTracker nodes which execute the tasks.

Now on to the implementation part. The MapReduce framework operates exclusively on <key, value> pairs. You provide the input to the job as a set of <key, value> pairs and the framework produces the output again as a set of <key, value> pairs.

When you execute a job using the MapReduce framework, it executes tasks to transform the input records into intermediate records. This phase is termed the Mapper phase. In this phase a given input pair may map to zero or many output pairs. All intermediate values associated with a given output key are subsequently grouped by the framework and passed to a Reducer to determine the final output. You may refer to the WordCount example at this link for a better understanding of these two phases.

Let us take the trade data analytics (as discussed in "A Hadoop Use Case") as a reference here. We could dump the trade files (flat files) into HDFS. Then, in the Map phase, we read these files line by line and do some analytics on them, such as counting the trades in a region and emitting <region, count> pairs. You can define the input specification for a job by creating an implementation of InputFormat; by default the input to the map phase is a <byte offset, line contents> pair for each line in a file.
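As a rough sketch (not the exact code from the post), a mapper for this trade-count job could look like the class below. The assumption that each line is a comma-separated record with the trade's location in the fourth field is made purely for illustration.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits a <region, 1> pair for every trade record read from the input file.
// Assumes comma-separated lines with the location/region in the 4th field.
public class TradeRegionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text region = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        String[] fields = line.toString().split(",");
        if (fields.length >= 4) {         // silently skip malformed lines
            region.set(fields[3].trim());
            context.write(region, ONE);   // <region, 1>
        }
    }
}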

The output from the Map phase can be passed through a local combiner (which can do local aggregation), and then the Reducer finally aggregates the results from all the TaskTracker nodes.
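Continuing the sketch, the reducer below simply sums the counts for each region; because this sum is associative, the same class can also be registered as the combiner for local aggregation.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the per-region counts emitted by the mappers; also usable as a combiner.
public class TradeRegionReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text region, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable count : counts) {
            total += count.get();
        }
        context.write(region, new IntWritable(total)); // final <region, count>
    }
}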

In order to execute a job you need to follow these steps:

a. Implement a Mapper class by extending org.apache.hadoop.mapreduce.Mapper

b. Implement a Reducer class by extending org.apache.hadoop.mapreduce.Reducer

c. Implement a main method which creates an instance of org.apache.hadoop.mapreduce.Job, configures it and executes it. You also need to specify the input paths, the output paths, the mapper class, the reducer class and the input and output formats for the job; a minimal driver sketch is shown below.
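Putting these steps together, a minimal driver for the hypothetical trade-count job sketched above could look like the following. The class names and the use of command-line arguments for the input and output paths are assumptions made for this example, and the exact Job API details vary slightly between Hadoop versions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class TradeCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "trade count by region");
        job.setJarByClass(TradeCountDriver.class);

        // Wire up the mapper, reducer and (optional) local combiner
        job.setMapperClass(TradeRegionMapper.class);
        job.setCombinerClass(TradeRegionReducer.class);
        job.setReducerClass(TradeRegionReducer.class);

        // Input/output formats and the types of the output pairs
        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // HDFS input and output directories passed on the command line
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}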

To me the issue here is that we need to write Mapper and Reducer classes for the analytics jobs, and they might be very hard to build (or hard to migrate to) given that we have been using SQL queries for analytics. So Hive comes to our rescue here. The Hadoop subproject Hive provides a SQL-like interface and relational model for Hadoop. The queries are transformed into MapReduce jobs, so we do not have to worry about writing those implementations ourselves.

Using Hive you can create databases, create tables, populate tables and query the tables as in any of the RDBMSs that we are used to, though the query language and the semantics may differ a bit in the case of Hive. For example, in Hive you can create partitions on a field, but that field is not part of the data itself, and it is the developer's responsibility to maintain the relationship between the partition name and the data content.

You can create tables in Hive like this:
CREATE TABLE trade(trade_id INT, trade_version TINYINT,
trade_date STRING, location STRING)
COMMENT 'This is the trade table'
PARTITIONED BY(dt STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ',' LINES TERMINATED BY '\n'
STORED AS TEXTFILE;

You can load data from a file in the local file system directly into a Hive table like this:
LOAD DATA LOCAL INPATH '/tmp/trade_2008-06-08_us.txt' INTO TABLE trade PARTITION(dt='2011-04-28');


You can then query the table like this (assuming a results table trades_by_loc(location STRING, trade_count BIGINT) has already been created):
INSERT OVERWRITE TABLE trades_by_loc
SELECT trade.location, count(1)
FROM trade
GROUP BY trade.location;

Note that unlike SQL, we always insert the results into a table.

Hope this gives you some insight into how we can use Hadoop to build an application that does analytics on huge amounts of data.

Wednesday, April 27, 2011

HDFS Overview

This is for my friends who got interested in Hadoop after reading my previous post "A Hadoop Use Case". In this blog I will try to briefly touch upon the two major sub-projects in Apache Hadoop - HDFS and MapReduce - and have thus split it into two posts. This one talks mainly about HDFS. Most of the information here has been picked up from the official Hadoop site.

The Apache Hadoop project develops open-source software for reliable, scalable, distributed computing. It includes the following sub-projects:

a. Hadoop Distributed File System (HDFS): A distributed file system that provides high-throughput access to application data.

b. MapReduce: A software framework for distributed processing of large data sets on compute clusters.

HDFS is the primary storage system used by Hadoop applications. HDFS is a distributed file system designed to run on commodity servers. It is designed for applications that have large data sets (hundreds of gigabytes or even petabytes) and require fast access to application data. HDFS supports write-once-read-many semantics on files.

HDFS works on a master/slave architecture. An HDFS cluster consists of a single NameNode, which is the coordinator node, and multiple DataNodes.

NameNode manages the file system namespace and regulates access to the files by clients. The DataNodes manage the storage attached to the nodes that they run on. The NameNode and DataNode are pieces of software designed to run on commodity machines. These machines typically run a GNU/Linux operating system (OS). HDFS is built using the Java language; any machine that supports Java can run the NameNode or the DataNode software.

HDFS provides a command-line interface called FS shell that lets a user interact with the data in HDFS. To copy a local file to HDFS from the command line you can use the command below:
hdfs dfs -copyFromLocal <localsrc> URI
For more command line options refer to this link.
HDFS also provides Java APIs for applications to use. Sample Java code to copy a file from the local file system to HDFS is shown below:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// srcFile is a path on the local file system, dstFile is the destination path in HDFS
Configuration config = new Configuration();
FileSystem hdfs = FileSystem.get(config);
Path srcPath = new Path(srcFile);
Path dstPath = new Path(dstFile);
hdfs.copyFromLocalFile(srcPath, dstPath);
When you copy a file to HDFS it is split into one or more blocks (based on the configured block size) and these blocks are stored on multiple DataNodes. The NameNode manages the mapping of blocks to DataNodes. These blocks are also replicated for fault tolerance, and the replication factor is again configurable.
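Both the block size and the replication factor can be set cluster-wide in the HDFS configuration files or overridden by the client. As a rough illustration (the values here are arbitrary, and the block-size property has been named dfs.block.size in older releases and dfs.blocksize in newer ones):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

// Illustrative settings only; adjust the property names to your Hadoop version.
Configuration conf = new Configuration();
conf.setLong("dfs.block.size", 128L * 1024 * 1024); // 128 MB blocks
conf.setInt("dfs.replication", 3);                  // keep 3 replicas of each block
FileSystem hdfs = FileSystem.get(conf);             // files written through this client use these settings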

Below is the HDFS Architecture diagram from the HDFS Architecture Guide


HDFS Limitations
a. HDFS has one NameNode per cluster, so the total memory available on the NameNode is the primary scalability limitation. On very large clusters, increasing the average size of the files stored in HDFS helps increase the cluster size without increasing the memory requirements on the NameNode.
b. Currently the NameNode is a single point of failure, so if the NameNode goes down the cluster is unavailable. We will not lose any data, though, as the NameNode persists all its metadata on disk and can start up again with the latest information. To prevent data loss due to disk failures we can run a Secondary NameNode or save copies of the fsimage and edit log on multiple disks.

To start the Hadoop cluster refer to this link.

Tuesday, April 26, 2011

A Hadoop Use Case

I have started learning Hadoop recently, and to make the concepts sink in I tried relating them to my previous experience.

We had a system that would load trades (nothing but contracts between two parties for buying or selling some assets) and do some processing on those trades. Millions of trades would be loaded into the system on a daily basis. We had a requirement to keep 30 days' worth of trade data in the system to do some analytical processing and then create scheduled reports for the business users. We were storing all 30 days of data in a database and faced numerous issues in terms of performance and scalability. We finally invested a lot in database racks, hardware and software and made it work, with some changes in requirements etc.

Now when I look at this requirement, I feel that had I known about Hadoop then, I would certainly have used it to solve this problem with some commodity servers - with much less investment - and it would have been a much more elegant solution.

How does Hadoop help in this use case?
Hadoop provides a distributed file system (HDFS) that can be used to store huge amounts of data running into terabytes or petabytes. Scalability, portability and reliability are inherent in its architecture.

Hadoop also provides MapReduce which is a programming model for writing applications that rapidly process vast amounts of data in parallel on large clusters of compute nodes.

So now we can store the trades in HDFS and use MapReduce to run the analytics jobs on the data stored there.

How does this architecture help?
a. Separation of concerns - this way we can design our database for OLTP and keep it normalized, and have the OLAP components based on the Hadoop cluster.
b. Improved performance and scalability - as the database will now hold only the transactional data (roughly 1/30th of what it stored before), performance would be much better and our application could scale better with the same set of hardware and software.
c. Better analytics - we can now analyse more historical data, as HDFS and MapReduce are capable of handling large-scale data running into petabytes.
d. Cost savings - we save cost not just on software licensing but also on hardware, as a Hadoop cluster can run on commodity servers.

These are some of the advantages that I can think of right away.