In this blog series, we will discuss the Hadoop ecosystem (with a focus on the Hortonworks distribution of Hadoop for Windows Server and the HDInsight Service for Azure) - core technologies such as HDFS and MapReduce, Pig (which includes a data flow language for writing queries and dataset transformations on large datasets using richer data structures than MapReduce), Hive (a SQL abstraction on top of MapReduce for more structured data), Sqoop (a tool that transfers bulk data between Hadoop and relational databases), Mahout (an open source machine-learning library that facilitates building scalable machine-learning software), and Pegasus (a peta-scale graph or data mining system running on Hadoop).
Problems with conventional database systems
In a previous blog article we mentioned that processing big data exceeds the capacity of conventional database systems. While a large number of CPU cores can be placed in a single server, it's not feasible to deliver input data (especially big data) to these cores fast enough for processing. Using hard drives that can individually sustain read speeds of approx. 100 MB/s, and 4 independent I/O channels, a 4 TB data set would take about 10,000 seconds (nearly 3 hours) just to read. Thus a distributed system with many servers working on the problem in parallel is necessary in the big data domain.
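The read-time arithmetic is easy to check with a few lines of Python. This is only a back-of-the-envelope sketch: the function name is ours, decimal units are assumed (1 TB = 10^12 bytes), and the 100-server cluster is a hypothetical comparison that assumes perfectly parallel reads.

```python
MB = 10**6   # decimal megabyte, in bytes
TB = 10**12  # decimal terabyte, in bytes


def read_time_seconds(dataset_bytes: int, channels: int, bytes_per_second: int) -> float:
    """Time to stream a data set when all channels read in parallel."""
    return dataset_bytes / (channels * bytes_per_second)


# One server: 4 I/O channels at 100 MB/s each.
single_server = read_time_seconds(4 * TB, channels=4, bytes_per_second=100 * MB)

# Hypothetical cluster: 100 such servers, each reading its own share of the data.
cluster = read_time_seconds(4 * TB, channels=4 * 100, bytes_per_second=100 * MB)

print(f"single server: {single_server / 3600:.1f} hours")  # single server: 2.8 hours
print(f"100 servers:   {cluster / 60:.1f} minutes")         # 100 servers:   1.7 minutes
```

The point is not the exact numbers but the shape of the result: read time falls in proportion to the number of independent disks working at once.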
Solution: Apache Hadoop Framework
The Apache Hadoop framework supports distributed processing of large data sets using a cluster of commodity hardware that can scale up to thousands of machines. Each node in the cluster offers local computation and storage and is assumed to be prone to failures. The framework is designed to detect and handle failures at the application layer, and therefore transparently delivers a highly available service without the need for expensive hardware or complex programming. Distributed computing on large volumes of data has been done before; what sets Hadoop apart is its simplified programming model for client applications and its seamless handling of the distribution of data and work across the cluster.
Architecture of Hadoop
Let's begin by looking at the basic architecture of Hadoop. A typical Hadoop multi-machine cluster consists of one or two "master" nodes (running NameNode and JobTracker processes), and many "slave" or "worker" nodes (running TaskTracker and DataNode processes) spread across many racks. The two main components of the Hadoop framework are described below - a distributed file system to store large amounts of data, and a computational paradigm called MapReduce.
Hadoop Distributed File System (HDFS)
Since the complete data set is unlikely to fit on a single computer's hard drive, a distributed file system which breaks up input data and stores it on different machines in the cluster is needed. Hadoop Distributed File System (HDFS) is a distributed and scalable file system which is included in the Hadoop framework.
It is designed to store a very large amount of information (terabytes or petabytes) reliably and is optimized for long sequential streaming reads rather than random access into the files. HDFS also provides data location awareness (such as the name of the rack or the network switch where a node is).
Reliability is achieved by replicating the data across multiple nodes in the cluster rather than traditional means such as RAID storage. The default replication value is 3, so data is stored on three nodes - two on the same rack, and one on a different rack. Thus a single machine failure does not result in any data being unavailable.
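To make the two-plus-one layout concrete, here is a minimal Python sketch that picks three nodes for a block: two on one rack and one on a different rack. The function name, rack names, and node names are all hypothetical, and this only illustrates the layout described above; it is not HDFS's actual placement code.

```python
import random
from collections import Counter
from typing import Dict, List, Tuple


def place_replicas(racks: Dict[str, List[str]], rng: random.Random) -> List[Tuple[str, str]]:
    """Return three (rack, node) pairs: two on one rack, one on another rack."""
    # Only racks with at least two nodes can host the co-located pair.
    pair_candidates = [rack for rack, nodes in racks.items() if len(nodes) >= 2]
    if not pair_candidates:
        raise ValueError("no rack has two nodes for the co-located pair")
    pair_rack = rng.choice(pair_candidates)

    # The third replica must live on a different, non-empty rack.
    other_racks = [rack for rack, nodes in racks.items() if rack != pair_rack and nodes]
    if not other_racks:
        raise ValueError("need a second rack for the off-rack replica")
    other_rack = rng.choice(other_racks)

    pair_nodes = rng.sample(racks[pair_rack], 2)
    single_node = rng.choice(racks[other_rack])
    return [(pair_rack, node) for node in pair_nodes] + [(other_rack, single_node)]


# Hypothetical two-rack cluster.
example_cluster = {"rack1": ["n1", "n2", "n3"], "rack2": ["n4", "n5"]}
replicas = place_replicas(example_cluster, random.Random(0))
print(replicas)
print(Counter(rack for rack, _ in replicas))
```

With this layout, losing a single machine still leaves two copies, and losing an entire rack still leaves at least one.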
Individual machines in the cluster that store blocks of individual files are referred to as DataNodes. DataNodes communicate with each other to rebalance data, and re-replicate it in response to system failures. The Hadoop framework schedules processes on the DataNodes that operate on the local subset of data (moving computation to data instead of the other way around), so data is read from local disk into the CPU without network transfers, which achieves high performance.
The metadata for the file system is stored by a single machine called the NameNode. The large block size and low amount of metadata per file allow the NameNode to store all of this information in main memory, giving clients fast access to the metadata.
To open a file, a client contacts the NameNode, retrieves a list of DataNodes that contain the blocks that comprise the file, and then reads the file data in bulk directly from the DataNode servers in parallel, without directly involving the NameNode. Because all metadata lives on one machine, the NameNode is a single point of failure. Despite its name, the Secondary NameNode is not a standby that takes over automatically; it regularly connects to the primary NameNode and builds snapshots (checkpoints) of the directory information, which can be used to help restore the file system if the NameNode is lost.
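The read path can be sketched as a toy in-memory model. Everything here is an assumption made for illustration: the class and function names are ours, the 4-byte block size is deliberately tiny, replicas are assigned round-robin, and blocks are fetched one after another rather than in parallel. It is not the HDFS client API.

```python
from typing import Dict, List, Optional, Set, Tuple

BLOCK_SIZE = 4  # bytes; tiny on purpose so a short string spans several blocks


class DataNode:
    """Stores block contents, keyed by block id."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.blocks: Dict[str, bytes] = {}


class NameNode:
    """Stores metadata only: path -> ordered list of (block id, holder names)."""

    def __init__(self) -> None:
        self.files: Dict[str, List[Tuple[str, List[str]]]] = {}

    def block_locations(self, path: str) -> List[Tuple[str, List[str]]]:
        return self.files[path]


def write_file(path: str, data: bytes, namenode: NameNode,
               datanodes: Dict[str, DataNode], replication: int = 3) -> None:
    """Split data into blocks and store each block on `replication` DataNodes."""
    names = sorted(datanodes)
    entries = []
    for offset in range(0, len(data), BLOCK_SIZE):
        index = offset // BLOCK_SIZE
        block_id = f"{path}#blk{index}"
        # Round-robin holder choice; a stand-in for real placement logic.
        holders = [names[(index + r) % len(names)]
                   for r in range(min(replication, len(names)))]
        for holder in holders:
            datanodes[holder].blocks[block_id] = data[offset:offset + BLOCK_SIZE]
        entries.append((block_id, holders))
    namenode.files[path] = entries


def read_file(path: str, namenode: NameNode, datanodes: Dict[str, DataNode],
              down: Optional[Set[str]] = None) -> bytes:
    """Ask the NameNode where the blocks are, then fetch them from DataNodes."""
    down = down or set()
    chunks = []
    for block_id, holders in namenode.block_locations(path):  # metadata lookup only
        live = [h for h in holders if h not in down]
        if not live:
            raise IOError(f"no live replica for {block_id}")
        chunks.append(datanodes[live[0]].blocks[block_id])  # data comes from a DataNode
    return b"".join(chunks)


nodes = {name: DataNode(name) for name in ["dn1", "dn2", "dn3", "dn4"]}
nn = NameNode()
write_file("/logs/a.txt", b"hello hadoop!", nn, nodes)
print(read_file("/logs/a.txt", nn, nodes))
print(read_file("/logs/a.txt", nn, nodes, down={"dn1"}))  # still readable with dn1 offline
```

Note that the NameNode object never touches file contents; it only answers the question "where are the blocks?", which is why its metadata can fit in memory.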
The Windows Azure HDInsight Service supports HDFS for storing data, but also uses an alternative approach called Azure Storage Vault (ASV) which provides a seamless HDFS interface to Azure Blob Storage, a general purpose Windows Azure storage solution that does not co-locate compute with storage, but offers other benefits. In our next blog, we will explore the HDInsight service in more detail.
MapReduce Programming Model
Hadoop programs must be written to conform to the "MapReduce" programming model, which is designed for processing large volumes of data in parallel by dividing the work into a set of independent tasks. The records are initially processed in isolation by tasks called Mappers, and their output is then brought together into a second set of tasks called Reducers.
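The classic illustration of this model is a word count. The sketch below runs the three phases (map, group by key, reduce) in a single Python process; it only mimics the shape of a MapReduce job and does not use the Hadoop API. The function names are ours.

```python
from collections import defaultdict
from typing import Dict, Iterable, Iterator, List, Tuple


def mapper(record: str) -> Iterator[Tuple[str, int]]:
    """Process one record in isolation and emit (word, 1) pairs."""
    for word in record.lower().split():
        yield word, 1


def group_by_key(pairs: Iterable[Tuple[str, int]]) -> Dict[str, List[int]]:
    """Bring together all values that share a key (the framework's job in Hadoop)."""
    grouped: Dict[str, List[int]] = defaultdict(list)
    for key, value in pairs:
        grouped[key].append(value)
    return grouped


def reducer(key: str, values: List[int]) -> Tuple[str, int]:
    """Combine all values for one key into a single result."""
    return key, sum(values)


def run_job(records: Iterable[str]) -> Dict[str, int]:
    intermediate = [pair for record in records for pair in mapper(record)]
    grouped = group_by_key(intermediate)
    return dict(reducer(key, values) for key, values in grouped.items())


counts = run_job(["the quick fox", "the lazy dog", "The fox"])
print(counts)
```

Because each call to `mapper` sees only its own record and each call to `reducer` sees only its own key, both phases can be spread across many machines without the tasks needing to coordinate.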
MapReduce input comes from files loaded into HDFS on the processing cluster. Client applications submit MapReduce jobs to the JobTracker node, which divides the work and pushes it out to available TaskTracker nodes in the cluster while trying to keep the work as close to the data as possible.
Hadoop manages cluster topology internally: the rack-aware HDFS file system lets the JobTracker know which nodes contain the data and which other machines are nearby. If the work cannot be hosted on a node where the data resides, priority is given to nodes in the same rack. This reduces the amount of data moved across the network.
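The scheduling preference described above (same node first, then same rack, then anywhere) can be sketched as a simple ranking function. The names `locality_level` and `pick_node`, and the node and rack labels, are hypothetical; this is an illustration of the idea rather than the JobTracker's actual scheduler.

```python
from typing import Dict, Iterable, Set


def locality_level(candidate: str, replica_nodes: Set[str], rack_of: Dict[str, str]) -> int:
    """0 = node holds the data, 1 = same rack as a replica, 2 = off-rack."""
    if candidate in replica_nodes:
        return 0
    replica_racks = {rack_of[node] for node in replica_nodes}
    if rack_of[candidate] in replica_racks:
        return 1
    return 2


def pick_node(free_nodes: Iterable[str], replica_nodes: Set[str],
              rack_of: Dict[str, str]) -> str:
    """Choose the free node closest to the data; ties are broken by name."""
    candidates = list(free_nodes)
    if not candidates:
        raise ValueError("no free nodes available")
    return min(candidates,
               key=lambda node: (locality_level(node, replica_nodes, rack_of), node))


# Hypothetical topology: three racks, one replica of the block on n1.
rack_of = {"n1": "r1", "n2": "r1", "n3": "r2", "n4": "r2", "n5": "r3"}
block_replicas = {"n1"}

print(pick_node(["n1", "n3"], block_replicas, rack_of))        # n1 (data-local)
print(pick_node(["n2", "n3", "n5"], block_replicas, rack_of))  # n2 (rack-local)
print(pick_node(["n3", "n5"], block_replicas, rack_of))        # n3 (off-rack, tie by name)
```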
When the mapping phase completes, the intermediate (key, value) pairs are exchanged between machines to send all values with the same key to a single reducer. The reduce tasks are spread across the same nodes in the cluster as the mappers. This data transfer is taken care of by the Hadoop infrastructure (guided by the different keys and their associated values) without the individual map or reduce tasks communicating or being aware of one another's existence. A heartbeat is sent from the TaskTracker to the JobTracker frequently to update its status. If any node or TaskTracker in the cluster fails or times out, that part of the job is rescheduled by the underlying Hadoop layer without any explicit action by the workers.
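One common way to guarantee that every value for a given key reaches the same reducer is to hash the key and take the result modulo the number of reducers. The sketch below shows that idea in Python; the function names are ours, and CRC32 is used only because it gives a stable hash across runs, so treat the specific hash as an assumption.

```python
import zlib
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple


def partition(key: str, num_reducers: int) -> int:
    """Map a key to a reducer index; the same key always maps to the same index."""
    return zlib.crc32(key.encode("utf-8")) % num_reducers


def route(pairs: Iterable[Tuple[str, int]], num_reducers: int) -> Dict[int, List[Tuple[str, int]]]:
    """Group intermediate pairs by the reducer that will receive them."""
    buckets: Dict[int, List[Tuple[str, int]]] = defaultdict(list)
    for key, value in pairs:
        buckets[partition(key, num_reducers)].append((key, value))
    return buckets


intermediate_pairs = [("fox", 1), ("dog", 1), ("fox", 1), ("cat", 1), ("dog", 1)]
buckets = route(intermediate_pairs, num_reducers=3)
for reducer_index in sorted(buckets):
    print(reducer_index, buckets[reducer_index])
```

Mappers never need to know which reducers exist or where they run; they only emit keys, and the routing rule does the rest.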
The TaskTracker on each node spawns every task in a separate Java Virtual Machine process, so that the TaskTracker itself does not fail if a running task crashes its JVM. User-level tasks do not communicate explicitly with one another, and workers continue to operate, leaving the challenging aspects of partially restarting the program to the underlying Hadoop layer. Thus the Hadoop distributed system is very reliable and fault tolerant.
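The heartbeat-and-reschedule behaviour can be sketched with a small bookkeeping class. The class below is a toy model of our own: the 30-second timeout is an illustrative value, time is passed in explicitly to keep the example deterministic, and the method names are not Hadoop's.

```python
from typing import Dict, List

HEARTBEAT_TIMEOUT = 30.0  # seconds; illustrative value, not a Hadoop default


class ToyJobTracker:
    """Tracks the last heartbeat per TaskTracker and requeues tasks of silent ones."""

    def __init__(self) -> None:
        self.last_seen: Dict[str, float] = {}
        self.assignments: Dict[str, List[str]] = {}
        self.pending: List[str] = []

    def heartbeat(self, tracker: str, now: float) -> None:
        self.last_seen[tracker] = now
        self.assignments.setdefault(tracker, [])

    def assign(self, tracker: str, task: str) -> None:
        self.assignments.setdefault(tracker, []).append(task)

    def expire(self, now: float) -> List[str]:
        """Declare silent trackers dead and put their tasks back in the queue."""
        dead = [t for t, seen in self.last_seen.items() if now - seen > HEARTBEAT_TIMEOUT]
        for tracker in dead:
            self.pending.extend(self.assignments.pop(tracker, []))
            del self.last_seen[tracker]
        return dead


jt = ToyJobTracker()
jt.heartbeat("tt1", now=0.0)
jt.heartbeat("tt2", now=0.0)
jt.assign("tt1", "map-0")
jt.assign("tt2", "map-1")
jt.heartbeat("tt1", now=25.0)      # tt1 keeps reporting in; tt2 goes silent

dead_trackers = jt.expire(now=40.0)
print(dead_trackers)               # ['tt2']
print(jt.pending)                  # ['map-1'] is ready to be handed to another tracker
```

The tasks themselves take no part in this: failure detection and rescheduling live entirely in the tracking layer, which is what lets user code stay simple.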
Hadoop also has a very flat scalability curve. A Hadoop program requires no recoding to work on a much larger data set; it simply runs on a larger cluster of machines. Hadoop is designed for work that is batch-oriented rather than real-time in nature (due to the overhead involved in starting MapReduce programs), is very data-intensive, and lends itself to processing pieces of data in parallel. This includes use cases such as log or clickstream analysis, sophisticated data mining, web crawling and indexing, and archiving data for compliance.
In subsequent posts, we will look at the MapReduce programming model and other aspects of Hadoop in more detail… Coming soon!