Friday, April 22, 2016

Life Cycle of MapReduce Job

Here, I will explain the behind-the-scenes job execution process in Hadoop MapReduce or MRv1 (MapReduce version 1), from the time a user submits a job to the time it is executed on the slave nodes.

MapReduce is a "programming model/software framework" designed to process large amounts of data in parallel by dividing a job into a number of independent, data-local tasks. Data locality is one of the most important concepts of HDFS/MapReduce, since it drastically reduces network usage. Data locality means "bringing the compute to the data", i.e. moving the algorithm to the datanodes that hold the data to be processed. This is far more cost-effective than moving the data to the algorithm, as is generally done in traditional HPC clusters.


Components of Hadoop MapReduce


1. Client: Client acts as a user interface for submitting jobs and collects various status information.
2. Jobtracker: Jobtracker is responsible for scheduling jobs, dividing jobs into map and reduce tasks, assigning those tasks to datanodes, recovering from task failures, monitoring jobs and tracking job status.
3. Tasktracker: Tasktracker runs map and reduce tasks and manages intermediate outputs.


MapReduce Job Life Cycle


Introduction

Generally, a MapReduce program executes in three stages: the map stage, the shuffle stage and the reduce stage. The first phase of MapReduce is called mapping. A MapReduce job is submitted to the jobtracker by a user sitting on a client machine. The job carries the job configuration, which specifies the map, combine and reduce functions, along with the locations of the input splits and the output directory path.
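To make this concrete, here is a minimal driver sketch using the old MRv1 API (org.apache.hadoop.mapred); it shows how the job configuration carries the map, combine and reduce functions together with the input and output paths. WordCountMapper is an assumed user class, SumReducer is sketched under the Combine Phase section further below, and the paths are illustrative.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;

public class WordCountDriver {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(WordCountDriver.class);
    conf.setJobName("wordcount");

    // The map, combine and reduce functions travel with the job configuration.
    conf.setMapperClass(WordCountMapper.class);   // assumed user mapper class
    conf.setCombinerClass(SumReducer.class);      // optional "mini-reduce"
    conf.setReducerClass(SumReducer.class);

    conf.setOutputKeyClass(Text.class);
    conf.setOutputValueClass(IntWritable.class);

    // The InputFormat computes the input splits via getSplits().
    conf.setInputFormat(TextInputFormat.class);
    conf.setOutputFormat(TextOutputFormat.class);

    FileInputFormat.setInputPaths(conf, new Path("/user/huser/input"));    // illustrative path
    FileOutputFormat.setOutputPath(conf, new Path("/user/huser/output"));  // must not exist yet

    conf.setNumReduceTasks(2);                    // same as mapred.reduce.tasks

    JobClient.runJob(conf);                       // submits the job to the jobtracker and waits
  }
}

Running such a driver (for example via hadoop jar) is what hands the job over to the jobtracker as described in the rest of this article.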

The client calls the InputFormat's getSplits() function to compute the input splits. These input splits come from the input files that the user has loaded into HDFS. An ideal input split size is one filesystem block. This split information is then retrieved by the jobscheduler, which, with the help of the InputFormat class, selects the input data from HDFS for the map function.

The tasktrackers on the datanodes periodically communicate with the jobtracker using heartbeat signals to convey their availability. The jobscheduler uses data locality and rack awareness, and lets the jobtracker assign map tasks to the nearest available tasktrackers through the heartbeat signal return value. If a datanode fails, the tasks are reassigned to the nearest datanode that holds a replica of the input split. This intelligent placement of data blocks, and processing them according to the availability and proximity of datanodes/tasktrackers, is achieved by Hadoop's own techniques - data locality and rack awareness - making HDFS/MapReduce unique in its own kind.

The map tasks run on the tasktrackers and datanodes assigned to them. The outputs from these map tasks are written to the local disks. The output data is then sorted and shuffled so that the map outputs can be transferred to the respective reducers as input. This is known as the Shuffle/Sort stage. In this phase the intermediate key/value pairs are exchanged between datanodes so that all values with the same key are sent to a single reducer.

In the reduce phase, the shuffled/sorted output is provided as input to the reduce tasks. The reduce function is invoked on each key to produce the final aggregated output. Finally, the output from each reducer is written to a separate file in HDFS, named part-00000, part-00001 and so on. No two map or reduce tasks communicate with each other. As a rough rule of thumb, about 20% of the work happens in the map phase, while the remaining 80% happens in the shuffle and reduce phases.

Summary:
1. The client prepares the job for submission and hands it off to the jobtracker.
2. Jobtracker schedules the job and tasktrackers are assigned map tasks.
3. Each tasktracker runs map tasks and updates the progress status of the tasks to the jobtracker periodically.
4. Jobtracker assigns reduce tasks to the tasktrackers as soon as the map outputs are available.
5. The tasktracker runs reduce tasks and updates the progress status of the tasks to the jobtracker periodically.


Stages of MapReduce Job Life Cycle


Job Submission

1. The user submits a job to the client.
2. Client checks the output specification of the job. If the output directory path is not specified, or if the output directory already exists, an error is thrown back to the MapReduce program.
3. Client computes the input splits. If the input directory path is not specified, an error is thrown back to the MapReduce program.
4. Client copies job.jar, job.xml and the input split information into HDFS. The job.jar file is copied with a default replication factor of 10 so that there is an ample number of copies for the tasktrackers to access; this is controlled by the mapred.submit.replication property.
5. Client tells the jobtracker that the job is ready to be submitted for execution.

Job Initialization


1. Jobtracker takes the job and puts it into an internal queue from where the jobscheduler will pick it up.
2. Jobscheduler retrieves the input splits from the HDFS which the client had computed earlier.
3. Jobscheduler assigns a map task for each input split. The number of reduce tasks is controlled by the mapred.reduce.tasks property.
4. The tasks are given task ids at this point.


Task Assignment


1. Before assigning a task to the tasktracker, the jobtracker must first choose a job to select a task from.
2. The tasktracker communicates with the jobtracker by periodically sending a heartbeat signal, to tell the jobtracker that it is alive and whether it is available for a new task. If the tasktracker is available, the jobtracker assigns it a new task through the heartbeat signal return value.
3. The tasktracker has a fixed number of map and reduce slots. The map slots are filled before the reduce slots.
4. For each map task, the jobscheduler takes into account the network location of the tasktracker and picks the map task whose input split is closest to that tasktracker.

Task Execution


1. Tasktracker copies the job.jar file and the job configuration from HDFS to the tasktracker's local filesystem.
2. Tasktracker creates a new directory and unjars the job.jar file's content into it.
3. Tasktracker runs an instance called taskrunner to run the task.
4. Taskrunner launches the task inside a separate JVM so that buggy user-defined map and reduce code does not affect the tasktracker itself.
5. The child process communicates with the parent process periodically to report the status of the job task.


Job Completion & Progress Updates


1. As map tasks complete successfully, they notify their parent tasktracker of the status update which in turn notifies the jobtracker.
2. These notifications are transmitted over the heartbeat communication mechanism. These statuses change over the course of the job.
3. Mappers and reducers running in the child JVMs report to the tasktracker periodically and set a flag to indicate a task status change.
4. When the jobtracker receives a notification that the last task of the job is completed, it changes the status of the job to "successful".
5. Finally, jobtracker combines all the updates from tasktrackers to provide a global view of job progress status.


Cleanup


1. The jobtracker cleans up its working state for the job only after confirming that all the reduce tasks have completed successfully, and instructs the tasktrackers to do the same.
2. The cleanup activity involves deleting intermediate outputs and performing other such housekeeping tasks.

Note: The jobtracker alone is responsible for scheduling jobs, dividing jobs into map and reduce tasks, distributing those tasks to datanodes, recovering from task failures, monitoring jobs and tracking job status. Hence, the jobscheduler must not be confused with a separate MapReduce daemon; it is a component inside the jobtracker.


Shuffle/Sort Phase


MapReduce is the heart of Hadoop, and the Shuffle/Sort phase is one of the most expensive parts of MapReduce execution - it is where the actual "magic" happens. In this phase the mappers separate their outputs for the respective reducers using sort, and the data is transferred to the intended reducers, where it is collected and grouped by key, using shuffle. The shuffle/sort phase begins after the first map task completes. Several other map tasks may still be running on their respective datanodes, yet the exchange of intermediate map outputs to the respective reducers already starts. Hence, it is not necessary for all map tasks to complete before any reduce task can begin copying data. The grouped keys are processed by the reducers only after all map outputs have been received.

Map Phase (Preparation Phase)


In the map phase, mappers run on unsorted key/value pairs. A mapper generates zero or more output key/value pairs for each input key/value pair. When a map task starts producing output, it writes the output to a circular memory buffer assigned to it. The default size of this circular memory buffer is 100MB and is regulated by the property io.sort.mb.

Partition Phase
When the contents of the buffer reach a certain threshold size, a background thread starts to divide the spilled data into partitions before writing it to the local disks. The default spill threshold is 80% of the buffer (80MB of the default 100MB) and is controlled by the property io.sort.spill.percent. The number of partitions depends upon the number of reducers specified; the number of reduce tasks is defined by the property mapred.reduce.tasks. Each partition contains multiple key/value pairs. Hence, the partitioner decides which reducer will get a particular key/value pair.
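As an illustration, a custom partitioner in the old MRv1 API looks like the sketch below; it mirrors what the default HashPartitioner does and would be registered on the job with conf.setPartitionerClass(KeyHashPartitioner.class). The key/value types (Text, IntWritable) are assumptions for the example.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Partitioner;

public class KeyHashPartitioner implements Partitioner<Text, IntWritable> {

  public void configure(JobConf job) {
    // no per-job configuration needed for this sketch
  }

  // numPartitions equals the number of reduce tasks (mapred.reduce.tasks);
  // the value returned here decides which reducer receives this key/value pair.
  public int getPartition(Text key, IntWritable value, int numPartitions) {
    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
  }
}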

Sort Phase
Each partition has a set of intermediate keys that are automatically sorted by Hadoop; this is known as the in-memory key sort.

Combine Phase
This is an optional phase, also known as the mini-reduce phase. Combiners combine key/value pairs with the same key together on a single node. Each combiner may run zero or more times. This phase produces a more compact, sorted map output so that less data needs to be transferred and written to local disk; hence, combiners do their work before the data is spilled to disk. Because the reduce phase runs far fewer parallel tasks than the map phase, it can easily become the slow part of a job. Combiners help optimize and speed up the job by drastically reducing the total bandwidth required by the shuffle phase, and they save time by doing some of the work that the reduce phase would otherwise have to do later.
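As a sketch, the SumReducer referenced in the driver example above can serve both as the reducer and as the combiner, because summing counts is commutative and associative; a function used as a combiner must give the same final result no matter how many times (zero or more) it is applied.

import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class SumReducer extends MapReduceBase
    implements Reducer<Text, IntWritable, Text, IntWritable> {

  public void reduce(Text key, Iterator<IntWritable> values,
                     OutputCollector<Text, IntWritable> output, Reporter reporter)
      throws IOException {
    int sum = 0;
    while (values.hasNext()) {
      sum += values.next().get();   // summation is commutative and associative,
    }                               // so the same class works as a combiner
    output.collect(key, new IntWritable(sum));
  }
}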

Compression
Before each spill is written to disk, it is often a good idea to compress the map output so that it is written to disk faster, consumes less disk space and reduces the amount of data to be transferred to the reducers. Compression is not enabled by default; it is an optional step. Setting the mapred.compress.map.output property to true enables it.
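A small sketch of how this could be enabled per job through the old JobConf API (the codec choice here is illustrative; any installed CompressionCodec could be used instead):

import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.JobConf;

public class MapOutputCompression {
  // Sketch: enable compression of the intermediate map output for an MRv1 job.
  public static void enable(JobConf conf) {
    conf.setCompressMapOutput(true);                    // same as mapred.compress.map.output=true
    conf.setMapOutputCompressorClass(GzipCodec.class);  // illustrative codec choice
  }
}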

Merge Phase
The spills are written to disk under the directories given by mapred.local.dir. A new spill file is created every time the buffer reaches the spill threshold, so by the time the map task has written its last output record there can be several spill files.
Before the task finishes, the spill files on the local disk are merged into a single partitioned and sorted output file. The property io.sort.factor controls the maximum number of spill files that can be merged at once.
Note: Map outputs continue to be written to the buffer while a spill takes place, but if the buffer fills up during this time, the map will block until the spill is complete.
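Taken together, the map-side buffer, spill and merge behaviour can be tuned per job. The sketch below sets the properties discussed above through the JobConf API; the values shown simply restate what I understand to be the defaults, for illustration rather than as tuning advice.

import org.apache.hadoop.mapred.JobConf;

public class MapSideTuning {
  // Sketch: map-side buffer, spill and merge knobs for an MRv1 job.
  public static void apply(JobConf conf) {
    conf.setInt("io.sort.mb", 100);                 // circular memory buffer size in MB
    conf.setFloat("io.sort.spill.percent", 0.80f);  // spill threshold as a fraction of the buffer
    conf.setInt("io.sort.factor", 10);              // max number of spill files merged in one pass
  }
}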


Reduce Side (Calculation Phase)

Shuffle Phase / Copy Phase
A thread in the reducer periodically asks the jobtracker for map output locations until it has retrieved them all. Map tasks may finish at different times, but the reduce tasks start copying a map task's output as soon as that map task completes - they do not wait for all maps to finish. The map outputs are not deleted by the tasktracker as soon as the first reducer has retrieved them, because a reducer may fail and need them again. Since MapReduce guarantees that the input to the reducers is sorted by key, all values of the same key are always reduced together regardless of which mapper they came from. The map-side partitions are therefore fetched over HTTP and delivered to their respective reducers during the shuffle.
The map outputs are copied into the reduce task's memory if they are small enough; this buffer size is controlled by the mapred.job.shuffle.input.buffer.percent property. Otherwise, they are copied to disk. Map outputs that were compressed on the map side are decompressed so that they can be merged in the later stages. When the buffer memory reaches a threshold size (mapred.job.shuffle.merge.percent) or a threshold number of map outputs (mapred.inmem.merge.threshold), the map outputs are merged and spilled to disk.
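For completeness, the corresponding reduce-side properties can also be set per job; the sketch below restates what I understand to be typical default values through the JobConf API, purely for illustration.

import org.apache.hadoop.mapred.JobConf;

public class ReduceSideTuning {
  // Sketch: reduce-side copy/merge knobs for an MRv1 job.
  public static void apply(JobConf conf) {
    conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f); // heap fraction for copied map outputs
    conf.setFloat("mapred.job.shuffle.merge.percent", 0.66f);        // buffer usage that triggers a merge/spill
    conf.setInt("mapred.inmem.merge.threshold", 1000);               // or merge after this many map outputs
  }
}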

Sort/Merge Phase
The copied spills are merged into a single sorted set of key/value pairs. Rather than relying on very large buffers, MapReduce favors smaller disk spills and parallelizes spilling and fetching in order to obtain better reduce times.

Reduce Phase
Finally, the sorted and merged files are fed into the reduce function to produce the final output, which is written directly to HDFS. The first replica of each output block is written to the local disk of the node running the reducer.

MapReduce v1 had a single jobtracker to manage all the tasktrackers and the whole queue of jobs, which eventually proved to be a bottleneck. The inherent delay and latency in the job submission process led to the development of alternative solutions such as Facebook's Corona and Yahoo's YARN (Yet Another Resource Negotiator).

Note: I have also prepared a brief overview of the above article here. Please share your views and thoughts in the comments section below. Anything that I might have missed here or any suggestions are all welcome.

Thanks for reading.

Sources: hadoop.apache.org, Hadoop - The Definitive Guide

Tags: Apache Hadoop Cloudera Hadoop MapReduceV1 MRV1 Overview Data flow Mechanism in MapReduce Data Processing in MapReduce Flow of Data in MapReduce Internal Flow of MapReduce MapReduce Data Processing MapReduce Model of Data Processing MapReduce Working MapReduce Anatomy Lifecycle of MapReduce Job MapReduce Job Working

Monday, July 13, 2015

Globus Toolkit Installation on CentOS

This is a quickstart guide to installing Globus Toolkit 6.0 using yum on CentOS-6.6. The steps mentioned in the “GT 6 Quickstart Guide”, which is the official Globus Toolkit documentation, were followed during the installation process. The GT 6.0 release provides both source and binary RPM packages for CentOS, which can be downloaded from here.
We have two servers, out of which one will act as a master and the other as a client. The hostnames of the servers are master and client respectively. You can try out this setup on virtual appliances.

PREREQUISITES

This section is common for master and all client nodes.

System Configuration

First and foremost, it is very important to check that the hosts file matches on all nodes and that each node has the other nodes' network addresses and hostnames entered correctly.
Turn off the iptables service.
Make sure the nodes are connected to the internet.

If you are going for a minimal install of CentOS, make sure that all the below mentioned packages are installed using yum.

root@master # yum install epel-release java wget gcc sed make openssl-devel perl pkgconfig httpd elinks openssh-clients rpm-build

root@master # yum groupinstall “Development Tools”

Globus Repository Configuration

Since we are following the “GT 6 Quickstart Guide”, we will download the latest RPM package, which actually contains the yum repository definition for CentOS-6. This will set up our nodes to use the Globus RPM repository for the installation of the required Globus software packages and their dependencies.

Download Globus repository package
root@master # wget -c http://toolkit.globus.org/ftppub/gt6/installers/repo/globus-toolkit-repo-latest.noarch.rpm

Install Globus repository package
root@master # rpm -hUv globus-toolkit-repo-latest.noarch.rpm

Verify by listing the repository
root@master # ls /etc/yum.repos.d/

GLOBUS TOOLKIT INSTALLATION ON “MASTER” NODE


Globus Toolkit Installation

Once we have installed the Globus repository package, we can use yum to install the Globus components.
The below mentioned command will install GridFTP, GRAM, MyProxy, GSI C services as well as set up a basic SimpleCA used to issue security credentials for users to run the Globus services.
root@master # yum install globus-gridftp globus-gram5 globus-gsi myproxy myproxy-server myproxy-admin

Setting up Security

As we are not using any other tool to authorize our grid users, SimpleCA will take care of the basic security and manage its own Certificate Authority. A Globus trusted certificate directory is automatically created after the installation; it contains the public certificates, the host certificate and the host key files. The host certificate and host key files need to be copied so that the myproxy service can use them as well.

root@master # install -o myproxy -m 644 /etc/grid-security/hostcert.pem /etc/grid-security/myproxy/hostcert.pem

root@master # install -o myproxy -m 600 /etc/grid-security/hostkey.pem /etc/grid-security/myproxy/hostkey.pem

root@master # ls -l /etc/grid-security/

Creating MyProxy Server

The MyProxy server is used to store the users' certificates. In order to enable MyProxy to use SimpleCA we need to modify the /etc/myproxy-server.config file, uncommenting every line in the section “Complete Sample Policy #1” so that the section looks like the truncated output below.

#
# Complete Sample Policy #1 - Credential Repository
#
# The following lines define a sample policy that enables all
# myproxy-server credential repository features.
# See below for more examples.
accepted_credentials "*"
authorized_retrievers "*"
default_retrievers "*"
authorized_renewers "*"
default_renewers "none"
authorized_key_retrievers "*"
default_key_retrievers "none"
trusted_retrievers "*"
default_trusted_retrievers "none"
cert_dir /etc/grid-security/certificates

Next we will add the “myproxy” user to the “simpleca” group so that MyProxy server can create certificates.
root@master # usermod -a -G simpleca myproxy

Start the MyProxy service.
root@master # service myproxy-server start

Make it permanent across server reboots
root@master # chkconfig myproxy-server on

Verify the service status.
root@master # service myproxy-server status
root@master # netstat -ntulp | grep 7512

User Credentials

We will create a new local user with “Globus User” as its full name and “guser” as its user name.
root@master # useradd -c "Globus User" guser

Create a password for the user “guser”.
root@master # passwd guser

Now log in as the “myproxy” user and set the sbin path in the .profile file.
root@master # su - -s /bin/sh myproxy

myproxy@master $ vi ~/.profile
export PATH=$PATH:/usr/sbin

myproxy@master $ source ~/.profile
Next run the below command to create the credentials. It will prompt for a passphrase, which I set as “globus”.

myproxy@master $ myproxy-admin-adduser -c "Globus User" -l guser
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
The new signed certificate is at: /var/lib/globus/simple_ca/newcerts/03.pem
using storage directory /var/lib/myproxy
Credential stored successfully
Certificate subject is:
/O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/OU=Globus Simple CA/CN=Globus User

Note: Make a note of the “Certificate subject” mentioned in the last line of the output generated by the above command.

User Authorization

In order to access Globus services, a user must have an entry in the grid map file. We need to copy the “Certificate subject” noted earlier and pass it to the “-dn” switch as a parameter. The below mentioned command will create an entry in the existing grid map file for this credential.

root@master # grid-mapfile-add-entry -dn "/O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/OU=Globus Simple CA/CN=Globus User" -ln guser
Modifying /etc/grid-security/grid-mapfile ...
New entry:
"/O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/OU=Globus Simple CA/CN=Globus User" guser
(1) entry added

Verify that the same entry has been generated on /etc/grid-security/grid-mapfile
root@master # cat /etc/grid-security/grid-mapfile

Setting up GridFTP Server

After setting up basic security and authorizing users, we can start globus services. We will start with GridFTP server.

Starting the service related to GridFTP server.
root@master # service globus-gridftp-server start

Making the service persist across server reboots.
root@master # chkconfig globus-gridftp-server on

Verifying the service status.
root@master # service globus-gridftp-server status
root@master # netstat -antupl | grep 2811

Now log in as the normal user “guser” that we created initially, so as to test the GridFTP service.
root@master # su - -s /bin/sh guser

Generate a proxy from myproxy service by using the below mentioned command and provide passphrase.
guser@master $ myproxy-logon -s master

We will test the GridFTP service by copying a file locally.
guser@master $ globus-url-copy gsiftp://master.venus.com/etc/group file:///tmp/guser.test.copy

Verify that both the files are same.
guser@master $ diff /tmp/guser.test.copy /etc/group

After checking that all services are running, that the file was transferred successfully and that both files are the same, we can confirm that the GridFTP server is set up.

Setting up GRAM

Moving on to GRAM: it is a resource manager that can be configured to use several different Local Resource Managers (LRMs). The default LRM that comes with GRAM is the “fork manager”. The GRAM service uses the same host credentials as GridFTP.

Starting the GRAM service.
root@master # service globus-gatekeeper start

Making the service persist across server reboots.
root@master # chkconfig globus-gatekeeper on

Verifying the service status.
root@master # service globus-gatekeeper status
root@master # netstat -ntupl | grep 2119

Now after verifying that the GRAM service is running, we need to check if the default LRM is enabled.
root@master # globus-gatekeeper-admin -l
jobmanager-fork-poll [DISABLED]

If the LRM is disabled as shown in the above output, then we need to enable it using the below mentioned command.
root@master # globus-gatekeeper-admin -e jobmanager-fork-poll

Now log in as the normal user “guser” that we created initially, so as to test the GRAM service.
root@master # su - -s /bin/sh guser

Generate a proxy from myproxy service by using the below mentioned command and provide passphrase.
guser@master $ myproxy-logon -s master

GRAM Authentication test
guser@master # globusrun -a -r master/jobmanager-fork-poll

GRAM Job Submission
Execute the below mentioned commands to run sample jobs locally on a grid compute node.
guser@master $ globus-job-run master/jobmanager-fork-poll /bin/hostname
master.venus.com

guser@master $ globus-job-run master/jobmanager-fork-poll /usr/bin/whoami
guser

guser@master $ globus-job-run master/jobmanager-fork-poll /bin/date

As a user we can explore more into GRAM commands like globus-job-submit, globus-job-status, globus-job-cancel, and many more.

SimpleCA Configuration

Log in as the normal user “guser” that we created initially, so as to create the .globus directory in guser's home directory, which holds the necessary authentication certificates.
guser@master $ grid-cert-request

Log back in as root and sign the certificate request: the usercert.pem file that was just created (and which should be zero KB in size at this point) is populated by signing usercert_request.pem.
root@master # cd /home/guser/.globus/
root@master # grid-ca-sign -in usercert_request.pem -out usercert.pem

Verifying Basic Security

After completing the SimpleCA configuration, the below commands should get valid outputs.

Displaying the Certificate information
guser@master $ grid-cert-info

Displaying the Certificate subject
guser@master $ grid-cert-info -subject

Proxy Verification
guser@master $ grid-proxy-init -verify -debug

GLOBUS TOOLKIT INSTALLATION ON “CLIENT” NODE



Be sure that the prerequisites are completed before we start with the installation on the client machine.

Globus Toolkit Installation

As on the master node, we will start with the installation of the Globus software using the yum repository that we set up as described in the prerequisites section earlier.

root@client # yum install globus-gridftp myproxy globus-gram5

Setting up Security

The below command uses the original SimpleCA that we set up on the master to bootstrap the trust roots on this client machine.
root@client # myproxy-get-trustroots -b -s master
Bootstrapping MyProxy server root of trust.
New trusted MyProxy server: /O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/CN=master.venus.com
New trusted CA (8209248b.0): /O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/CN=Globus Simple CA

Trust roots have been installed in /etc/grid-security/certificates/.

Next we will create host certificates for the client on master node. Ensure that you are logged into the master node before proceeding further.
root@master # su - -s /bin/sh myproxy
myproxy@master $ myproxy-admin-addservice -c "client.venus.com" -l client
Enter PEM pass phrase:
Verifying - Enter PEM pass phrase:
The new signed certificate is at: /var/lib/globus/simple_ca/newcerts/04.pem
using storage directory /var/lib/myproxy
Credential stored successfully
Certificate subject is:
 /O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/OU=Globus Simple CA/CN=client.venus.com

Now we will retrieve the generated credentials from the client node. Ensure that you are logged into the client node before proceeding further.

root@client # myproxy-retrieve -s master -k client.venus.com -l client
Enter MyProxy pass phrase:
Credentials for client have been stored in /etc/grid-security/hostcert.pem and /etc/grid-security/hostkey.pem.

After retrieving the credentials, we will destroy the client node's host certificate from the master node as it is no longer required.

root@client # myproxy-destroy -s master -k client.venus.com -l client
MyProxy credential 'client.venus.com' for user client was successfully removed

User Credentials

We will create a new local user with “Globus User” as its full name and “guser” as its user name, the same as was done on the master node in the earlier steps.
root@client # useradd -c "Globus User" guser

Create a password for the user “guser”.
root@client # passwd guser

User Authorization

In order to allow the user "guser" to access the Globus services, we need to add guser's credentials to the grid-mapfile on the client node.
root@client # grid-mapfile-add-entry -dn "/O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/OU=Globus Simple CA/CN=Globus User" -ln guser
Modifying /etc/grid-security/grid-mapfile ...
New entry:
"/O=Grid/OU=GlobusTest/OU=simpleCA-master.venus.com/OU=Globus Simple CA/CN=Globus User" guser
(1) entry added

Setting up GridFTP

After setting up security, simple authentication, host certificates and user authorization, we will finally turn on the Globus services. First we will start the GridFTP service using the below mentioned commands.

Starting the service related to GridFTP server.
root@client # service globus-gridftp-server start

Making the service persist across server reboots.
root@client # chkconfig globus-gridftp-server on

Verifying the service status.
root@client # service globus-gridftp-server status
root@client # netstat -antupl | grep 2811

Login to guser account.
root@client # su - -s /bin/sh guser

We will use the guser to receive the proxy credentials to access the globus services.
guser@client $ myproxy-logon -s master

We will test copying a file between the GridFTP servers running on master and client nodes.
guser@client $ globus-url-copy gsiftp://master.venus.com/etc/group gsiftp://client.venus.com/tmp/from-master

Note: The "-nodcau" switch stands for "no data channel authentication"; if needed, it can be added to the globus-url-copy command to turn off data channel authentication for GridFTP transfers.

Setting up GRAM

In this final stage we will submit a sample job to the grid to test that it is working. Before we proceed, run the below mentioned command for GRAM authentication.
guser@client $ globusrun -a -r master/jobmanager-fork-poll

Job Submission
guser@client $ globus-job-run master/jobmanager-fork-poll /bin/hostname

With the above steps we have successfully set up a grid using Globus Toolkit 6.0 on CentOS-6.6.

You are most welcome to leave a comment if you come across any errors in the above steps or have any suggestions or advice.

Monday, January 12, 2015

Fully Distributed Hadoop Cluster - Automatic Failover HA Cluster with Zookeeper & QJM

After configuring automatic failover HA with ZooKeeper and NFS, we will now configure automatic failover HA with ZooKeeper and QJM.
We will use the Quorum Journal Manager to share edit logs between the active and standby namenodes. Any namespace modification done by the active namenode is recorded by the journal nodes. The journalnode daemons can run alongside other daemons such as the namenode or jobtracker. It is highly recommended to use at least three journal nodes, so that the edit log information is written to a majority of machines.

Setup Summary

The below table describes my setup.

Role                        Hostname    IP Address
namenode1 / journal-node1   ha-nn01     192.168.56.101
namenode2 / journal-node2   ha-nn02     192.168.56.102
namenode3 / journal-node3   ha-nn03     192.168.56.103
datanode1                   ha-dn01     192.168.56.104
datanode2                   ha-dn02     192.168.56.105
client                      ha-client   192.168.56.101

According to the above table, I have three namenodes, which will also run journalnodes concurrently, plus two datanodes and one client machine.

Downloads
Download the below packages and place them in all nodes.
Download Apache Hadoop 2.6 here.
Download Oracle JDK 8 here.

Note: Before moving directly towards the installation and configuration part, read the pre-checks listed below.
1. Disable firewall on all nodes.
2. Disable selinux on all nodes.
3. Update the hostnames and their respective IP addresses of all nodes in the /etc/hosts file on all nodes.
4. It is recommended to use Oracle JDK.

INSTALLATION & CONFIGURATION

Hadoop installation & configuration includes user settings, Java installation, passwordless SSH configuration and, lastly, Hadoop installation and configuration itself.

User & Group Settings

Location: ha-nn01, ha-nn02, ha-nn03, ha-dn01, ha-dn02, ha-client

First we will create a group named 'hadoop'. Next we create a user 'huser' to perform all hadoop administrative tasks and set up a password for it.

# groupadd hadoop
# useradd -m -d /home/huser -g hadoop huser
# passwd huser

Note: Henceforth we will be using the newly created 'huser' user to perform all hadoop tasks.

Java Installation

Location: ha-nn01, ha-nn02, ha-nn03, ha-dn01, ha-dn02, ha-client

To install java, you can refer the blog here.


Passwordless SSH Configuration

A passwordless SSH environment is needed by the namenodes to start the HDFS & MapReduce related daemons on all nodes.

Location: ha-nn01, ha-nn02, ha-nn03

huser@ha-nn01:~$ ssh-keygen -t rsa
huser@ha-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@ha-nn01
huser@ha-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@ha-nn02
huser@ha-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@ha-nn03
huser@ha-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@ha-dn01 
huser@ha-nn01:~$ ssh-copy-id -i ~/.ssh/id_rsa.pub huser@ha-dn02

Testing Passwordless SSH
Run the below commands from ha-nn01, ha-nn02 & ha-nn03 to test passwordless logins.
huser:~$ ssh ha-nn01
huser:~$ ssh ha-nn02
huser:~$ ssh ha-nn03
huser:~$ ssh ha-dn01
huser:~$ ssh ha-dn02


ZooKeeper Installation
Location: ha-nn01, ha-nn02, ha-nn03

For zookeeper quorum installation and configuration, you can refer the blog here.

Note: ZooKeeper installation and configuration needs to be done on the namenodes only.

Hadoop Installation

We will be installing the latest stable release, Apache Hadoop 2.6.0.

Location: ha-nn01, ha-nn02, ha-nn03, ha-dn01, ha-dn02, ha-client

We will first place the downloaded tarball in /opt directory, untar it and change the ownership of that directory to 'huser' user.

root:~# cd /opt
root:~# tar -xzvf hadoop-2.6.0.tar.gz
root:~# chown -R huser:hadoop hadoop-2.6.0/

Next we will login as 'huser' user and set the environment variables in .bashrc file.

huser:~$ vi ~/.bashrc
###JAVA CONFIGURATION###
JAVA_HOME=/usr/java/jdk1.8.0_25/
export PATH=$PATH:$JAVA_HOME/bin

###HADOOP CONFIGURATION###
HADOOP_PREFIX=/opt/hadoop-2.6.0/
export PATH=$PATH:$HADOOP_PREFIX/bin:$HADOOP_PREFIX/sbin

After making necessary changes to the .bashrc file activate the configured environment settings for 'huser' user by running the below command.
huser:~$ exec bash

Testing Hadoop Installation
Execute the below command to test the hadoop installation. It should produce output similar to the following.
huser:~$ hadoop version
Hadoop 2.6.0
Subversion https://git-wip-us.apache.org/repos/asf/hadoop.git -r e3496499ecb8d220fba99dc5ed4c99c8f9e33bb1
Compiled by jenkins on 2014-11-13T21:10Z
Compiled with protoc 2.5.0
From source with checksum 18e43357c8f927c0695f1e9522859d6a

This command was run using /opt/hadoop-2.6.0/share/hadoop/common/hadoop-common-2.6.0.jar

Note: Installation of hadoop has to be done on all nodes.


Hadoop Configuration

There are a couple of files that need to be configured to get the automatic failover Hadoop cluster with QJM up and running. All our configuration files reside in the /opt/hadoop-2.6.0/etc/hadoop/ directory.

hadoop-env.sh
Location: ha-nn01, ha-nn02, ha-nn03, ha-dn01, ha-dn02, ha-client

huser:~$ vi /opt/hadoop-2.6.0/etc/hadoop/hadoop-env.sh


export JAVA_HOME=/opt/jdk1.8.0_25
export HADOOP_LOG_DIR=/var/log/hadoop/

Create a directory for logs as specified in hadoop-env.sh file with required 'huser' user permissions.

root:~# mkdir /var/log/hadoop
root:~# chown -R huser:hadoop /var/log/hadoop


core-site.xml
Location: ha-nn01, ha-nn02, ha-nn03, ha-dn01, ha-dn02, ha-client

huser:~$ vi /opt/hadoop-2.6.0/etc/hadoop/core-site.xml

<configuration>
 <property>
  <name>fs.default.name</name>
  <value>hdfs://auto-ha</value>
 </property>
</configuration>


hdfs-site.xml
Location: ha-nn01, ha-nn02, ha-nn03, ha-dn01, ha-dn02, ha-client

<configuration>
 <property>
  <name>dfs.replication</name>
  <value>2</value>
 </property>
 <property>
  <name>dfs.name.dir</name>
  <value>file:///hdfs/name</value>
 </property>
 <property>
  <name>dfs.data.dir</name>
  <value>file:///hdfs/data</value>
 </property>
 <property>
  <name>dfs.permissions</name>
  <value>false</value>
 </property>
 <property>
  <name>dfs.nameservices</name>
  <value>auto-ha</value>
 </property>
 <property>
  <name>dfs.ha.namenodes.auto-ha</name>
  <value>nn01,nn02</value>
 </property>
 <property>
  <name>dfs.namenode.rpc-address.auto-ha.nn01</name>
  <value>ha-nn01:8020</value>
 </property>
 <property>
  <name>dfs.namenode.http-address.auto-ha.nn01</name>
  <value>ha-nn01:50070</value>
 </property>
 <property>
  <name>dfs.namenode.rpc-address.auto-ha.nn02</name>
  <value>ha-nn02:8020</value>
 </property>
 <property>
  <name>dfs.namenode.http-address.auto-ha.nn02</name>
  <value>ha-nn02:50070</value>
 </property>
 <property>
  <name>dfs.namenode.shared.edits.dir</name>
  <value>qjournal://ha-nn01:8485;ha-nn02:8485;ha-nn03:8485/auto-ha</value>
 </property>
 <property>
  <name>dfs.journalnode.edits.dir</name>
  <value>/hdfs/journalnode</value>
 </property>
 <property>
  <name>dfs.ha.fencing.methods</name>
  <value>sshfence</value>
 </property>
 <property>
  <name>dfs.ha.fencing.ssh.private-key-files</name>
  <value>/home/huser/.ssh/id_rsa</value>
 </property>
 <property>
  <name>dfs.ha.automatic-failover.enabled.auto-ha</name>
  <value>true</value>
 </property>
 <property>
   <name>ha.zookeeper.quorum</name>
   <value>ha-nn01.hadoop.lab:2181,ha-nn02.hadoop.lab:2181,ha-nn03.hadoop.lab:2181</value>
 </property>
</configuration>


Note:
1. Replication factor is set to '2' as I have only two datanodes.

2. Create a directory /hdfs/name in all namenodes with required 'huser' user permissions.
root:~# mkdir -p /hdfs/name
root:~# chown -R huser:hadoop /hdfs/name

3. Create a directory /hdfs/data in all datanodes with required 'huser' user permissions.
root:~# mkdir -p /hdfs/data
root:~# chown -R huser:hadoop /hdfs/data

4. Create a directory /hdfs/journalnode in all namenodes with required 'huser' user permissions.
root:~# mkdir /hdfs/journalnode
root:~# chown -R huser:hadoop /hdfs/journalnode

5. On the ha-client host, add the below property to the hdfs-site.xml file.
<property>
 <name>dfs.client.failover.proxy.provider.auto-ha</name>
 <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>

6. We can explicitly enable automatic-failover for the nameservice-id 'auto-ha' by setting the property 'dfs.ha.automatic-failover.enabled.auto-ha' to 'true'.


slaves
This file contains only the hostnames of datanodes.

Location: ha-nn01, ha-nn02, ha-nn03

huser:~$ vi /opt/hadoop2.6.0/etc/hadoop/slaves

ha-dn01
ha-dn02

Finally after completing the configuration part, we will be initializing and starting our automatic-failover hadoop cluster.

Initializing HA state in ZooKeeper

The required HA state needs to be initialized in ZooKeeper by running the below command from any one of the namenodes.

Location: ha-nn01

huser@ha-nn01:~$ hdfs zkfc -formatZK


Starting Journal Nodes

Since the Quorum Journal Manager mechanism is used to share edit logs, we need to start the journalnode daemon on all namenodes.
huser@ha-nn01:~$ hadoop-daemon.sh start journalnode

Formatting & Starting Namenodes

The namenodes need to be formatted and started to bring up the HDFS filesystem: the first namenode is formatted, and the second then bootstraps itself from it as the standby.

Location: ha-nn01

huser@ha-nn01:~$ hadoop namenode -format
huser@ha-nn01:~$ hadoop-daemon.sh start namenode

Location: ha-nn02

huser@ha-nn02:~$ hadoop namenode -bootstrapStandby
huser@ha-nn02:~$ hadoop-daemon.sh start namenode

Note: By default both the namenodes will be in 'standby' state.

Starting ZKFC Services

The ZooKeeper Failover Controller (ZKFC) service needs to be started in order to make one of the namenodes 'active'. Run the below command on both namenodes.

huser@ha-nn01:~$ hadoop-daemon.sh start zkfc
huser@ha-nn02:~$ hadoop-daemon.sh start zkfc

Note: As soon as the zkfc service is started, you can see that one of the namenodes is in the active state using the below commands from any one of the namenodes.
huser@ha-nn01:~$ hdfs haadmin -getServiceState nn01
huser@ha-nn01:~$ hdfs haadmin -getServiceState nn02


Starting Datanodes

To start the datanodes run the below mentioned command from any one of the namenodes.
huser@ha-nn01:~$ hadoop-daemons.sh start datanode

Verifying Automatic Failover

To verify the automatic failover, we need to locate the active namenode using command line or by visiting the namenode web interfaces.

Using command line
huser@ha-nn01:~$ hdfs haadmin -getServiceState nn01
huser@ha-nn01:~$ hdfs haadmin -getServiceState nn02

Using web interface
ha-nn01:50070
ha-nn02:50070

After locating the active namenode, we can cause a failure on that node to trigger a failover automatically. One way is to run the 'jps' command to determine the namenode daemon's PID and then kill that daemon. Within a few seconds the other namenode will automatically become active.


Related Links

Fully Distributed Hadoop Cluster - Automatic Failover HA Cluster with ZooKeeper & NFS