MapReduce is a programming model and software framework designed to process large amounts of data in parallel by dividing a job into a number of independent, data-local tasks. Data locality is one of the most important concepts of HDFS/MapReduce, since it drastically reduces network usage. Data locality means "bringing the compute to the data", i.e. moving the algorithm to the datanodes for processing. This is far more cost effective than moving the data to the algorithm, as is generally done in traditional HPC clusters.
Components of Hadoop MapReduce
1. Client: Client acts as a user interface for submitting jobs and collects various status information.
2. Jobtracker: Jobtracker is responsible for scheduling jobs, dividing each job into map and reduce tasks, assigning those tasks to tasktrackers on datanodes, recovering from task failures, and monitoring jobs and tracking their status.
3. Tasktracker: Tasktracker runs map and reduce tasks and manages intermediate outputs.
MapReduce Job Life Cycle
Introduction
Generally, a MapReduce program executes in three stages: the map stage, the shuffle stage and the reduce stage. The first phase of MapReduce is called mapping. A MapReduce job is submitted to the jobtracker by the user sitting on a client machine. This job contains the job configuration, which specifies the map, combine and reduce functions. It also contains job location information, such as the input splits and the output directory path.
The InputFormat class calls its getSplits() function to compute the input splits. These input splits come from the input files that the user has loaded into HDFS. An ideal input split size is one filesystem block. The jobscheduler then retrieves the split information and, with the help of the InputFormat class, selects the input data from HDFS for the map functions.
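The split computation can be sketched roughly as follows (a plain-Python illustration, not Hadoop's actual getSplits() implementation; the 64MB default block size matches HDFS in Hadoop v1):

```python
# Rough sketch of input-split computation: each split ideally covers one
# filesystem block (64MB was the HDFS default in Hadoop v1).
def compute_splits(file_size, block_size=64 * 1024 * 1024):
    """Return (offset, length) pairs that together cover the whole file."""
    splits = []
    offset = 0
    while offset < file_size:
        length = min(block_size, file_size - offset)
        splits.append((offset, length))
        offset += length
    return splits

# A 200MB file with 64MB blocks yields four splits: 64 + 64 + 64 + 8.
splits = compute_splits(200 * 1024 * 1024)
```

One map task is then scheduled per split, which is why a split size of one block gives the best chance of node-local processing.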
The tasktrackers on datanodes periodically communicate with the jobtracker using heartbeat signals to convey their availability. The jobscheduler uses data locality and rack awareness to let the jobtracker assign map tasks to the nearest available tasktrackers through the heartbeat signal's return value. If a datanode fails, the jobtracker assigns its tasks to the nearest datanode holding a replica of the input split. This intelligent placement of data blocks, and their processing according to the availability and proximity of datanodes/tasktrackers, is achieved by Hadoop's own techniques, Data Locality and Rack Awareness, which make HDFS/MapReduce unique of its kind.
The map tasks run on the tasktrackers and datanodes assigned to them. The outputs of these map tasks are written to local disks. The output data is then sorted and shuffled so that the map outputs are transferred to the respective reducers as input; this is known as the shuffle/sort stage. In this phase the intermediate key/value pairs are exchanged between datanodes so that all values with the same key are sent to a single reducer.
In the reduce phase, the shuffled/sorted output is provided as input to the reduce tasks. The reduce function is invoked on each key to produce the final, aggregated output. The output from each reducer is written to a separate file in HDFS, named "part-00000", "part-00001" and so on. No two map or reduce tasks communicate with each other. As a rough rule of thumb, the mappers do about 20% of the work in a MapReduce program, while the reducers do the other 80%.
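The three stages described above can be simulated in a few lines (a hypothetical word-count job in plain Python, not actual Hadoop API code): map emits (word, 1) pairs, shuffle/sort groups all values by key, and reduce sums them.

```python
from collections import defaultdict

def map_phase(line):
    # Emit a (key, value) pair for every word in the input record.
    for word in line.split():
        yield (word, 1)

def shuffle_sort(pairs):
    # Group all values of the same key together, then sort by key,
    # mimicking what the shuffle/sort stage guarantees to reducers.
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return sorted(groups.items())

def reduce_phase(key, values):
    # The reduce function sees each key exactly once, with all its values.
    return (key, sum(values))

records = ["hadoop mapreduce", "hadoop hdfs"]
intermediate = [pair for line in records for pair in map_phase(line)]
result = [reduce_phase(k, vs) for k, vs in shuffle_sort(intermediate)]
# result == [("hadoop", 2), ("hdfs", 1), ("mapreduce", 1)]
```

Note how no mapper ever talks to another mapper, and no reducer to another reducer; the only data exchange happens in the shuffle/sort step between the two phases.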
1. The client prepares the job for submission and hands it off to the jobtracker.
2. Jobtracker schedules the job and tasktrackers are assigned map tasks.
3. Each tasktracker runs map tasks and updates the progress status of the tasks to the jobtracker periodically.
4. Jobtracker assigns reduce tasks to the tasktrackers as soon as the map outputs are available.
5. The tasktracker runs reduce tasks and updates the progress status of the tasks to the jobtracker periodically.
Stages of MapReduce Job Life Cycle
Job Submission
1. The user submits a job to the client.
2. Client checks the output specifications of the job. If the output directory path is not specified or if the output directory already exists, then it will throw an error to the MapReduce program.
3. Client computes the input splits. If the input directory path is not specified then it will throw an error to the MapReduce program.
4. Client copies job.jar, job.xml and the input split information into HDFS. The job.jar file is copied with a default replication factor of 10 so that there are ample copies for the tasktrackers to access; this is controlled by the mapred.submit.replication property.
5. Client tells the jobtracker that the job is ready to be submitted for execution.
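The client-side path checks in steps 2 and 3 can be sketched as follows (a hypothetical helper against a local filesystem, not the real JobClient, which performs these checks against HDFS):

```python
import os

def check_job_paths(input_dir, output_dir):
    """Fail fast, before the job runs, exactly as the client does."""
    if not input_dir or not os.path.isdir(input_dir):
        raise ValueError("input directory not specified or does not exist")
    if not output_dir:
        raise ValueError("output directory not specified")
    if os.path.exists(output_dir):
        # Refusing to overwrite protects the results of earlier jobs.
        raise ValueError("output directory already exists")
```

Failing at submission time is deliberate: it is far cheaper to reject a misconfigured job immediately than to discover the problem after hours of map and reduce work.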
Job Initialization
1. Jobtracker takes the job and puts it into an internal queue from where the jobscheduler will pick it up.
2. Jobscheduler retrieves the input splits from the HDFS which the client had computed earlier.
3. Jobscheduler assigns a map task for each input split. The number of reduce tasks is controlled by the mapred.reduce.tasks property.
4. The tasks are given task ids at this point.
Task Assignment
1. Before assigning a task to a tasktracker, the jobtracker must first choose a job to select a task from.
2. The tasktracker communicates with the jobtracker by periodically sending a heartbeat signal, which tells the jobtracker that the tasktracker is alive and whether it is available for a new task. If the tasktracker is available, the jobtracker assigns it a new task through the heartbeat signal's return value.
3. The tasktracker has a fixed number of map and reduce slots. The map slots are filled before the reduce slots.
4. For each map task, the jobscheduler takes into account the network location of the tasktracker and picks a map task that is closest to the input split.
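The locality preference in step 4 can be sketched as follows (hypothetical data structures, not the real jobscheduler): prefer a task whose split has a replica on the requesting tasktracker's node, then one on its rack, and only then an arbitrary task.

```python
def pick_map_task(tracker_node, tracker_rack, pending_tasks):
    """pending_tasks: list of (task_id, replica_nodes, replica_racks)."""
    for task_id, nodes, racks in pending_tasks:
        if tracker_node in nodes:      # node-local: data is already here
            return task_id
    for task_id, nodes, racks in pending_tasks:
        if tracker_rack in racks:      # rack-local: cheap intra-rack copy
            return task_id
    # Off-rack as a last resort: the split must cross the network.
    return pending_tasks[0][0] if pending_tasks else None

tasks = [
    ("t1", {"node3", "node7"}, {"rack2"}),
    ("t2", {"node1", "node4"}, {"rack1"}),
]
assert pick_map_task("node4", "rack1", tasks) == "t2"  # node-local wins
```

Each tier avoids progressively more network traffic, which is exactly the data-locality and rack-awareness behaviour described earlier.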
Task Execution
1. Tasktracker copies the job.jar file and the job configuration from HDFS to its local filesystem.
2. Tasktracker creates a new directory and unjars the job.jar file's content into it.
3. Tasktracker launches an instance of TaskRunner to run the task.
4. TaskRunner runs the task inside a separate JVM so that bugs in the user-defined map and reduce functions do not affect the tasktracker.
5. The child process communicates with the parent process periodically to report the status of the job task.
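The point of step 4 can be demonstrated with a process-isolation sketch (a Python subprocess standing in for the child JVM; not Hadoop code): a crash in user code kills only the child, and the parent survives to report the failure.

```python
import subprocess
import sys

def run_task(user_code):
    """Run 'user code' in a separate process; return True on success."""
    result = subprocess.run([sys.executable, "-c", user_code])
    return result.returncode == 0

# A healthy task succeeds; a buggy one fails without taking us down.
assert run_task("print('map task ok')") is True
assert run_task("raise RuntimeError('buggy map function')") is False
# The 'tasktracker' (this parent process) is still alive at this point.
```

This is the same design choice the tasktracker makes: arbitrary user code is never trusted to run inside the daemon's own process.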
Job Completion & Progress Updates
1. As map tasks complete successfully, they notify their parent tasktracker of the status update which in turn notifies the jobtracker.
2. These notifications are transmitted over the heartbeat communication mechanism. These statuses change over the course of the job.
3. Mappers and reducers on child jvm report to the tasktracker periodically and set a flag to report a task status change.
4. When the jobtracker receives a notification that the last task of the job is completed, it changes the status of the job to "successful".
5. Finally, jobtracker combines all the updates from tasktrackers to provide a global view of job progress status.
Job Cleanup
1. The jobtracker cleans up its working state for the job only after confirming that all the reduce tasks have completed successfully, and instructs the tasktrackers to do the same.
2. The cleanup activity involves deleting the intermediate map outputs and performing similar housekeeping tasks.
Note: The jobtracker alone is responsible for scheduling jobs, dividing jobs into map and reduce tasks, distributing the tasks to tasktrackers, recovering from task failures, and monitoring jobs and tracking their status. Hence, the jobscheduler must not be confused with a separate MapReduce daemon or identity.
Shuffle/Sort Phase
MapReduce is the heart of Hadoop, and the shuffle/sort phase is one of the most expensive parts of MapReduce execution, where the actual "magic" happens. In this phase the mappers separate out their outputs for the respective reducers using sort, and the data is transferred to the intended reducers, where it is collected and grouped by key using shuffle. The shuffle/sort phase begins as soon as the first map task completes. Several other map tasks may still be running on their respective datanodes, but the completed ones already start handing their intermediate outputs over to the respective reducers. Hence, it is not necessary for all map tasks to complete before any reduce task can begin copying. In the end, the grouped keys are processed by the reducers only after all map outputs have been received.
Map Phase (Preparation Phase)
In the map phase, mappers run on unsorted key/value pairs. A mapper generates zero or more output key/value pairs for each input key/value pair. When a map task starts producing output, it writes the output to a circular memory buffer assigned to it. The default size of this buffer is 100MB and is regulated by the io.sort.mb property.
When the contents of the buffer reach a certain threshold, a background thread starts to divide the buffered data into partitions before writing it to the local disk. The default spill threshold is 80% of the buffer (i.e. 80MB of the default 100MB) and is controlled by the io.sort.spill.percent property. The number of partitions depends on the number of reducers specified; the number of reduce tasks is defined by the mapred.reduce.tasks property. Each partition contains multiple key/value pairs. Hence, the partitioner decides which reducer will get a particular key/value pair.
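The default partitioning rule is simple, and can be sketched in Python (analogous to Hadoop's default HashPartitioner, which uses the key's hashCode() in Java): a key's hash, modulo the number of reduce tasks, decides which reducer receives the pair.

```python
def get_partition(key, num_reducers):
    # Python's hash() stands in for Java's hashCode(); the mask keeps
    # the value non-negative, as HashPartitioner does with Integer.MAX_VALUE.
    return (hash(key) & 0x7FFFFFFF) % num_reducers

# Every occurrence of the same key lands in the same partition,
# which is what guarantees that one reducer sees all values for a key.
assert get_partition("hadoop", 4) == get_partition("hadoop", 4)
assert 0 <= get_partition("hdfs", 4) < 4
```

Because the rule is deterministic, mappers running on different nodes agree on the destination reducer for a key without ever communicating with each other.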
Each partition has a set of intermediate keys that are automatically sorted by Hadoop; this is known as the in-memory sort.
The combine phase is an optional phase, also known as the mini-reduce phase. Combiners combine key/value pairs with the same key together on a single node, and each combiner may run zero or more times. This phase produces a more compact map output, so that less data needs to be written to local disk and transferred to the reducers. Hence, combiners do their work before the data is spilled to disk. Since the reduce phase runs far fewer parallel tasks than the map phase, it is comparatively slow; combiners help optimize and speed up the job by drastically reducing the total bandwidth required by the shuffle phase, performing early some of the work the reduce phase would otherwise do later.
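The saving a combiner buys can be shown with an illustrative word-count combiner (a sketch, not Hadoop API code): summing counts on the map side shrinks what the shuffle must move across the network.

```python
from collections import Counter

def combine(map_output):
    """Apply reduce-style summing locally, before the shuffle."""
    combined = Counter()
    for word, count in map_output:
        combined[word] += count
    return list(combined.items())

# 1500 intermediate pairs collapse to 2 before leaving the node.
map_output = [("hadoop", 1)] * 1000 + [("hdfs", 1)] * 500
combined = combine(map_output)
assert len(map_output) == 1500 and len(combined) == 2
assert dict(combined) == {"hadoop": 1000, "hdfs": 500}
```

A combiner is only safe when the operation is commutative and associative, as summing is here; that is also why Hadoop is free to run it zero or more times without changing the final result.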
Compressing the map output before each spill is written to disk is also an optional step. It is often a good idea, since compressed output is written faster, consumes less disk space and reduces the amount of data to be transferred to the reducers. By default compression is not enabled; setting the mapred.compress.map.output property to true enables it.
The spill is written to disk under the directory given by mapred.local.dir. A new spill file is created every time the buffer reaches the spill threshold, so by the time the map task has generated its last output record, there can be several spill files from that single map task.
Before the task finishes, the spill files on the local disk are merged into a single partitioned and sorted output file. The io.sort.factor property controls the maximum number of spill files (streams) that can be merged at once.
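Because every spill file is already sorted, the final merge is a k-way merge of sorted runs. A sketch using Python's heapq.merge (standing in for the merge whose fan-in io.sort.factor bounds; not Hadoop code):

```python
import heapq

# Each spill file is already sorted by key when it hits the disk.
spill_1 = [("a", 1), ("c", 1)]
spill_2 = [("b", 1), ("c", 1)]
spill_3 = [("a", 1), ("d", 1)]

# heapq.merge lazily merges the sorted runs into one sorted run,
# reading only one record per spill at a time, just like the real merge.
merged = list(heapq.merge(spill_1, spill_2, spill_3))
assert merged == [("a", 1), ("a", 1), ("b", 1), ("c", 1), ("c", 1), ("d", 1)]
```

If there are more spill files than the fan-in limit allows, the merge simply runs in multiple rounds, merging intermediate results again until one file remains.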
Note: Map outputs continue to be written to the buffer while the spill takes place, but if the buffer fills up during this time, the map task blocks until the spill is complete.
Reduce Side (Calculation Phase)
Shuffle Phase / Copy Phase
A thread in the reducer periodically asks the jobtracker for map output locations until it has retrieved them all. Map tasks may finish at different times, but the tasktrackers need the map outputs to run reduce tasks, so the reduce tasks start copying the map outputs as soon as each map task completes. The map outputs are not deleted by the tasktracker as soon as the first reducer has retrieved them, since a reducer may fail and need them again; they are removed only after the job completes. MapReduce ensures that the input to the reducers is sorted by key, so all values of the same key are always reduced together regardless of their mapper of origin. The map nodes thus also take part in the shuffle, serving each mapper's intermediate data to its proper destination, as decided by the partitioner, over HTTP.
The map outputs are copied into the tasktracker's memory if they are small enough. This memory/buffer size is controlled by mapred.job.shuffle.input.buffer.percent property. Else, they are copied to the disk. The map outputs that were compressed in the map side are decompressed so that they can be merged in the later stages. When the buffer memory reaches a threshold size (mapred.job.shuffle.merge.percent) or reaches a threshold number of map outputs (mapred.inmem.merge.threshold), the map outputs are merged and spilled to the disk.
The copied spills are merged into a single sorted set of key/value pairs. Rather than relying on very large in-memory buffers, MapReduce concentrates on smaller disk spills and on parallelizing spilling and fetching in order to obtain better reduce times.
Finally, the sorted and merged files are fed into the reduce functions to produce the final output, which is written directly to HDFS. The first replica of each output block is written to the local datanode.
MapReduce v1 had a single jobtracker to manage all the tasktrackers and the whole queue of jobs, which later proved to be a bottleneck. An inherent delay and latency was discovered in the job submission process, which led to the development of alternative solutions like Facebook's Corona and Yahoo's YARN (Yet Another Resource Negotiator).
Note: I have also prepared a brief overview of the above article here. Please share your views and thoughts in the comments section below. Anything that I might have missed here or any suggestions are all welcome.
Thanks for reading.
Sources: hadoop.apache.org, Hadoop - The Definitive Guide
Tags: Apache Hadoop Cloudera Hadoop MapReduceV1 MRV1 Overview Data flow Mechanism in MapReduce Data Processing in MapReduce Flow of Data in MapReduce Internal Flow of MapReduce MapReduce Data Processing MapReduce Model of Data Processing MapReduce Working MapReduce Anatomy Lifecycle of MapReduce Job MapReduce Job Working