7701 Greenbelt Road, Suite 400, Greenbelt, MD 20770 Tel: (301) 614-8600 Fax: (301) 614-8601 www.sgt-inc.com
©2015 SGT, Inc. All Rights Reserved
SGT WHITE PAPER
Big Data Lab: MongoDB and Hadoop
1.0 Introduction
1.1 What is Covered in this Document
2.0 Setup MongoDB
2.1 Amazon AWS Account
2.2 Setup Amazon EC2 instance for MongoDB
2.2.1 Select AMI
2.2.2 Select Instance Type
2.2.3 Number of Instances
2.2.4 Add Storage
2.2.5 Instance Description
2.2.6 Security Group
2.2.7 Create Security Key Pair
2.2.8 Launching Instances
2.2.9 Connect To Instance
2.2.9.1 Generate Private Key
2.2.9.2 Save Private Key
2.2.9.3 Connect With PuTTY
2.2.10 Add Storage Volume for Database
2.3 Install MongoDB
2.3.1 Configure MongoDB
2.4 Create Simulation Data
2.4.1 Move Code to Windows
2.4.2 Configure Number of Providers, Beneficiaries, & Claims
2.4.3 Build JAR
2.4.4 Populate Database
3.0 Setup Cluster
3.1 Setup Amazon EC2 Instances
3.1.1 AWS Sign‐In
3.1.2 Launch Instances
3.1.2.1 Select AMI
3.1.2.2 Select Instance Type
3.1.2.3 Number of Instances
3.1.2.4 Add Storage
3.1.2.5 Instance Description
3.1.2.6 Security Group
3.1.2.7 Create Security Key Pair
3.1.2.8 Launching Instances
3.2 Connecting to Instances with SSH Client
3.2.1 Enable Public Access
3.3 Setup WinSCP Access
SGT Innovation Center Big Data Lab Mongo DB and Hadoop White Paper ©2015 SGT, Inc. All Rights Reserved
4.0 Build Mongo‐Hadoop Connector
4.1 Download Mongo‐Hadoop Connector
4.2 Build Mongo‐Hadoop Connector Jars
5.0 Install & Configure Hadoop
5.1 Update Package Dependencies
5.2 Install Java
5.3 Download Hadoop
5.4 Setup Environment Variables
5.5 Setup Password‐less SSH on Servers
5.6 Copy Mongo‐Hadoop Jars to Hadoop
5.7 Hadoop Cluster Setup
5.7.1 hadoop‐env.sh
5.7.2 core‐site.xml
5.7.3 hdfs‐site.xml
5.7.4 mapred‐site.xml
5.7.5 Move Configuration to all Nodes
5.7.6 Configure Master Nodes
5.7.6.1 Modify masters File on Master
5.7.6.2 Modify slaves File on Master
5.7.6.3 Copy masters & slaves Files to All Master Nodes
5.7.7 Configure Slave Nodes
5.7.7.1 Modify slaves file on slave1
5.7.7.2 Copy slaves file to all other slave nodes
5.7.8 Hadoop Daemon Startup
5.7.9 Verify HTTP Status
6.0 Run Map Reduce Programs
6.1 What Is Map Reduce
6.1.1 Mapper Code
6.1.2 Reducer Code
6.2 Build & Run Map Reduce Jars
6.2.1 Build JAR Files
6.2.2 Run JAR Files
6.2.3 Other JAR Files
1.0 Introduction
The prototype health care claims system model was built to explore the possibilities of using MongoDB and
Hadoop for big data processing across a cluster of servers in the cloud. The idea was to use MongoDB to store
hundreds of millions of records and then to use Hadoop’s map‐reduce functionality to efficiently process the data.
MongoDB is a NoSQL, document-based database that provides easy scalability, making it a fitting choice for this project. Apache Hadoop is a framework that enables the distributed processing of large data sets across clusters of computers. Hadoop is designed to scale from a single server to thousands of servers.
1.1 What is Covered in this Document
This document covers how to setup MongoDB and a Hadoop cluster in an Amazon cloud environment using
Amazon AWS Ubuntu instances. It includes how to use my code to generate large amounts of data for MongoDB. It
also covers how to setup the Mongo‐Hadoop Connector on a cluster of machines running Hadoop. The concept of
Map‐Reduce is explained and some code is shown to demonstrate how it works and what to put in the
configuration XML file based on the code. Finally, it demonstrates how to run the map-reduce programs that were written for this project, which can be retrieved from GitHub.
2.0 Setup MongoDB
MongoDB is a document-based NoSQL database that provides high performance and easy scalability. Database objects (documents) map easily to programming-language data types, so this database is a good fit for the objective of this project. We are going to set up MongoDB on a dedicated server in the cloud using Amazon Web Services.
2.1 Amazon AWS Account
If you do not already have an Amazon AWS account, please create a new one here (http://aws.amazon.com/).
Since the signup process is straightforward, I am going to skip this step.
Sign in to your account.
2.2 Setup Amazon EC2 instance for MongoDB
Once you have signed up for an Amazon account, log in to Amazon Web Services, click on "My Account", and select AWS Management Console.
Then navigate to the EC2 console.
Once you are on the EC2 Console homepage, click “Launch Instance”
2.2.1 Select AMI
I used Ubuntu Server 12.04.3 64‐bit OS
2.2.2 Select Instance Type
Select a Medium instance (m3.medium)
2.2.3 Number of Instances
For this MongoDB setup, I used a single instance to store the database.
2.2.4 Add Storage
Choose 8GB for the size of your main drive.
Since we are going to put a large database on this instance, 8 GB will not be nearly big enough, but we will add
additional storage later on a separate drive.
2.2.5 Instance Description
Optionally you can add a description so that you can keep your instances straight later.
2.2.6 Security Group
Next we will create a new security group. We will modify the rules later on.
2.2.7 Create Security Key Pair
Review and Launch Instance.
Amazon EC2 uses public-key cryptography to encrypt and decrypt login information. Public-key cryptography uses a public key to encrypt a piece of data, such as a password, and then the recipient uses the private key to decrypt the data. The public and private keys are known as a key pair.
Create a new key pair, give it the name "hadoopec2cluster", and download the key pair (.pem) file to your local machine. Click "Launch Instance".
2.2.8 Launching Instances
Once you click “Launch Instance” you will be redirected to the Launch Status page. In the bottom right corner you
should see a button for View Instance.
Once you click this link, you will be taken back to the EC2 console page. Initially, your instance will be in a
“pending” state, but after a few moments, it should change to “running.”
When you create a group with default options, it adds a rule for SSH on port 22. In order to have TCP and ICMP access we need to add two additional security rules. Add 'All TCP', 'All ICMP', and 'SSH (22)' under the inbound rules of "HadoopEC2SecurityGroup". This will allow ping, SSH, and other similar commands among the servers and from any other machine on the internet. Make sure to "Apply Rule Changes" to save your changes.
These protocols and ports are also required to enable communication among the cluster servers. As this is a test setup, we are allowing access from anywhere for TCP, ICMP, and SSH rather than locking down individual server ports.
2.2.9 Connect To Instance
We are going to use PuTTY to connect to our instances. To do this we will need two applications.
1. PuTTY(http://the.earth.li/~sgtatham/putty/latest/x86/putty.exe)
2. PuTTYgen(http://the.earth.li/~sgtatham/putty/latest/x86/puttygen.exe)
2.2.9.1 Generate Private Key
On your local machine, download the PuTTY and PuTTYgen executables and place them wherever you like. Now we need to generate a private key for our instance using PuTTYgen. Launch the PuTTYgen client and import the key pair we created during the launch-instance step – hadoopec2cluster.pem
Navigate to Conversions and “Import Key”
Once you import the key, you can enter a passphrase to protect your private key, or leave the passphrase fields blank to use the key without one. A passphrase protects the private key from unauthorized use by anyone with access to your machine; any connection made with a passphrase-protected key will prompt for the passphrase before access to the AWS EC2 server is granted.
2.2.9.2 Save Private Key
Now save the private key by clicking "Save Private Key", and click "Yes" when warned, since we are leaving the passphrase empty.
Save the .ppk file and give it a meaningful name.
Close PuTTYgen. We are now ready to connect to our Amazon instance.
2.2.9.3 Connect With PuTTY
Let's connect to our instance now. Launch the PuTTY client, grab the public DNS, and import the .ppk private key that we just created for password-less SSH access. Per Amazon documentation, the username for Ubuntu machines is "ubuntu".
When you launch the session for the first time, you will see the message below; click "Yes". You will then be prompted for the username; enter "ubuntu". If everything goes well, you will be presented with a welcome message ending in a Unix shell.
If there is a problem with your key, you may receive the error message below.
2.2.10 Add Storage Volume for Database
Now we have to add storage to our instance so that we can store the database. On the left‐hand side of the online
EC2 console, click on “Volumes.”
Then Click “Create Volume” at the top of the page and add the amount of storage you will require for your
database. Since I will be using many different sizes of databases in various tests, I chose 400GB, so that I could
store a few different databases at the same time.
Make sure that you choose the same Availability zone as your MongoDB instance. Here it is us‐west‐2b.
Once we have created a volume, we now have to attach it to an instance. Right click on the new volume and click
“Attach Volume”
Type in the tag that we previously entered (MongoDB_Instance) for the instance, and “/dev/sdf” for the device.
Navigate back to “Instances”, select MongoDB_Instance, and confirm that /dev/sdf now shows up in the instance
details.
Connect to your instance using PuTTY or some other SSH client. Using the lsblk command, you should see your new volume listed.
Notice that our new drive shows up as xvdf and doesn't have a mountpoint yet. First we need to create a filesystem on our new drive. To do this, run the command "sudo mkfs -t ext4 /dev/xvdf"
Now we need to mount the drive with the “sudo mount /dev/xvdf /mnt” command.
Run lsblk again and you should now see the device mountpoint set to “/mnt”.
Now we need to set this to mount on every system reboot so that we don’t have to keep manually mounting the
drive. First create a backup of your /etc/fstab file so that you can use it if you accidentally destroy or delete this file
while it is being edited.
Edit the file to show the following:
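An example entry for the new volume, using the device and mount point from the steps above (the 'nofail' option is my addition, so the system can still boot if the volume happens to be detached):

```
/dev/xvdf   /mnt   ext4   defaults,nofail   0   2
```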
Save and close the file. Run "sudo mount -a" to ensure that you have edited the file correctly. If there is no error message, then there is no error in the fstab file and you are good to continue.
WARNING: Errors in /etc/fstab can render a system unbootable. Do not shut down a system that has
errors in the /etc/fstab file!
At this point, our new volume will now be mounted upon every reboot.
2.3 Install MongoDB
Now we need to install MongoDB. First we will need to import the public key used by the package management system. To import the key, issue the following command:
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 7F0CEB10
Next we will need to create a list file for MongoDB with the following command:
$ echo 'deb http://downloads-distro.mongodb.org/repo/ubuntu-upstart dist 10gen' | sudo tee /etc/apt/sources.list.d/mongodb.list
Then ensure that all local packages are up to date:
$ sudo apt-get update
We can now install the latest stable version of MongoDB with the following command:
$ sudo apt-get install mongodb-10gen
2.3.1 Configure MongoDB
Now that MongoDB is installed, we need to choose a location for the database files. If we try to start MongoDB right now using "sudo mongod", we will see an error because the default data directory (/data/db/) doesn't exist.
To fix this error, you could simply create a /data/db/ directory; however, we are going to set the default directory to our newly mounted drive (volume). To do this, you can either manually change the default path in
/etc/mongodb.conf by changing
dbpath=/var/lib/mongodb
To
dbpath=/mnt
Or when you start MongoDB, you can simply pass in the mount location of our new drive with the dbpath option
with the following command:
$ sudo mongod --dbpath /mnt
MongoDB should now start without errors. To view your databases, open a new terminal and type “mongo”.
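The manual edit of the dbpath setting described above can also be scripted with sed. The sketch below runs against a demo copy of the file (an assumption for illustration; the real edit would target /etc/mongodb.conf and require sudo):

```shell
# Create a demo copy of the relevant config line.
printf 'dbpath=/var/lib/mongodb\n' > mongodb.conf.demo
# Replace the dbpath value with our new mount point.
sed -i 's|^dbpath=.*|dbpath=/mnt|' mongodb.conf.demo
cat mongodb.conf.demo
```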
2.4 Create Simulation Data
Now that we have MongoDB installed, we need to create data to work with. We will use a simulation data generation program that I wrote, which can be found on our git server (Stash). First we have to install git. Issue the following command to do this:
$ sudo apt-get install git
Answer "Y" when prompted to confirm the download. To get this code, we will use git on our Ubuntu instance. Issue the following commands:
$ cd ~
$ git clone http://[email protected]/stash/scm/hccp/simulationdatagenerator.git
When prompted for a password for the repository, enter “issuemanager”. Now you can list all the files in the home
directory and you should see a new directory named “simulationdatagenerator.”
2.4.1 Move Code to Windows
Now we are going to use WinSCP to copy this over to our local Windows machine so we can build the code in Eclipse. You can download WinSCP here. Once downloaded, simply run the winscp553setup.exe and choose the
installation directory. Once the installer is finished running, navigate to your new WinSCP installation directory and
run WinSCP.
You will need to provide the public DNS for the Amazon EC2 instance, the username "ubuntu", and the private
key file that we generated with PuTTYgen.
When you are asked if you trust this host, press Yes. This will open a secure copy instance and you will simply need
to drag and drop the “simulationdatagenerator” directory over to your machine.
Now you simply need to open this project with Eclipse. If you do not have Eclipse, it can be downloaded here (https://www.eclipse.org/downloads/). I am using Eclipse IDE for Java EE Developers, Windows 64-bit, but Eclipse Standard 4.3.2 should work fine.
When you first launch Eclipse, it will ask you to select a workspace folder. Select the folder that we just copied the simulationdatagenerator directory to (in this case C:\Users\UserName\Documents\workspace).
Once Eclipse loads, go to File > Import and select "Existing Projects into Workspace".
On the Import Projects page, select “Select root directory:” and find our “simulationdatagenerator” directory.
Click Finish and now you should see the project appear in the Project Explorer.
2.4.2 Configure Number of Providers, Beneficiaries, & Claims
Before we run the program, we need to choose how much simulation data we want. Open up the
MongoDBdataImport.java file.
Near the top of this file, you will see a section that looks like this:
This is where you set how many beneficiaries, providers, and claims you want in your database. Set the ‘totalBen’
variable to the number of Beneficiaries you want in the database. In this case there would be 5000 Beneficiary
database documents generated. Likewise, set ‘numProviders’ to the number of Providers you want in the
database. In this case there would only be 25 Providers. To set the number of claims, you use the 'max' and 'min' variables. For each Provider, the program will choose a random number in the range [min, max], inclusive, and create that many claims for that particular provider. If you want all Providers to have the same number of claims, simply set max equal to min.
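The per-provider selection described above can be sketched in shell arithmetic. This is a hypothetical illustration of the logic, not the actual Java generator code; bash's RANDOM variable stands in for the program's random number source:

```shell
# Pick a random claims count in [min, max], inclusive, for one provider.
min=2000
max=6000
count=$(( min + RANDOM % (max - min + 1) ))
echo "$count"
```

When max equals min, the modulus range collapses to 1 and every provider gets exactly min claims, matching the behavior described in the text.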
For example, if you set these values to
totalBen = 100000000
numProviders = 25000
max = 6000
min = 2000
This would generate a database with data for 100 million beneficiaries, 25 thousand providers, and each provider
would have a different random number of claims between 2000 and 6000 (so one might have 2349, another might
have 4302 etc.), with an overall number of claims roughly at 100 million. If instead we set max and min both equal
to 4000, so that we have the following:
totalBen = 100000000
numProviders = 25000
max = 4000
min = 4000
This would generate a database with 100 million beneficiaries, 25 thousand providers, and exactly 4000 claims
per provider, for a total of exactly 100 million claims.
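The totals above can be checked with a line of shell arithmetic:

```shell
# 25,000 providers at exactly 4,000 claims each.
echo $(( 25000 * 4000 ))   # 100000000
```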
The program will generate two collections in a database called testData. There will be one collection called
Beneficiaries, which will simply have a list of beneficiaries and their information. A database document for a
beneficiary will look something like:
The second collection will be a list of Providers and each provider will have subdocuments that represent the
claims filed by the beneficiaries of that particular provider. A Provider database document will look something like:
2.4.3 Build JAR
To run this code on our Instance out in the cloud, we first need to build a Jar file for it using eclipse. To do this,
hold down Ctrl and select both ‘com.generateData’ and ‘zipCityStateLatLong.csv’ in the Project Explorer, right click
and select export.
Choose to export to a JAR file.
Then check the ‘Export generated class files and resources’, ‘Compress the contents of the Jar file’, and the
‘zipCityStateLatLong.csv’ checkboxes. Choose a location and name for the JAR file and click Finish.
Now open back up WinSCP and copy your new JAR file over to your Amazon EC2 instance that is running
MongoDB.
2.4.4 Populate Database
Before you can run the JAR file, you will need to install Java and download the current version of the MongoDB Java driver. The driver can be downloaded here (https://github.com/downloads/mongodb/mongo-java-driver/mongo-2.10.1.jar). Once it is downloaded, copy it over to your EC2 instance.
Before we install Java, let's update the packages on our instance.
$ sudo apt-get update
To install Java, run the following commands.
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
Run the 'java -version' command to ensure that it installed properly.
After this is done, use the following command from the directory you copied the JAR files into to populate the database:
$ java -cp test.jar:mongo-2.10.1.jar com.generateData.MongoDBdataImport
Note: This can take a long time (a few days) if you are generating hundreds of millions of beneficiary records.
After the jar is done running, you should see output like this:
We can then go into MongoDB and double check that the data was created.
$ mongo
> show dbs
> use testData
> show collections
> db.Provider.count()
> db.Beneficiary.count()
The output should look like the following.
3.0 Setup Cluster
We will set up a 4-node cluster for running Hadoop using medium instances. You can stop an instance at any time to avoid charges, but you will lose its public IP and hostname, and restarting the instance will create new ones. You can also terminate your Amazon EC2
instance at any time; by default your instance is deleted upon termination, so be careful what you are doing.
3.1 Setup Amazon EC2 Instances
3.1.1 AWS Sign‐In
Sign back into your Amazon AWS account.
3.1.2 Launch Instances
Once you have signed up for an Amazon account, log in to Amazon Web Services, click on "My Account", and select AWS Management Console.
Then navigate to the EC2 console.
Once you are on the EC2 Console homepage, click “Launch Instance”
3.1.2.1 Select AMI
I used Ubuntu Server 12.04.3 64‐bit OS
3.1.2.2 Select Instance Type
Select a Medium instance (m3.medium)
3.1.2.3 Number of Instances
Enter 4 for the number of instances. We are going to configure the cluster with 1 master and 3 slave nodes.
3.1.2.4 Add Storage
Choose 8GB, which is the minimum amount of storage allowed for a medium instance.
3.1.2.5 Instance Description
Leave the instance description empty since we are launching multiple instances; we will name them individually later.
3.1.2.6 Security Group
Choose “Select an existing security group” and choose the one we created for our instance running MongoDB,
HadoopEC2SecurityGroup.
3.1.2.7 Create Security Key Pair
Finally, select "Review and Launch" and then, if everything looks right, select "Launch." When prompted to select a key pair, select "Choose an existing key pair" and use the same key pair we previously created, hadoopec2cluster. This way we can use the same private key that we generated with PuTTYgen to connect to all of our instances.
Once you click "Launch Instance," 4 instances should be launched in the "pending" state.
Once they are in the "running" state, we are going to rename the instances as below.
1. namenode (Master)
2. slave1 (data node will reside here)
3. slave2 (data node will reside here)
4. slave3 (data node will reside here)
You should now have five instances running, one for the MongoDB database, and the four new ones we just created.
If you used the same security group for all of them (HadoopEC2SecurityGroup), then we shouldn't have to change any of the security rules at this point; however, if you used a different security group, then you will have to repeat the steps in section 2.2.8.
3.2 Connecting to Instances with SSH Client
Let's connect to our instances now. Launch the PuTTY client, grab the public DNS for each instance, and import the .ppk private key that we created in section 2.2.9.2. Per Amazon documentation, the username for Ubuntu machines is "ubuntu".
When you launch the session for the first time, you will see the message below; click "Yes". You will then be prompted for the username; enter "ubuntu". If everything goes well, you will be presented with a welcome message ending in a Unix shell.
If there is a problem with your key, you may receive the error message below.
Similarly, connect to the remaining 3 machines (slave1, slave2, and slave3) to make sure you can connect successfully.
3.2.1 Enable Public Access
Back on the Amazon EC2 console, retrieve the Public DNS, Public IP, and Private IP for all five instances.
Next we are going to update the hostname with the EC2 public URL and update the /etc/hosts file to map the EC2 public URL to the IP address. Finally, we are going to set up aliases for each node's IP address and for the IP addresses of the nodes that each node will communicate with. This lets us configure the master and slaves with aliases instead of IP addresses, which is useful because when you shut down an EC2 instance you lose its IP address. With this setup, when you turn your cluster off and back on again later, you will only have to modify the /etc/hosts file instead of all of your Hadoop configuration files.
We are going to add entries in the following format to this file on all four of our instances in the Hadoop cluster.
The general format of our master node (namenode) /etc/hosts file will look like:
Private IP of namenode Public DNS of namenode
Private IP of namenode namenode alias
Public IP of slave1 slave1 alias
Public IP of slave2 slave2 alias
Public IP of slave3 slave3 alias
Public IP of MongoDB_Instance MongoDB_Instance alias
The general format of our slave nodes’ (slave1, slave2, slave3) /etc/hosts file will look like:
Private IP of the slave machine Public DNS of the slave machine
Private IP of the slave machine slave machine alias (slave1, slave2, or slave3)
Public IP of namenode namenode alias
So these files on all four machines will look like:
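As an illustration, a namenode /etc/hosts file following the format above might look like this (all IPs and DNS names here are placeholders, not values from the lab):

```
10.0.0.10     ec2-54-200-1-10.us-west-2.compute.amazonaws.com
10.0.0.10     namenode
54.200.1.11   slave1
54.200.1.12   slave2
54.200.1.13   slave3
54.200.1.14   MongoDB_Instance
```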
3.3 Setup WinSCP Access
WinSCP is a handy utility for securely transferring files from your Windows machine to Amazon EC2.
Provide the hostname, username, and private key file, save your configuration, and log in.
If you see the error above, just ignore it; upon successful login you will see the Unix file system of the logged-in user (/home/ubuntu) on your Amazon EC2 Ubuntu machine.
Upload the .pem file to the master machine (namenode). It will be used when connecting to the slave nodes during Hadoop daemon startup, and when connecting to MongoDB_Instance during map-reduce job execution.
4.0 Build Mongo‐Hadoop Connector
Since we are using MongoDB for input and output, which is not standard Hadoop input/output, we will need to use the MongoDB Connector for Hadoop. The connector is used to pull data from MongoDB, route it to Hadoop map-reduce jobs for processing, and then write the results back to a MongoDB collection.
4.1 Download Mongo‐Hadoop Connector
To download the connector, first you will need to install git. Enter the following command from your namenode
instance.
$ sudo apt-get install git
Answer 'Y' when asked if you want to download git. Then enter the following commands.
$ cd ~
$ git clone https://github.com/mongodb/mongo-hadoop.git
Then change directories to your new mongo‐hadoop directory.
$ cd mongo-hadoop
We will be using the self-bootstrapping Gradle wrapper included in this package to build the JAR file for this code. You can find full documentation in the README.md file that is included in this package and online at https://github.com/mongodb/mongo-hadoop.
4.2 Build Mongo‐Hadoop Connector Jars
Since we will be using Hadoop 1.2.1, we will build the connector for Hadoop 1.1. The reason that we set it to build
for 1.1 is because the changes in between Hadoop 1.1 and 1.2 do not affect the Mongo‐Hadoop Connector so
there isn’t a separate build for version 1.2. To build the JAR file we will be using the general purpose build tool,
Gradle, that is included in this version (1.2.1 at the time of writing) of the Mongo‐Hadoop Connector. To build
against Hadoop 1.1, enter the following command.
$ ./gradlew jar -Phadoop_version='1.1'
The JAR file for the core module should be placed in core/build/libs.
5.0 Install & Configure Hadoop
5.1 Update Package Dependencies
Let's update the packages. I will start with the master; repeat this for the 3 slave nodes.
$ sudo apt-get update
Once it's complete, let's install Java.
5.2 Install Java
Use the following commands to install the latest Oracle Java (JDK) 7 in Ubuntu.
$ sudo add-apt-repository ppa:webupd8team/java
$ sudo apt-get update
$ sudo apt-get install oracle-java7-installer
Run the 'java -version' command to ensure that it installed properly.
Repeat this for all other nodes as well. (slave1, slave2, & slave3)
5.3 Download Hadoop
I am going to use the Hadoop 1.2.1 stable version from the Apache download page, via the following mirror.
Issue the wget command from the shell:
$ wget http://apache.mirror.gtcomm.net/hadoop/common/hadoop-1.2.1/hadoop-1.2.1.tar.gz
Unzip the files and review the package content and configuration files.
$ tar -xzvf hadoop-1.2.1.tar.gz
Rename the 'hadoop-1.2.1' directory to 'hadoop' for ease of operation and maintenance.
$ mv hadoop-1.2.1 hadoop
5.4 Setup Environment Variables
Set up environment variables for the 'ubuntu' user by updating the .bashrc file with important Hadoop paths and directories.
Navigate to the home directory:
$ cd
Open the .bashrc file in the vi editor:
$ vi .bashrc
Add the following at the end of the file:
export HADOOP_CONF=/home/ubuntu/hadoop/conf
export HADOOP_PREFIX=/home/ubuntu/hadoop
# Set JAVA_HOME
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
# Add Hadoop bin/ directory to path
export PATH=$PATH:$HADOOP_PREFIX/bin
Save and Exit.
To check whether it has been updated correctly, reload the bash profile and use the following commands:
$ source ~/.bashrc
$ echo $HADOOP_PREFIX
$ echo $HADOOP_CONF
Repeat 5.3 and 5.4 for remaining 3 instances (slave1, slave2, & slave3).
5.5 Setup Password‐less SSH on Servers
The master server remotely starts services on the slave nodes, which requires password-less access to the slave servers. The AWS Ubuntu server comes with the OpenSSH server pre-installed.
Quick Note:
The public part of the key loaded into the agent must be put on the target system in ~/.ssh/authorized_keys. This
has been taken care of by the AWS Server creation process
Now we need to add the AWS EC2 key pair identity hadoopec2cluster.pem to the SSH profile. To do that we will need the following SSH utilities:
'ssh-agent' is a background program that handles passwords for SSH private keys.
'ssh-add' prompts the user for a private key password and adds it to the list maintained by ssh-agent. Once you add a password to ssh-agent, you will not be asked to provide the key when using SSH or SCP to connect to hosts with your public key.
The Amazon EC2 instance has already taken care of 'authorized_keys' on the master server; execute the following commands to allow password-less SSH access to the slave servers.
First we need to protect our key pair files; if the file permissions are too open (see below), you will get an error. To fix this problem, we need to issue the following commands:
$ chmod 644 .ssh/authorized_keys
Quick Tip: If you set the permissions with 'chmod 644', you get a file that can be written by you, but can only be read by the rest of the world.
$ chmod 400 hadoopec2cluster.pem
Quick Tip: chmod 400 is a very restrictive setting, giving the file owner read-only access: no write or execute capabilities for the owner, and no permissions whatsoever for anyone else.
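To see what these octal modes mean in practice, here is a small illustrative Python sketch (our own, not part of the lab) that expands a 3-digit octal mode such as 644 or 400 into the familiar owner/group/other rwx view:

```python
def rwx(mode_octal):
    # expand a 3-digit octal mode (e.g. "644") into rwx notation:
    # each digit covers one of owner/group/other; bits are r=4, w=2, x=1
    bits = ["r", "w", "x"]
    out = ""
    for digit in mode_octal:
        d = int(digit)
        for i, b in enumerate(bits):
            out += b if d & (4 >> i) else "-"
    return out

print(rwx("644"))  # rw-r--r--  owner read/write, everyone else read-only
print(rwx("400"))  # r--------  owner read-only, no access for anyone else
```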
To use ssh‐agent and ssh‐add, follow the steps below:
1. At the Unix prompt, enter: eval `ssh-agent -s`. Note: Make sure you use the backquote (`), located under the tilde (~), rather than the single quote (').
2. Enter the command: ssh-add hadoopec2cluster.pem
Notice that the .pem file now has read-only permission, and this time the command works for us.
**Keep in mind that the agent's keys are lost when the shell exits, and you will have to repeat the ssh-agent and ssh-add commands. It is recommended that you simply add these commands to the .bashrc file so that you don't have to repeat them every time you log in. Add the following to the end of the .bashrc file on the master node (namenode):
eval `ssh-agent -s`
ssh-add /home/ubuntu/hadoopec2cluster.pem
Now let's verify that we can connect to all of our slave nodes and our mongo node with SSH.
Type “exit” to close your SSH session and return to your master node.
5.6 Copy Mongo‐Hadoop Jars to Hadoop
Now we need to copy the mongo-hadoop JAR that we built into Hadoop's lib folder. For version 1.2.1, the lib folder is simply hadoop/lib. To do this, issue the following commands.
$ cd ~/mongo-hadoop/core/build/libs
$ cp mongo-hadoop-core*.jar ~/hadoop/lib/mongo-hadoop-core.jar
You will also need the MongoDB Java driver, mongo-2.10.1.jar, that we used earlier to populate the MongoDB database. Simply copy this over with WinSCP.
Now, to copy these files to all of the slave nodes, issue the commands:
$ scp mongo-2.10.1.jar mongo-hadoop-core.jar ubuntu@slave1:/home/ubuntu/hadoop/lib
$ scp mongo-2.10.1.jar mongo-hadoop-core.jar ubuntu@slave2:/home/ubuntu/hadoop/lib
$ scp mongo-2.10.1.jar mongo-hadoop-core.jar ubuntu@slave3:/home/ubuntu/hadoop/lib
5.7 Hadoop Cluster Setup
This section covers the Hadoop cluster configuration. We will have to modify:
hadoop-env.sh - This file contains environment variable settings used by Hadoop. You can use these to affect some aspects of Hadoop daemon behavior, such as where log files are stored, the maximum amount of heap used, etc. The only variable you should need to change in this file at this point is JAVA_HOME, which specifies the path to the Java 1.7.x installation used by Hadoop.
core-site.xml - key property fs.default.name - the NameNode configuration, e.g. hdfs://namenode/
hdfs-site.xml - key property dfs.replication - by default 3
mapred-site.xml - key property mapred.job.tracker - the JobTracker configuration, e.g. jobtracker:8021
We will first start with the master (NameNode) and then copy the XML changes above to the remaining 3 nodes (SNN and slaves).
Finally, in section 5.7.6 we will have to configure conf/masters and conf/slaves.
masters - defines on which machines Hadoop will start SecondaryNameNodes in our multi-node cluster.
slaves - defines the list of hosts, one per line, where the Hadoop slave daemons (DataNodes and TaskTrackers) will run.
Let's go over them one by one, starting with the master (namenode).
5.7.1 hadoop‐env.sh
$ vi $HADOOP_CONF/hadoop-env.sh
Add JAVA_HOME as shown below and save the changes:
export JAVA_HOME=/usr/lib/jvm/java-7-oracle
5.7.2 core‐site.xml
This file contains configuration settings for Hadoop Core (e.g. I/O) that are common to HDFS and MapReduce. The default file system property, fs.default.name, goes here; it could be, e.g., hdfs or s3, and it will be used by clients.
$ vi $HADOOP_CONF/core-site.xml
We are going to add two properties:
fs.default.name will point to the NameNode URL and port (usually 8020).
hadoop.tmp.dir - a base for other temporary directories. It is important to note that every node needs a hadoop tmp directory. We are going to create a new directory, 'hdfstmp', as below on all 4 nodes. Ideally you could write a shell script to do this for you, but for now we will do it manually.
$ cd
$ mkdir hdfstmp
Quick Tip: Some of the important directories are dfs.name.dir and dfs.data.dir in hdfs-site.xml. The default value for dfs.name.dir is ${hadoop.tmp.dir}/dfs/name, and for dfs.data.dir it is ${hadoop.tmp.dir}/dfs/data. It is critical that you choose your directory locations wisely in a production environment.
<configuration>
<property>
<name>fs.default.name</name>
<value>hdfs://namenode:8020</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/home/ubuntu/hdfstmp</value>
</property>
</configuration>
5.7.3 hdfs‐site.xml
This file contains the configuration for the HDFS daemons: the NameNode, the SecondaryNameNode, and the DataNodes.
We are going to add 2 properties
dfs.permissions with value false. This means that any user, not just the "hdfs" user, can do anything they want to HDFS, so do not do this in production unless you have a very good reason. If "true", permission checking in HDFS is enabled; if "false", permission checking is turned off, but all other behavior is unchanged. Switching from one value to the other does not change the mode, owner, or group of files or directories. Be very careful before you set this.
dfs.replication - the default block replication is 3. The actual number of replicas can be specified when the file is created; the default is used if replication is not specified at create time. Since we have 3 slave nodes, we will set this value to 3.
<configuration>
<property>
<name>dfs.replication</name>
<value>3</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
</configuration>
5.7.4 mapred‐site.xml
This file contains the configuration settings for the MapReduce daemons: the JobTracker and the TaskTrackers.
The mapred.job.tracker parameter is a hostname (or IP address) and port pair on which the JobTracker listens for RPC communication. This parameter specifies the location of the JobTracker for the TaskTrackers and MapReduce clients.
The JobTracker will be running on the master (NameNode):
<configuration>
<property>
<name>mapred.job.tracker</name>
<value>hdfs://namenode:8021</value>
</property>
</configuration>
5.7.5 Move Configuration to all Nodes
Now that we are done with the Hadoop XML configuration on the master, let's copy the files to the remaining 3 nodes using secure copy (scp).
Start with the SNN; if you are starting a new session, run ssh-add as per section 5.5.
From the master's Unix shell, change to the conf directory and issue the commands below:
$ cd $HADOOP_CONF
$ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml ubuntu@slave1:/home/ubuntu/hadoop/conf
$ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml ubuntu@slave2:/home/ubuntu/hadoop/conf
$ scp hadoop-env.sh core-site.xml hdfs-site.xml mapred-site.xml ubuntu@slave3:/home/ubuntu/hadoop/conf
5.7.6 Configure Master Nodes
Every Hadoop distribution comes with masters and slaves files. By default each contains one entry for localhost. We have to modify these 2 files on the master node (namenode) and on the slave machines (slave1, slave2, & slave3). Since we only have one master, we have no dedicated machine for the SecondaryNameNode; however, for a much more labor-intensive program, you would want a second master instance dedicated to the SecondaryNameNode.
5.7.6.1 Modify masters File on Master
The conf/masters file defines on which machines Hadoop will start SecondaryNameNodes in our multi-node cluster. In our case there will only be one master, but keep in mind that larger clusters typically run a second master node dedicated to the SecondaryNameNode. From the Hadoop HDFS user guide: "The secondary NameNode merges the fsimage and the edits log files periodically and keeps edits log size within a limit. It is usually run on a different machine than the primary NameNode since its memory requirements are on the same order as the primary NameNode. The secondary NameNode is started by bin/start-dfs.sh on the nodes specified in the conf/masters file."
$ vi $HADOOP_CONF/masters
Provide an entry for the hostname where you want to run the SecondaryNameNode daemon; in our case this is namenode. We can simply use the aliases we set up in section 3.2.1.
5.7.6.2 Modify slaves File on Master
The slaves file is used for starting DataNodes and TaskTrackers. We will add all of our dedicated slave instances (slave1, slave2, & slave3) using the aliases for their public IP addresses that we set up in section 3.2.1, and we will also run our master node (namenode) as a slave node. However, on larger clusters with huge amounts of data, you would not want to run the DataNode and TaskTracker daemons on the master as well.
$ vi $HADOOP_CONF/slaves
5.7.6.3 Copy masters & slaves Files to All Master Nodes
Since we are not actually using more than one master node, this step can be skipped. If you wanted to run more than one master, each master would need an exact copy of the masters and slaves files from our namenode instance, and all other master nodes would have to be listed in the masters file as well. To copy the configuration to other master nodes:
$ scp masters slaves ubuntu@someOtherMasterNode:/home/ubuntu/hadoop/conf
5.7.7 Configure Slave Nodes
Since we are configuring slaves (slave1, slave2, & slave3), the masters file on all of the slave machines is going to be empty:
$ vi $HADOOP_CONF/masters
Next, update the 'slaves' file on the slave server (slave1) with the IP address of the slave node. Notice that the 'slaves' file on a slave node contains only its own IP address (or alias), and not that of any other slave node in the cluster.
$ vi $HADOOP_CONF/slaves
Similarly, update the masters and slaves files for slave2 and slave3. The slaves files on all four instances will look like:
5.7.8 Hadoop Daemon Startup
The first step in starting up your Hadoop installation is formatting the Hadoop file system, which is implemented on top of the local file systems of your cluster. You will need to do this the first time you set up a Hadoop installation. Do not format a running Hadoop file system; this will erase all of your data. To format the namenode:
$ hadoop namenode -format
Now let’s start all hadoop daemons from our master node (namenode).
$ cd $HADOOP_CONF
$ start-all.sh
This starts the NameNode, JobTracker, and SecondaryNameNode on our master node (namenode). If we had a second master node, the SecondaryNameNode would run on it as well. It also starts the DataNode and TaskTracker on all slave nodes listed in the $HADOOP_CONF/slaves file (including our master node in this case).
5.7.9 Verify HTTP Status
We can verify the status of the NameNode at http://ec2-54-189-60-175.us-west-2.compute.amazonaws.com:50070/dfshealth.jsp (this uses the public DNS of namenode).
We can check the JobTracker status at http://ec2-54-189-60-175.us-west-2.compute.amazonaws.com:50030/jobtracker.jsp (this uses the public DNS of namenode).
We can check the status of the slave nodes at http://ec2-54-184-198-174.us-west-2.compute.amazonaws.com:50060/tasktracker.jsp (this uses the public DNS of slave1; just change the public DNS to check the other slave nodes).
6.0 Run Map Reduce Programs
6.1 What is Map‐Reduce
Hadoop map-reduce takes as input key-value pairs drawn from each document: one field serves as the key and another as the value. The map phase groups (maps) together all values that share the same key, and the reduce phase then allows the programmer to perform any calculations necessary on the grouped values. The general data flow for a Hadoop map-reduce job using MongoDB for input and output looks as follows:
Figure 1: Generalized Map‐Reduce Sequence Diagram
For example, in this project there is a map-reduce program called AVGperNPI (Appendices A, B, & C). It takes the key-value pair "Provider_NPI_Number" as the key for the mapper, and the key-value pair "Covered Paid Amount" as the value. The mapper combines all claims that have the same NPI number and sends the NPI number and a list of all the "Covered Paid Amount" values as doubles (DoubleWritable) to the reducer. The reducer then iterates through the list of doubles, sums them, and divides by the number of elements in the list to calculate the average claim amount for all claims with the matching NPI number. The reducer writes the NPI number and the calculated average to a BSON object and writes this BSON object into a new collection in MongoDB. The new document will look something like:
{ "_id" : ObjectId("5320855cac4f1874ff31ce7d"), "Provider NPI Number" : NumberLong("2389959670"), "Average Claim Amount" : 613212.3526743455 }
This map-reduce program maps all claims with the same NPI number together and reduces them to one document containing the average claim amount for that NPI number instead of the individual claim amounts. It should be noted that doubles, floats, ints, etc. cannot be used directly as a mapper key or passed to the reducer. Instead, classes that implement Writable must be used, such as DoubleWritable, FloatWritable, IntWritable, etc. A complete list of these classes is in the org.apache.hadoop.io package, which can be viewed here (http://hadoop.apache.org/docs/current/api/index.html). Hadoop cannot read or write any data that is not contained in a class that implements Writable, because Hadoop needs the data in the serialized format that Writable provides.
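To make the map/group/reduce flow concrete outside of Hadoop, here is a minimal, self-contained sketch in Python of the same average-per-NPI logic. The field names mirror the example above; the sample data and helper names are illustrative, not part of the lab code:

```python
from collections import defaultdict

def mapper(doc):
    # mapper phase: emit one (NPI, amount) pair per claim subdocument
    npi = doc["Provider_NPI_Number"]
    for key, value in doc.items():
        if key.startswith("Claim "):
            yield npi, value["Covered Paid Amount"]

def reduce_avg(pairs):
    # group values by key (what Hadoop's shuffle does), then average per key
    groups = defaultdict(list)
    for npi, amount in pairs:
        groups[npi].append(amount)
    return {npi: sum(vals) / len(vals) for npi, vals in groups.items()}

docs = [
    {"Provider_NPI_Number": 2389959670,
     "Claim 1": {"Covered Paid Amount": 100.0},
     "Claim 2": {"Covered Paid Amount": 300.0}},
    {"Provider_NPI_Number": 1111111111,
     "Claim 1": {"Covered Paid Amount": 50.0}},
]
pairs = [p for d in docs for p in mapper(d)]
print(reduce_avg(pairs))  # {2389959670: 200.0, 1111111111: 50.0}
```

In the real job, the grouping step is performed by Hadoop's shuffle between the mapper and reducer; only the per-key averaging lives in the reducer.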
6.1.1 Mapper Code
We are going to look at the code in Appendix A, which examines our Provider collection and calculates the average of all of the claims per unique provider (NPI number). To do this we will map claim data to each individual NPI number. The MongoDB documents that we generated with our simulation data generator in section 2.4 for the Providers collection look like the following:
Provider NPI Number
Provider Data
    Subdocument Claim1
        Covered Paid Amount
        Other Claim1 data
    Subdocument Claim2
        Covered Paid Amount
        Other Claim2 data
    …
    Subdocument Claim(n)
        Covered Paid Amount
        Other Claim(n) data
Since we are calculating the average value of subdocuments per document, the generalized form of the mapper output key-value pair passed to the reducer will be Document ID : Subdocument Claim Value (Covered Paid Amount). In this code, our document ID is the provider's NPI number (a long) and our subdocument claim value is the value in the "Covered Paid Amount" : Number key-value pair (a double), so we use a mapper output key-value pair of LongWritable : DoubleWritable. This key is set in the code by the following line:
public class AvgPerNPIMapper extends Mapper<Object, BSONObject, LongWritable, DoubleWritable> {
The first two entries here, Object and BSONObject, are the mapper input's format. Since MongoDB stores all of its data in BSON, a binary JSON format, we use the BSONObject class for the input value format and the generalized Object class for the input key format. These need to match the values in the map function shown below:
@Override public void map(final Object pKey, final BSONObject pValue, final Context pContext)
The last parameter of this method, pContext, is what will be read by the reducer, so this is where we write the key-value pairs that we want to send to the reducer.
We pass in one MongoDB document at a time, each document is one provider with n claims, and we want to map each NPI number to the covered value of each claim. So we have to iterate through all of the claims in each document, map the covered paid amount to the NPI number, and send that key-value pair to the reducer. First, to get the NPI number, we use the following code:
long npi = (long)pValue.get("Provider_NPI_Number");
Then to iterate through the claims for that particular provider, we will use the following loop:
while(true) {
    try {
        i++;
        temp = ((BSONObject)pValue.get("Claim " + Integer.toString(i)));
        pContext.write(new LongWritable(npi),
                new DoubleWritable((double)temp.get("Covered Paid Amount")));
    }
    //when there are no more claims for this Provider exit loop
    catch(Throwable t) {
        System.out.println("Claim " + i + " Does Not Exist");
        break;
    }
}
This loop iterates through claims until it tries to find data for a claim that does not exist. This means that if a provider has 2309 claims, the loop runs until it tries to pull data for claim 2310, at which point it catches an exception that terminates the loop. The iteration was done this way because we don't want to have to know how many claims each provider has and manually set it in the code each time we run a map-reduce job on new data. This also allows us to easily perform the map-reduce job on data sets with a different number of claims per provider.
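The same open-ended probing pattern can be sketched without Hadoop. In this illustrative Python snippet (our own sketch, with hypothetical data), numbered keys are probed until one is missing, just as the mapper probes "Claim 1", "Claim 2", and so on:

```python
def iter_claims(doc):
    # probe "Claim 1", "Claim 2", ... until a key is missing,
    # mirroring the mapper's try/catch termination condition
    claims = []
    i = 0
    while True:
        i += 1
        try:
            claims.append(doc["Claim " + str(i)])
        except KeyError:
            break  # no more claims for this provider; exit the loop
    return claims

doc = {"Claim 1": 100.0, "Claim 2": 300.0, "Claim 3": 75.0}
print(iter_claims(doc))  # [100.0, 300.0, 75.0]
```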
In the above code, first we read in the entire claim subdocument as a BSONObject with:
i++;
temp = ((BSONObject)pValue.get("Claim " + Integer.toString(i)));
Then the output key‐value pair for the mapper is written to pContext with:
pContext.write(new LongWritable(npi), new DoubleWritable((double)temp.get("Covered Paid Amount")));
Notice that we write the NPI number (npi) as a LongWritable and the covered paid amount as a DoubleWritable as a key-value pair into pContext, which is then accessed by the reducer.
The last section of the mapper that should be mentioned is the termination condition for the loop. Because the number of claims for each provider is unknown, the loop is terminated by a break statement when the mapper tries to access a claim number higher than the final claim for that provider. This is done by the following code:
catch(Throwable t) {
    System.out.println("Claim " + i + " Does Not Exist");
    break;
}
6.1.2 Reducer Code
In the reducer, the key-value pair received as input from the mapper needs to match the key-value types defined in the mapper, and is declared in the class declaration:
public class AvgPerNPIReducer extends Reducer<LongWritable, DoubleWritable, BSONWritable, BSONWritable>
The last two entries here are the key-value pair types for the output. Since MongoDB stores all of its data as BSON, we are going to use BSONWritable for both of these.
The reducer receives everything that was written into the mapper's pContext variable, splits it into groups by key, and then allows the programmer to do whatever he wants with the values. So if you had a provider with 2409 claims, you would get a single LongWritable key for the NPI number with a collection of DoubleWritables for the claim values. These values are then parameters of the reduce function:
@Override public void reduce(final LongWritable pKey, final Iterable<DoubleWritable> pValues,final Context pContext)
The values pKey and pValues are the key-value pair that was passed in from the mapper through the mapper's pContext variable. Note that the pContext variable above is not the same as the one in the mapper; it is just the name used for data that will be written to the output after the reduce function runs.
The reducer is very simple, it just takes the average with the following code:
double avg = 0;
int cnt = 0;
for (final DoubleWritable value : pValues) {
    cnt++;
    avg += value.get();
}
avg = avg/cnt;
Then it writes both the NPI number and the average value to a BasicBSONObject.
BasicBSONObject output = new BasicBSONObject();
output.put("Provider NPI Number", pKey.get());
output.put("Average Claim Amount", avg);
Finally this BSON object is written to pContext, which is written back into MongoDB after the reducer runs.
pContext.write(null, new BSONWritable(output));
The first parameter above is used for specifying the document _id field. If it is set to null, MongoDB will automatically generate a unique _id, whereas if you wanted to specify an _id, you could simply put the desired id value instead of null. The second parameter is the BSONWritable document that will be written to the output collection.
6.1.3 XML Configuration File
All of the options for running the map-reduce jobs are stored in an XML file named test.xml, which can be found in Appendix D. The TestXMLConfig.java file in Appendix C tells the map-reduce job where to find the configuration file with the code:
The options that we need to pay attention to in this file are:
1. mongo.input.uri
2. mongo.output.uri
3. mongo.job.mapper
4. mongo.job.reducer
5. mongo.job.input.format
6. mongo.job.output.format
7. mongo.job.output.key
8. mongo.job.output.value
9. mongo.job.mapper.output.key
10. mongo.job.mapper.output.value
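Before launching a job, it can help to sanity-check that test.xml actually defines all ten options. This illustrative Python snippet (our own sketch, not part of the lab code) parses a Hadoop-style configuration file and lists any missing property names:

```python
import xml.etree.ElementTree as ET

REQUIRED = [
    "mongo.input.uri", "mongo.output.uri",
    "mongo.job.mapper", "mongo.job.reducer",
    "mongo.job.input.format", "mongo.job.output.format",
    "mongo.job.output.key", "mongo.job.output.value",
    "mongo.job.mapper.output.key", "mongo.job.mapper.output.value",
]

def missing_keys(xml_text):
    # Hadoop config layout: <configuration><property><name>..</name>..</property>..
    root = ET.fromstring(xml_text)
    present = {p.findtext("name") for p in root.iter("property")}
    return [k for k in REQUIRED if k not in present]

sample = """<configuration>
  <property><name>mongo.input.uri</name>
    <value>mongodb://localhost/testData.Provider</value></property>
</configuration>"""
print(missing_keys(sample))  # the nine options still to be set
```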
mongo.input.uri
This should be set to the URI of your mongoDB which should be:
<property>
  <!-- If you are reading from mongo, the URI -->
  <name>mongo.input.uri</name>
  <value>mongodb://ec2-54-203-82-12.us-west-2.compute.amazonaws.com/testData.Provider</value>
</property>
Here ec2-54-203-82-12.us-west-2.compute.amazonaws.com is the public DNS of your server running MongoDB, testData is the name of the database, and Provider is the name of the collection in the database. Alternatively you could probably use the alias "mongo" that we set up earlier, but it wasn't tested this way.
mongo.output.uri
This should likewise be set to:
<property>
  <!-- If you are writing to mongo, the URI -->
  <name>mongo.output.uri</name>
  <value>mongodb://ec2-54-203-82-12.us-west-2.compute.amazonaws.com/testData.AvgPerNPI</value>
</property>
mongo.job.mapper
This should be set to the class path of the mapper class:
<property>
  <!-- Class for the mapper -->
  <name>mongo.job.mapper</name>
  <value>avgPerNPI.AvgPerNPIMapper</value>
</property>
mongo.job.reducer
This should be set to the class path of the reducer class:
<property>
  <!-- Reducer class -->
  <name>mongo.job.reducer</name>
  <value>avgPerNPI.AvgPerNPIReducer</value>
</property>
mongo.job.input.format
When using MongoDB as input, this should be set to:
<property>
  <!-- InputFormat Class -->
  <name>mongo.job.input.format</name>
  <value>com.mongodb.hadoop.MongoInputFormat</value>
</property>
mongo.job.output.format
When using MongoDB as output, this should be set to:
<property>
  <!-- OutputFormat Class -->
  <name>mongo.job.output.format</name>
  <value>com.mongodb.hadoop.MongoOutputFormat</value>
</property>
mongo.job.output.key
This should be set to the same class as the mapper output key:
<property>
  <!-- Output key class for the output format -->
  <name>mongo.job.output.key</name>
  <value>org.apache.hadoop.io.LongWritable</value>
</property>
Where the mapper output key we used was:
public class AvgPerNPIMapper extends Mapper<Object, BSONObject, LongWritable, DoubleWritable> {
mongo.job.output.value
This should be set to the class used for the final output of the map‐reduce job:
<property>
  <!-- Output value class for the output format -->
  <name>mongo.job.output.value</name>
  <value>com.mongodb.hadoop.io.BSONWritable</value>
</property>
Where the final output class for the map‐reduce job is:
public class AvgPerNPIReducer extends Reducer<LongWritable, DoubleWritable, BSONWritable, BSONWritable>
mongo.job.mapper.output.key
If you download the example code for the mongo-hadoop connector from GitHub, it will say that this is optional, but it is not. It must be set to the same class as the mapper output key in the key-value pair:
<property>
  <!-- Output key class for the mapper -->
  <name>mongo.job.mapper.output.key</name>
  <value>org.apache.hadoop.io.LongWritable</value>
</property>
Where the mapper output key that was used was:
public class AvgPerNPIMapper extends Mapper<Object, BSONObject, LongWritable, DoubleWritable> {
mongo.job.mapper.output.value
If you download the example code from the mongo‐hadoop connector on gitHub, it will say that this is
optional, but it is not. This must be set to the same class as the mapper output value in the key‐value pair:
<property>
  <!-- Output value class for the mapper -->
  <name>mongo.job.mapper.output.value</name>
  <value>org.apache.hadoop.io.DoubleWritable</value>
</property>
Where the mapper output value that was used was:
public class AvgPerNPIMapper extends Mapper<Object, BSONObject, LongWritable, DoubleWritable> {
6.2 Build & Run Map Reduce Jars
Install Cygwin, which can be found here (http://cygwin.com/install.html). When installing Cygwin, it is recommended that you choose a path without spaces, because Linux file paths with spaces in them are annoying to deal with.
Make sure to install git (a distributed version control system).
If you want to install any other packages, you can always run setup.exe again; it will allow you to add new packages at any time.
To download this code from GitHub, run the following commands from a Cygwin terminal:
$ cd /cygdrive/c/Users/TFischer/Documents/workspace (or wherever your workspace directory is)
$ git clone http://[email protected]/stash/scm/hccp/mapreduceprograms.git
When prompted for a password, enter "issuemanager". Alternatively, if you don't want to install Cygwin, you can do this in an Amazon cloud instance and copy the code over from there with WinSCP, as we did earlier for the simulation data generator. When finished, the code should be in a directory named mapreduceprograms.
Launch Eclipse and select the workspace where you just downloaded the files.
Once Eclipse loads, go to File > Import and select "Existing Projects into Workspace".
On the Import Projects page, select "Select root directory:" and browse to our "mapreduceprograms" directory.
Select all of the projects and then click Finish.
You should now see these projects listed in the Project Explorer.
6.2.1 Build JAR Files
To build the JAR file for the AVGperNPI map-reduce program, right-click on the package located under the src directory (avgPerNPI), hold Ctrl and select test.xml as well, and then select Export.
Choose JAR file
Choose a name for the JAR file and click Finish.
You can build the JAR files for all of the other projects in the same way; just be sure to export both the package and the configuration file named test.xml to the JAR.
6.2.2 Run JAR Files
Now, to run the JAR file, copy it over to your Amazon master node instance using WinSCP.
Once it is copied over, assuming that you still have Hadoop and MongoDB running, you can run it by typing the following command:
$ hadoop jar testAPN.jar avgPerNPI.TestXMLConfig
This will run the program that calculates the average claim amount per provider (NPI number). While running, it will give console updates that look like:
The status can also be checked online using the JobTracker: http://ec2-54-189-60-175.us-west-2.compute.amazonaws.com:50030/jobtracker.jsp (this uses the public DNS of namenode)
When it finishes, it will print this report:
The map-reduce program to calculate the average claim amount per NPI number was run using the cluster set up in this document, with 5 medium instances utilized as 1 master/slave node, 3 slave nodes, and 1 dedicated node for MongoDB. It was run multiple times on databases with 1 million, 100 million, and 200 million claims. The completion times were as follows:
Number of Claim Records Average Execution Time
1 million 2 minutes, 9 seconds (~ 2 minutes)
100 million 3 hours, 7 minutes, 14 seconds (~ 187 minutes)
200 million 6 hours, 9 minutes, 23 seconds (~ 369 minutes)
With 1 million records running in roughly 2 minutes, it was estimated that 100 million records would run in about 200 minutes; however, the average time proved to be slightly lower, at 187 minutes. The reason for this is probably Hadoop setup time, during which the program is initializing but not actually running any map-reduce tasks. With larger sets of data, Hadoop setup time would be expected to make up a smaller percentage of the overall execution time and therefore have a smaller effect. This proved true when the job was run on 200 million records. With 100 million records at roughly 187 minutes, 200 million records would be expected to take double that, somewhere around 374 minutes. The map-reduce job with 200 million records averaged 369 minutes, which is close enough to the estimated 374 to conclude that the time will increase linearly with database size.
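The linear-scaling argument can be checked with a quick calculation. This illustrative Python snippet (our own sketch, using the measurements from the table above) fits time = setup + rate * records to the 1 million and 100 million runs and extrapolates to 200 million:

```python
# measured (records, minutes) from the runs above: 2:09 and 3:07:14
runs = [(1_000_000, 2.15), (100_000_000, 187.23)]

# two-point linear fit: time = setup + rate * records
(n1, t1), (n2, t2) = runs
rate = (t2 - t1) / (n2 - n1)   # minutes per record
setup = t1 - rate * n1         # fixed startup overhead in minutes

predicted_200m = setup + rate * 200_000_000
print(round(predicted_200m))   # 374 minutes; the measured average was 369
```

The small positive setup term captures the fixed initialization overhead discussed above, which shrinks as a fraction of the total as the data set grows.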
6.2.3 Other JAR Files
The other JAR files can be run by following the same format:
$ hadoop jar WhateverYouNamedTheJar.jar PackageName.TestXMLConfig
The map-reduce programs included in the download retrieved from GitHub are:
1. AVGperNPI
This map-reduce program returns the average claim amount per provider.
Run with command
$ hadoop jar WhateverYouNamedTheJar.jar avgPerNPI.TestXMLConfig
2. AVGperNPIperYear
This map‐reduce program returns the average claim amount per Provider per year.
Run with command
$ hadoop jar WhateverYouNamedTheJar.jar avgPerNPI.year.TestXMLConfig
3. StateOverMillion_MR
This map-reduce program returns all claims valued at over one million dollars, along with the state of the
beneficiary who received the pay-out.
Run with command
$ hadoop jar WhateverYouNamedTheJar.jar overMillionByState.TestXMLConfig
4. StateTherapyCode
This map‐reduce program returns the number of each therapy code per state.
Run with command
$ hadoop jar WhateverYouNamedTheJar.jar stateTherapyCode.TestXMLConfig
5. Hadoop_MR_Test
This map-reduce program groups the data by NPI number, therapy code, year, and month,
then gives the total number of beneficiaries and the total sum of the value of the claims for each group.
Run with command
$ hadoop jar WhateverYouNamedTheJar.jar mapReduce.TestXMLConfig
Note: StateOverMillion_MR and StateTherapyCode run slowly because they require reading data
from both the Beneficiary and the Provider collections, so I used the mongo-hadoop connector to read from the
Provider collection and the MongoDB Java driver to read from the Beneficiary collection. The Java driver kills
performance because it has to query the database every time it needs a document, which on large data sets can be
hundreds of millions of times. There is a way to use multiple collections as input data for the mongo-hadoop
connector, which should improve the performance of this code significantly, but I could not get it working.
There is example code for using multiple collections as input with the MultiCollectionSplitBuilder class in the
mongo-hadoop examples (https://github.com/mongodb/mongo-hadoop/blob/master/examples/treasury_yield/src/main/java/com/mongodb/hadoop/examples/treasury/TreasuryYieldMulti.java);
however, I could never determine what configuration to put in the XML file for the input URI when
using the MultiCollectionSplitBuilder.
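Short of getting multi-collection input working, one way to blunt the driver's per-record query cost is to memoize lookups inside the mapper, so each distinct beneficiary is fetched from MongoDB at most once per task rather than once per claim. The sketch below is a hypothetical illustration, not code from this lab: in a real mapper the loader function would wrap a findOne(...) call against the Beneficiary collection, typically constructed in setup().

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Memoizing wrapper around an expensive per-key lookup (e.g. a Mongo
// findOne); the backing store is hit only on the first request for a key.
public class CachedLookup<K, V> {
    private final Map<K, V> cache = new HashMap<>();
    private final Function<K, V> loader;
    private long misses = 0;

    public CachedLookup(Function<K, V> loader) {
        this.loader = loader;
    }

    public V get(K key) {
        V value = cache.get(key);
        if (value == null) {
            misses++;                   // one real round trip per distinct key
            value = loader.apply(key);
            cache.put(key, value);
        }
        return value;
    }

    // Number of real lookups performed, for gauging the cache's benefit
    public long misses() {
        return misses;
    }
}
```

With claims heavily concentrated on a limited set of beneficiaries, this turns hundreds of millions of queries into one query per distinct beneficiary per map task, at the cost of holding the fetched documents in the mapper's heap.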
Appendix A: AvgPerNPIMapper
package avgPerNPI;

import java.io.IOException;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.bson.BSONObject;

public class AvgPerNPIMapper extends Mapper&lt;Object, BSONObject, LongWritable, DoubleWritable&gt; {

    @Override
    public void map(final Object pKey, final BSONObject pValue, final Context pContext)
            throws IOException, InterruptedException {
        BSONObject temp;
        long npi = (long) pValue.get("Provider_NPI_Number");
        int i = 0;
        while (true) {
            try {
                i++;
                // Claims are embedded documents named "Claim 1", "Claim 2", ...
                temp = (BSONObject) pValue.get("Claim " + Integer.toString(i));
                pContext.write(new LongWritable(npi),
                        new DoubleWritable((double) temp.get("Covered Paid Amount")));
            } catch (Throwable t) {
                break; // no more claims on this provider document
            } // end try/catch
        } // end while(true)
    }
}
Appendix B: AvgPerNPIReducer
package avgPerNPI;

import java.io.IOException;

import com.mongodb.hadoop.io.BSONWritable;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BasicBSONObject;

public class AvgPerNPIReducer extends Reducer&lt;LongWritable, DoubleWritable, BSONWritable, BSONWritable&gt; {

    @Override
    public void reduce(final LongWritable pKey, final Iterable&lt;DoubleWritable&gt; pValues, final Context pContext)
            throws IOException, InterruptedException {
        double avg = 0;
        int cnt = 0;
        for (final DoubleWritable value : pValues) {
            cnt++;
            avg += value.get();
        }
        avg = avg / cnt; // sum of claim amounts divided by claim count

        BasicBSONObject output = new BasicBSONObject();
        output.put("Provider NPI Number", pKey.get());
        output.put("Average Claim Amount", avg);
        pContext.write(null, new BSONWritable(output));
    }
}
Appendix C: TestXMLConfig
package avgPerNPI;

import com.mongodb.hadoop.util.MongoTool;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ToolRunner;

public class TestXMLConfig extends MongoTool {

    static {
        // Load the job configuration from test.xml on the classpath
        Configuration.addDefaultResource("test.xml");
    }

    public static void main(final String[] pArgs) throws Exception {
        System.exit(ToolRunner.run(new TestXMLConfig(), pArgs));
    }
}
Appendix D: test.xml
<?xml version="1.0"?>
<configuration>
  <property>
    <!-- Run the job verbosely? -->
    <name>mongo.job.verbose</name>
    <value>false</value>
  </property>
  <property>
    <!-- Run the job in the foreground and wait for response, or background it? -->
    <name>mongo.job.background</name>
    <value>false</value>
  </property>
  <property>
    <!-- The field to pass as the mapper key. Defaults to _id if blank -->
    <name>mongo.input.key</name>
    <value></value>
  </property>
  <property>
    <!-- If you are reading from mongo, the URI -->
    <name>mongo.input.uri</name>
    <value>mongodb://ec2-54-203-82-12.us-west-2.compute.amazonaws.com/testData.Provider</value>
  </property>
  <property>
    <!-- If you are writing to mongo, the URI -->
    <name>mongo.output.uri</name>
    <value>mongodb://ec2-54-203-82-12.us-west-2.compute.amazonaws.com/testData.AvgPerNPI</value>
  </property>
  <property>
    <!-- The query, in JSON, to execute [OPTIONAL] -->
    <name>mongo.input.query</name>
    <!--<value>{"x": {"$regex": "^eliot", "$options": ""}}</value>-->
    <value></value>
  </property>
  <property>
    <!-- The fields, in JSON, to read [OPTIONAL] -->
    <name>mongo.input.fields</name>
    <value></value>
  </property>
  <property>
    <!-- A JSON sort specification for read [OPTIONAL] -->
    <name>mongo.input.sort</name>
    <value></value>
  </property>
  <property>
    <!-- The number of documents to limit to for read [OPTIONAL] -->
    <name>mongo.input.limit</name>
    <value>0</value> <!-- 0 == no limit -->
  </property>
  <property>
    <!-- The number of documents to skip in read [OPTIONAL] -->
    <!-- TODO - Are we running limit() or skip() first? -->
    <name>mongo.input.skip</name>
    <value>0</value> <!-- 0 == no skip -->
  </property>
  <property>
    <!-- Class for the mapper -->
    <name>mongo.job.mapper</name>
    <value>avgPerNPI.AvgPerNPIMapper</value>
  </property>
  <property>
    <!-- Reducer class -->
    <name>mongo.job.reducer</name>
    <value>avgPerNPI.AvgPerNPIReducer</value>
  </property>
  <property>
    <!-- InputFormat Class -->
    <name>mongo.job.input.format</name>
    <value>com.mongodb.hadoop.MongoInputFormat</value>
  </property>
  <property>
    <!-- OutputFormat Class -->
    <name>mongo.job.output.format</name>
    <value>com.mongodb.hadoop.MongoOutputFormat</value>
  </property>
  <property>
    <!-- Output key class for the output format -->
    <name>mongo.job.output.key</name>
    <value>org.apache.hadoop.io.LongWritable</value>
  </property>
  <property>
    <!-- Output value class for the output format -->
    <name>mongo.job.output.value</name>
    <value>com.mongodb.hadoop.io.BSONWritable</value>
  </property>
  <property>
    <!-- Output key class for the mapper [optional] -->
    <name>mongo.job.mapper.output.key</name>
    <value>org.apache.hadoop.io.LongWritable</value>
  </property>
  <property>
    <!-- Output value class for the mapper [optional] -->
    <name>mongo.job.mapper.output.value</name>
    <value>org.apache.hadoop.io.DoubleWritable</value>
  </property>
  <property>
    <!-- Partitioner class [optional] -->
    <name>mongo.job.partitioner</name>
    <value></value>
  </property>
  <property>
    <!-- Sort Comparator class [optional] -->
    <name>mongo.job.sort_comparator</name>
    <value></value>
  </property>
</configuration>