MongoDB – Hadoop Integration
Massimo Brignoli, Senior Solutions Architect, MongoDB Inc.
#massimobrignoli


DESCRIPTION

MongoDB and Hadoop are usually seen as separate worlds. This presentation shows how to use them together, with MongoDB acting as a Hadoop-enabled filesystem.

TRANSCRIPT

Page 1: MongoDB Hadoop Integration

MongoDB – Hadoop Integration

Senior Solutions Architect, MongoDB Inc.

Massimo Brignoli

#massimobrignoli

Page 2: MongoDB Hadoop Integration

We will Cover…

• A quick briefing on what MongoDB and Hadoop are

• The Mongo-Hadoop connector:
  – What it is
  – How it works
  – A tour of what it can do

Page 3: MongoDB Hadoop Integration

MongoDB and Hadoop Overview


Page 5: MongoDB Hadoop Integration

MongoDB

• document-oriented database with dynamic schema

• stores data in JSON-like documents:

  { _id: "mike", age: 21,
    location: { state: "NY", zip: "11222" },
    favorite_colors: ["red", "green"] }


Page 7: MongoDB Hadoop Integration

MongoDB

• Scales horizontally

• With sharding to handle lots of data and load


Page 10: MongoDB Hadoop Integration

Hadoop

• Java-based framework for Map/Reduce

• Excels at batch processing on large data sets by taking advantage of parallelism

Page 11: MongoDB Hadoop Integration

Mongo-Hadoop Connector - Why

• Lots of people use Hadoop and MongoDB separately, but need to integrate them

• Need to process data across multiple sources

• Custom code, or slow and hacky import/export scripts, is often used to move data in and out

• Scalability and flexibility with changes in Hadoop or MongoDB configurations

Page 12: MongoDB Hadoop Integration

Mongo-Hadoop Connector

[Diagram: Input data (MongoDB or .BSON) → Hadoop cluster → Results (MongoDB or .BSON)]

• Turn MongoDB into a Hadoop-enabled filesystem: use as the input or output for Hadoop

• New Feature: As of v1.1, also works with MongoDB backup files (.bson)
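
To make the mix-and-match concrete, here is a small sketch of a job configuration that reads its input from a live MongoDB collection and writes its output to a .bson backup file. It is only a sketch: the property names and values are the ones shown later in this deck (the "Read from MongoDB" and "Write Output to BSON" slides); depending on how the job is launched, the same choices can also be made on the Job object instead.

import org.apache.hadoop.conf.Configuration;

// Sketch only: input from a live MongoDB collection, output to a BSON backup file.
public class MixedInputOutputExample {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        // read input directly from MongoDB...
        conf.set("mongo.job.input.format", "com.mongodb.hadoop.MongoInputFormat");
        conf.set("mongo.input.uri", "mongodb://my-db:27017/enron.messages");
        // ...and write the results out as a .bson file on HDFS
        conf.set("mongo.job.output.format", "com.mongodb.hadoop.BSONFileOutputFormat");
        conf.set("mapred.output.dir", "hdfs:///tmp/results.bson");
        return conf;
    }
}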

Page 13: MongoDB Hadoop Integration

Benefits and Features

• Takes advantage of full multi-core parallelism to process data in Mongo

• Full integration with Hadoop and JVM ecosystems

• Can be used with Amazon Elastic MapReduce

• Can read and write backup files from local filesystem, HDFS, or S3

Page 14: MongoDB Hadoop Integration

Benefits and Features

• Vanilla Java MapReduce

• ...or, if you don't want to use Java, support for Hadoop Streaming: write MapReduce code in Python, Ruby, or JavaScript

Page 15: MongoDB Hadoop Integration

Benefits and Features

• Support for Pig – a high-level scripting language for data analysis and building map/reduce workflows

• Support for Hive – a SQL-like language for ad-hoc queries and analysis of data sets on Hadoop-compatible file systems

Page 16: MongoDB Hadoop Integration

How It Works

• Adapter examines the MongoDB input collection and calculates a set of splits from the data

• Each split gets assigned to a node in the Hadoop cluster

• In parallel, Hadoop nodes pull data for splits from MongoDB (or BSON) and process them locally

• Hadoop merges results and streams output back to MongoDB or BSON
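
How the connector is told to do this is a matter of configuration rather than code. Below is a small sketch of the kind of split-related settings involved; the property names follow the connector's documentation, but they are an assumption for your particular version, and the values are only illustrative.

import org.apache.hadoop.conf.Configuration;

// Illustrative split-related settings for the Mongo-Hadoop connector (sketch).
public class SplitSettingsExample {
    public static Configuration configure() {
        Configuration conf = new Configuration();
        conf.set("mongo.input.uri", "mongodb://my-db:27017/enron.messages");
        // approximate size of each split, in MB
        conf.set("mongo.input.split_size", "16");
        // optional query pushed down to MongoDB, so each split only reads matching documents
        conf.set("mongo.input.query", "{\"headers.To\": {\"$exists\": true}}");
        return conf;
    }
}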

Page 17: MongoDB Hadoop Integration

Tour of Mongo-Hadoop, by Example

Page 18: MongoDB Hadoop Integration

Tour of Mongo-Hadoop, by Example

• Using Java MapReduce with Mongo-Hadoop

• Using Hadoop Streaming

• Pig and Hive with Mongo-Hadoop

• Elastic MapReduce + BSON

Page 19: MongoDB Hadoop Integration

{

"_id" : ObjectId("4f2ad4c4d1e2d3f15a000000"),

"body" : "Here is our forecast\n\n ",

"filename" : "1.",

"headers" : {

"From" : "[email protected]",

"Subject" : "Forecast Info",

"X-bcc" : "",

"To" : "[email protected]",

"X-Origin" : "Allen-P",

"X-From" : "Phillip K Allen",

"Date" : "Mon, 14 May 2001 16:39:00 -0700 (PDT)",

"X-To" : "Tim Belden ",

"Message-ID" : "<18782981.1075855378110.JavaMail.evans@thyme>",

"Content-Type" : "text/plain; charset=us-ascii",

"Mime-Version" : "1.0"

}

}

Input Data: Enron e-mail corpus (501k records, 1.75Gb)

Page 20-21: MongoDB Hadoop Integration

(The same document is shown again, highlighting the sender field, headers.From, and the recipients field, headers.To.)

Page 22: MongoDB Hadoop Integration

The Problem

Let’s use Hadoop to build a graph of (senders → recipients) and the count of messages exchanged between each pair

Page 23: MongoDB Hadoop Integration

The Output Required

[Diagram: graph of senders and recipients (alice, bob, charlie, eve) with the number of messages exchanged on each edge]

{"_id": {"t":"bob@enron.com", "f":"alice@enron.com"}, "count" : 14}
{"_id": {"t":"eve@enron.com", "f":"alice@enron.com"}, "count" : 9}
{"_id": {"t":"alice@enron.com", "f":"bob@enron.com"}, "count" : 99}
{"_id": {"t":"charlie@enron.com", "f":"bob@enron.com"}, "count" : 48}
{"_id": {"t":"eve@enron.com", "f":"charlie@enron.com"}, "count" : 20}

Page 24: MongoDB Hadoop Integration

Example 1 - Java MapReduce

Page 25: MongoDB Hadoop Integration

Map Phase

Map phase - each input doc gets passed through a Mapper function

@Override
public void map(NullWritable key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
    // "headers" is the embedded document holding the e-mail headers
    BSONObject headers = (BSONObject) val.get("headers");
    if (headers.containsKey("From") && headers.containsKey("To")) {
        String from = (String) headers.get("From");
        String to = (String) headers.get("To");
        // "To" can contain several comma-separated recipients
        String[] recips = to.split(",");
        for (int i = 0; i < recips.length; i++) {
            String recip = recips[i].trim();
            // emit one ({from, to} key, 1) pair per recipient
            context.write(new MailPair(from, recip), new IntWritable(1));
        }
    }
}
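
MailPair, the custom map-output key written above, is not shown in the deck. Here is a minimal sketch of what such a WritableComparable might look like; it is an assumption, not the presenter's actual class.

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;

// Sketch of a composite {from, to} key so Hadoop can group and sort
// (sender, recipient) pairs. Not the deck's exact implementation.
public class MailPair implements WritableComparable<MailPair> {
    String from;
    String to;

    public MailPair() { }  // no-arg constructor required by Hadoop

    public MailPair(String from, String to) {
        this.from = from;
        this.to = to;
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(from);
        out.writeUTF(to);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        from = in.readUTF();
        to = in.readUTF();
    }

    @Override
    public int compareTo(MailPair o) {
        int c = from.compareTo(o.from);
        return c != 0 ? c : to.compareTo(o.to);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MailPair
                && from.equals(((MailPair) o).from)
                && to.equals(((MailPair) o).to);
    }

    @Override
    public int hashCode() {
        return 31 * from.hashCode() + to.hashCode();
    }
}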

Page 26: MongoDB Hadoop Integration

Map Phase (annotated)

In the Mapper above, the BSONObject val argument is the MongoDB document passed into Hadoop MapReduce.

Page 27: MongoDB Hadoop Integration

Reduce Phase

Reduce phase - outputs of Map are grouped together by key and passed to Reducer

@Override
public void reduce(final MailPair pKey,
                   final Iterable<IntWritable> pValues,
                   final Context pContext)
        throws IOException, InterruptedException {
    // sum up the 1s emitted by the Mapper for this {to, from} key
    int sum = 0;
    for (final IntWritable value : pValues) {
        sum += value.get();
    }
    // build the output document: { f: <sender>, t: <recipient> }
    BSONObject outDoc = BasicDBObjectBuilder.start()
            .add("f", pKey.from)
            .add("t", pKey.to)
            .get();
    BSONWritable pkeyOut = new BSONWritable(outDoc);
    // written back to MongoDB (or BSON) by the output format
    pContext.write(pkeyOut, new IntWritable(sum));
}

Pages 28-30: MongoDB Hadoop Integration

Reduce Phase (annotated)

In the Reducer above:

• pKey is the {to, from} key

• pValues is the list of all the values collected under the key

• pContext.write() sends the output back to MongoDB

Page 31: MongoDB Hadoop Integration

Read from MongoDB

mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat

mongo.input.uri=mongodb://my-db:27017/enron.messages

Page 32: MongoDB Hadoop Integration

Read from BSON

mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat

mapred.input.dir can point at the local filesystem, HDFS, or S3:

mapred.input.dir=file:///tmp/messages.bson
mapred.input.dir=hdfs:///tmp/messages.bson
mapred.input.dir=s3:///tmp/messages.bson

Page 33: MongoDB Hadoop Integration

Write Output to MongoDB

mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat

mongo.output.uri=mongodb://my-db:27017/enron.results_out

Page 34: MongoDB Hadoop Integration

Write Output to BSON

mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat

mapred.output.dir can point at the local filesystem, HDFS, or S3:

mapred.output.dir=file:///tmp/results.bson
mapred.output.dir=hdfs:///tmp/results.bson
mapred.output.dir=s3:///tmp/results.bson
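
The deck never shows the driver that ties these pieces together. Here is a minimal sketch of how the Mapper, Reducer, and input/output formats above might be wired up, written against the Hadoop 2.x API; the class names EnronMailMapper and EnronMailReducer stand in for the map() and reduce() shown earlier and are not from the deck.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;

// Sketch of a job driver for the Enron sender/recipient example.
public class EnronGraphJob {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // same settings as the "Read from MongoDB" / "Write Output to MongoDB" slides
        conf.set("mongo.input.uri", "mongodb://my-db:27017/enron.messages");
        conf.set("mongo.output.uri", "mongodb://my-db:27017/enron.results_out");

        Job job = Job.getInstance(conf, "enron sender-recipient graph");
        job.setJarByClass(EnronGraphJob.class);

        job.setMapperClass(EnronMailMapper.class);    // the map() shown earlier
        job.setReducerClass(EnronMailReducer.class);  // the reduce() shown earlier

        job.setMapOutputKeyClass(MailPair.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(BSONWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // read from and write to MongoDB; swap in the BSON file formats
        // to work with .bson backup files instead
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Note that the Mapper's declared input key/value types must match whatever the chosen input format emits.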

Page 35: MongoDB Hadoop Integration

Query the Output Collection

mongos> db.streaming.output.find({"_id.t": /^kenneth.lay/})

{ "_id" : { "t" : [email protected], "f" : "[email protected]" }, "count" : 1 }

{ "_id" : { "t" : [email protected], "f" : "[email protected]" }, "count" : 1 }

{ "_id" : { "t" : [email protected], "f" : "[email protected]" }, "count" : 2 }

{ "_id" : { "t" : [email protected], "f" : "[email protected]" }, "count" : 2 }

{ "_id" : { "t" : [email protected], "f" : "[email protected]" }, "count" : 4 }

{ "_id" : { "t" : [email protected], "f" : "[email protected]" }, "count" : 1 }

{ "_id" : { "t" : [email protected], "f" : "[email protected]" }, "count" : 1 }

...

has more

Page 36: MongoDB Hadoop Integration

Example 2 - Hadoop Streaming

Page 37: MongoDB Hadoop Integration

Example 2 - Hadoop Streaming

• Let’s do the same Enron Map/Reduce job with Python instead of Java

$ pip install pymongo_hadoop

Page 38: MongoDB Hadoop Integration

Example 2 - Hadoop Streaming

• Hadoop passes data to an external process via STDOUT/STDIN

[Diagram: Hadoop (JVM) streams documents over STDIN to a Python / Ruby / JS interpreter running "def mapper(documents): ...", and reads the emitted map(k, v) results back over STDOUT]

Page 39: MongoDB Hadoop Integration

from pymongo_hadoop import BSONMapper
import sys

def mapper(documents):
    i = 0
    for doc in documents:
        i = i + 1  # document counter
        from_field = doc['headers']['From']
        to_field = doc['headers']['To']
        # "To" can contain several comma-separated recipients
        recips = [x.strip() for x in to_field.split(',')]
        for r in recips:
            # emit one {f, t} pair per recipient, with a count of 1
            yield {'_id': {'f': from_field, 't': r}, 'count': 1}

BSONMapper(mapper)
print >> sys.stderr, "Done Mapping."

Mapper in Python

Page 40: MongoDB Hadoop Integration

from pymongo_hadoop import BSONReducer
import sys

def reducer(key, values):
    print >> sys.stderr, "Processing from/to %s" % str(key)
    # sum the counts emitted by the mapper for this {f, t} key
    _count = 0
    for v in values:
        _count += v['count']
    return {'_id': key, 'count': _count}

BSONReducer(reducer)

Reducer in Python

Page 41: MongoDB Hadoop Integration

Example 3 - Mongo-Hadoop with Pig and Hive

Page 42: MongoDB Hadoop Integration

Mongo-Hadoop and Pig

• Let’s do the same thing yet again, but this time using Pig

• Pig is a powerful language that can generate sophisticated map/reduce workflows from simple scripts

• Can perform JOIN, GROUP, and execute user-defined functions (UDFs)

Page 43: MongoDB Hadoop Integration

Loading/Writing Data

Pig directives for loading data: BSONLoader and MongoLoader

data = LOAD 'mongodb://localhost:27017/db.collection'
       USING com.mongodb.hadoop.pig.MongoLoader;

Writing data out: BSONStorage and MongoInsertStorage

STORE records INTO 'file:///output.bson'
      USING com.mongodb.hadoop.pig.BSONStorage;

Page 44: MongoDB Hadoop Integration

Datatype Conversion

• Pig has its own special datatypes:– Bags– Maps– Tuples

• Mongo-Hadoop Connector intelligently converts between Pig datatypes and MongoDB datatypes

Page 45: MongoDB Hadoop Integration

raw = LOAD 'hdfs:///messages.bson'
      USING com.mongodb.hadoop.pig.BSONLoader('', 'headers:[]');

send_recip = FOREACH raw GENERATE $0#'From' AS from, $0#'To' AS to;

send_recip_filtered = FILTER send_recip BY to IS NOT NULL;

send_recip_split = FOREACH send_recip_filtered
                   GENERATE from AS from, TRIM(FLATTEN(TOKENIZE(to))) AS to;

send_recip_grouped = GROUP send_recip_split BY (from, to);

send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) AS count;

STORE send_recip_counted INTO 'file:///enron_results.bson'
      USING com.mongodb.hadoop.pig.BSONStorage;

Code in Pig

Page 46: MongoDB Hadoop Integration

Mongo-Hadoop and Hive

Similar idea to Pig - process your data without needing to write Map/Reduce code from scratch

...but with SQL as the language of choice

Page 47: MongoDB Hadoop Integration

db.users.find()

{ "_id": 1, "name": "Tom", "age": 28 }

{ "_id": 2, "name": "Alice", "age": 18 }

{ "_id": 3, "name": "Bob", "age": 29 }

{ "_id": 101, "name": "Scott", "age": 10 }

{ "_id": 104, "name": "Jesse", "age": 52 }

{ "_id": 110, "name": "Mike", "age": 32 }

Sample Data: db.users

Page 48: MongoDB Hadoop Integration

CREATE TABLE mongo_users (id int, name string, age int)
STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
WITH SERDEPROPERTIES("mongo.columns.mapping" = "_id,name,age")
TBLPROPERTIES("mongo.uri" = "mongodb://localhost:27017/test.users");

Declare Collection in Hive

Page 49: MongoDB Hadoop Integration

SELECT name, age FROM mongo_users WHERE id > 100;

SELECT age, COUNT(*) FROM mongo_users WHERE id > 100 GROUP BY age;

SELECT *
FROM mongo_users T1
JOIN user_emails T2 ON (T1.id = T2.id);

Run SQL on it

Page 50: MongoDB Hadoop Integration

DROP TABLE old_users;

INSERT OVERWRITE TABLE old_users SELECT id,name,age FROM mongo_users WHERE age > 100 ;

Write The Results…

Page 51: MongoDB Hadoop Integration

Example 4: Amazon MapReduce

Page 52: MongoDB Hadoop Integration

Example 4

• Usage with Amazon Elastic MapReduce

• Run mongo-hadoop jobs without needing to set up or manage your own Hadoop cluster.

Page 53: MongoDB Hadoop Integration

Bootstrap

• First, make a "bootstrap" script that fetches the dependencies (the mongo-hadoop jar and the MongoDB Java driver):

#!/bin/sh
wget -P /home/hadoop/lib http://central.maven.org/maven2/org/mongodb/mongo-java-driver/2.11.1/mongo-java-driver-2.11.1.jar
wget -P /home/hadoop/lib https://s3.amazonaws.com/mongo-hadoop-code/mongo-hadoop-core_1.1.2-1.1.0.jar

Page 54: MongoDB Hadoop Integration

Bootstrap

Put the bootstrap script, and all your code, into an S3 bucket where Amazon can see it.

s3cp ./bootstrap.sh s3://$S3_BUCKET/bootstrap.sh

s3mod s3://$S3_BUCKET/bootstrap.sh public-read

s3cp $HERE/../enron/target/enron-example.jar s3://$S3_BUCKET/enron-example.jar

s3mod s3://$S3_BUCKET/enron-example.jar public-read

Page 55: MongoDB Hadoop Integration

Launch the Job!

• ...then launch the job from the command line, pointing to your S3 locations

$ elastic-mapreduce --create --jobflow ENRON000 \
    --instance-type m1.xlarge \
    --num-instances 5 \
    --bootstrap-action s3://$S3_BUCKET/bootstrap.sh \
    --log-uri s3://$S3_BUCKET/enron_logs \
    --jar s3://$S3_BUCKET/enron-example.jar \
    --arg -D --arg mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat \
    --arg -D --arg mapred.input.dir=s3n://mongo-test-data/messages.bson \
    --arg -D --arg mapred.output.dir=s3n://$S3_BUCKET/BSON_OUT \
    --arg -D --arg mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat
    # (any additional parameters here)

Page 56: MongoDB Hadoop Integration

So why Amazon?

• Easy to kick off a Hadoop job, without needing to manage a Hadoop cluster

• Turn up the “num-instances” to make jobs complete faster

• Logs get captured into S3 files

• (Pig, Hive, and streaming work on EMR, too!)

Page 57: MongoDB Hadoop Integration

Example 5: MongoUpdateWritable

Page 58: MongoDB Hadoop Integration

Example 5: New Feature

• In previous examples, we wrote job output data by inserting into a new collection

• ... but we can also modify an existing output collection

• Works by applying mongoDB update modifiers: $push, $pull, $addToSet, $inc, $set, etc.

• Can be used to do incremental Map/Reduce or “join” two collections

Page 59: MongoDB Hadoop Integration

Sample of Data

Let’s say we have two collections.

{"_id": ObjectId("51b792d381c3e67b0a18d0ed"), "name": "730LsRkX","type": "pressure","owner": "steve”}{"_id": ObjectId("51b792d381c3e67b0a18d678"), "sensor_id": ObjectId("51b792d381c3e67b0a18d4a1"),"value": 3328.5895416489802,"timestamp”: ISODate("2013-05-18T13:11:38.709-0400"), "loc": [-175.13,51.658]}

sensors

log events

Page 60: MongoDB Hadoop Integration

Sample of Data

For each owner, we want to calculate how many events were recorded for each type of sensor that logged it.

Bob’s sensors for temperature have stored 1300 readings

Bob’s sensors for pressure have stored 400 readings

Alice’s sensors for humidity have stored 600 readings

Alice’s sensors for temperature have stored 700 readings

etc...

Page 61: MongoDB Hadoop Integration

Stage 1 - Map/Reduce on sensors collection

[Diagram: sensors collection → map/reduce → results collection (MongoDB)]

• map: for each sensor, emit {key: owner+type, value: _id}

• reduce: group the data from map() under each key and output {key: owner+type, val: [list of _ids]}

Page 62: MongoDB Hadoop Integration

Stage 1 - Results

After stage one, the output docs look like:

{ "_id": "alice pressure", "sensors": [ObjectId("51b792d381c3e67b0a18d475"), ObjectId("51b792d381c3e67b0a18d16d"), ObjectId("51b792d381c3e67b0a18d2bf"), … ] }

Page 63: MongoDB Hadoop Integration

Stage 1 - Results (annotated)

In the document above, "_id" is the sensor's owner and type, and "sensors" is the list of IDs of the sensors with that owner and type.

Now we just need to count the total number of log events recorded for any sensors that appear in the list for each owner/type group.

Page 64: MongoDB Hadoop Integration

Stage 2 - Map/Reduce on events collection

[Diagram: log events collection → map/reduce → update() of existing records in the results collection (MongoDB)]

• map: for each log event, emit {key: sensor_id, value: 1}

• reduce: group the data from map() under each key; for each value in that key, apply
  update({sensors: key}, {$inc : {logs_count: 1}})

The update is emitted from the Hadoop job as a MongoUpdateWritable:

context.write(null, new MongoUpdateWritable(
        query,    // which documents to modify
        update,   // how to modify them ($inc)
        true,     // upsert
        false));  // multi
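
To make this more concrete, here is a sketch (not the deck's code) of what a stage-2 Reducer emitting such updates could look like, assuming the mapper emits the sensor _id as a Text key with an IntWritable 1 per log event; the class name SensorLogCountReducer is made up.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.types.ObjectId;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.MongoUpdateWritable;

// Sketch: for each sensor id, count its log events and $inc the matching
// stage-1 result document (the one whose "sensors" array contains the id).
public class SensorLogCountReducer
        extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> {

    @Override
    public void reduce(Text sensorId, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
            sum += c.get();
        }
        // which documents to modify: any stage-1 result doc listing this sensor
        BasicDBObject query =
                new BasicDBObject("sensors", new ObjectId(sensorId.toString()));
        // how to modify them: increment logs_count by the number of events seen
        BasicDBObject update =
                new BasicDBObject("$inc", new BasicDBObject("logs_count", sum));
        context.write(NullWritable.get(),
                new MongoUpdateWritable(query, update, true, false)); // upsert, not multi
    }
}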

Page 65: MongoDB Hadoop Integration

Stage 2 - Results

Result after stage 2

{"_id": "1UoTcvnCTz temp", "sensors": [ ObjectId("51b792d381c3e67b0a18d475"), ObjectId("51b792d381c3e67b0a18d16d"), ObjectId("51b792d381c3e67b0a18d2bf"), ...],"logs_count": 1050616 }

now populated with correct count

Page 66: MongoDB Hadoop Integration

Conclusions

Page 67: MongoDB Hadoop Integration

Recap

• Mongo-Hadoop - use Hadoop to do massive computations on big data sets stored in Mongo/BSON

• MongoDB becomes a Hadoop-enabled filesystem

• Tools and APIs make it easier: Streaming, Pig, Hive, EMR, etc.

Page 69: MongoDB Hadoop Integration