Hadoop - MongoDB Webinar, June 2014

Mongo-Hadoop Integration Justin Lee Software Engineer @ MongoDB


TRANSCRIPT

Page 1: Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Integration
Justin Lee

Software Engineer @ MongoDB

Page 2: Hadoop - MongoDB Webinar June 2014

We will cover:

• what it is
• how it works
• a tour of what it can do

A quick briefing on what Mongo and Hadoop are all about:

(Q+A at the end)

Page 3: Hadoop - MongoDB Webinar June 2014

document-oriented database with dynamic schema

stores data in JSON-like documents:

{
  _id : "kosmo kramer",
  age : 42,
  location : {
    state : "NY",
    zip : "10024"
  },
  favorite_colors : ["red", "green"]
}

Each document can have a different structure; values can be simple, like strings and ints, or nested documents and arrays.
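As an aside (not from the original slides), the same document can be built from Java with the 2.x driver that the later examples use; this is a minimal sketch, with the class name DocumentExample as a placeholder:

import java.util.Arrays;

import com.mongodb.BasicDBObject;

// Minimal sketch: constructing the JSON-like document above with the MongoDB Java driver.
public class DocumentExample {
    public static void main(String[] args) {
        // Nested sub-document for the "location" field
        BasicDBObject location = new BasicDBObject("state", "NY").append("zip", "10024");

        BasicDBObject doc = new BasicDBObject("_id", "kosmo kramer")
                .append("age", 42)
                .append("location", location)                              // nested document
                .append("favorite_colors", Arrays.asList("red", "green")); // array value

        System.out.println(doc); // prints the document in its JSON-like form
    }
}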

Page 4: Hadoop - MongoDB Webinar June 2014

MongoDB scales horizontally via sharding to handle lots of data and load


Page 5: Hadoop - MongoDB Webinar June 2014

Java-based framework for MapReduce

Excels at batch processing on large data sets by taking advantage of parallelism

MapReduce was created by Google (white paper) and implemented in open source by Hadoop

Page 6: Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Connector - Why

Lots of people use Hadoop and MongoDB separately but need integration

Custom import/export scripts are often used to get data in and out

Scalability and flexibility with changes in Hadoop or MongoDB configurations

Need to process data across multiple sources

Custom scripts are slow and fragile.

Page 7: Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Connector

Turn MongoDB into a Hadoop-enabled filesystem: use it as the input or output for Hadoop.

[Diagram: input data comes from MongoDB or .BSON files, flows into the Hadoop cluster, and output results are written back to MongoDB or .BSON files.]

BSON file support is new in 1.1; BSON is the output of mongodump.

Page 8: Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Connector: Benefits + Features

Takes advantage of full multi-core parallelism to process data in Mongo

Full integration with Hadoop and JVM ecosystems

Can be used with Amazon Elastic MapReduce

Can read and write backup files from local filesystem, HDFS, or S3

Page 9: Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Connector: Benefits + Features

Vanilla Java MapReduce

or, if you don't want to use Java, there is support for Hadoop Streaming: write MapReduce code in Ruby, Python, or other languages.

You can also write your own language binding.

Page 10: Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Connector: Benefits + Features

Support for Pig: a high-level scripting language for data analysis and building MapReduce workflows

Support for Hive: a SQL-like language for ad-hoc queries + analysis of data sets on Hadoop-compatible file systems


Page 11: Hadoop - MongoDB Webinar June 2014

Mongo-Hadoop Connector

How it works:

Adapter examines the MongoDB input collection and calculates a set of splits from the data

Each split gets assigned to a node in Hadoop cluster

In parallel, Hadoop nodes pull data for splits from MongoDB (or BSON) and process them locally

Hadoop merges results and streams output back to MongoDB or BSON (a minimal job-driver sketch follows below)
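The slides don't show the job wiring itself, so here is a minimal driver sketch of how a job could be configured against MongoDB. The class name, job name, and MongoConfigUtil calls reflect the connector's 1.x API as I understand it and should be treated as assumptions rather than the webinar's actual code:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;

import com.mongodb.hadoop.MongoInputFormat;
import com.mongodb.hadoop.MongoOutputFormat;
import com.mongodb.hadoop.io.BSONWritable;
import com.mongodb.hadoop.util.MongoConfigUtil;

// Hedged sketch of a mongo-hadoop job driver (placeholder class name).
public class EnronJobDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // Where the connector calculates input splits from, and where results are written
        MongoConfigUtil.setInputURI(conf, "mongodb://my-db:27017/enron.messages");
        MongoConfigUtil.setOutputURI(conf, "mongodb://my-db:27017/enron.results_out");

        Job job = new Job(conf, "enron sender/recipient counts");
        job.setJarByClass(EnronJobDriver.class);

        // The Mapper and Reducer from the following slides would be registered here, e.g.:
        //   job.setMapperClass(EnronMailMapper.class);   // hypothetical class names
        //   job.setReducerClass(EnronMailReducer.class);

        job.setOutputKeyClass(BSONWritable.class);
        job.setOutputValueClass(IntWritable.class);

        // MongoInputFormat turns the collection into splits; MongoOutputFormat
        // streams the merged results back into MongoDB.
        job.setInputFormatClass(MongoInputFormat.class);
        job.setOutputFormatClass(MongoOutputFormat.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}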

Page 12: Hadoop - MongoDB Webinar June 2014

Tour of Mongo-Hadoop, by Example

- Using Java MapReduce with Mongo-Hadoop

- Using Hadoop Streaming

- Pig and Hive with Mongo-Hadoop

- Elastic MapReduce + BSON

Page 13: Hadoop - MongoDB Webinar June 2014

{ "_id" : ObjectId("4f2ad4c4d1e2d3f15a000000"), "body" : "Here is our forecast\n\n ", "filename" : "1.", "headers" : { "From" : "[email protected]", "Subject" : "Forecast Info", "X-bcc" : "", "To" : "[email protected]", "X-Origin" : "Allen-P", "X-From" : "Phillip K Allen", "Date" : "Mon, 14 May 2001 16:39:00 -0700 (PDT)", "X-To" : "Tim Belden ", "Message-ID" : "<18782981.1075855378110.JavaMail.evans@thyme>", "Content-Type" : "text/plain; charset=us-ascii", "Mime-Version" : "1.0" }}

Input Data: Enron e-mail corpus (501k records, 1.75 GB)

Each document is one email; the "From" header is the sender and the "To" header lists the recipients.

Page 14: Hadoop - MongoDB Webinar June 2014

{"_id": {"t":"[email protected]", "f":"[email protected]"}, "count" : 14}

{"_id": {"t":"[email protected]", "f":"[email protected]"}, "count" : 9}

{"_id": {"t":"[email protected]", "f":"[email protected]"}, "count" : 99}

{"_id": {"t":"[email protected]", "f":"[email protected]"}, "count" : 48}

{"_id": {"t":"[email protected]", "f":"[email protected]"}, "count" : 20}

Let’s use Hadoop to build a graph of (senders → recipients) and the count of messages exchanged between each pair

[Diagram: sample, simplified data. Nodes are people (bob, alice, eve, charlie); edges/arrows are labeled with the # of msgs from A to B (14, 99, 9, 48, 20).]

Page 15: Hadoop - MongoDB Webinar June 2014

Example 1 - Java MapReduce

mongodb document passed into Hadoop MapReduce

Map phase - each input doc gets passed through a Mapper function

@Override
public void map(NullWritable key, BSONObject val, final Context context)
        throws IOException, InterruptedException {
    BSONObject headers = (BSONObject) val.get("headers");
    if (headers.containsKey("From") && headers.containsKey("To")) {
        String from = (String) headers.get("From");
        String to = (String) headers.get("To");
        String[] recips = to.split(",");
        for (int i = 0; i < recips.length; i++) {
            String recip = recips[i].trim();
            context.write(new MailPair(from, recip), new IntWritable(1));
        }
    }
}

input value doc from mongo. connector will handle translation into BSONObject for you
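The MailPair key used above is a custom Writable that the slides don't show. A hedged sketch of what such a {from, to} key could look like (the webinar's actual MailPair may differ in details):

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

// Sketch of a composite {from, to} key usable as a Hadoop map-output key.
public class MailPair implements WritableComparable<MailPair> {
    String from;
    String to;

    public MailPair() { }                    // Hadoop requires a no-arg constructor
    public MailPair(String from, String to) {
        this.from = from;
        this.to = to;
    }

    public void write(DataOutput out) throws IOException {
        out.writeUTF(from);
        out.writeUTF(to);
    }

    public void readFields(DataInput in) throws IOException {
        from = in.readUTF();
        to = in.readUTF();
    }

    public int compareTo(MailPair o) {       // controls how Hadoop sorts and groups keys
        int cmp = from.compareTo(o.from);
        return cmp != 0 ? cmp : to.compareTo(o.to);
    }

    @Override
    public boolean equals(Object o) {
        return o instanceof MailPair && compareTo((MailPair) o) == 0;
    }

    @Override
    public int hashCode() {                  // used by the default partitioner
        return from.hashCode() * 31 + to.hashCode();
    }
}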

Page 16: Hadoop - MongoDB Webinar June 2014

output written back to MongoDB

Example 1 - Java MapReduce (cont)

Reduce phase - outputs of Map are grouped together by key and passed to Reducer

pKey is the {to, from} key; pValues is the list of all the values collected under that key.

public void reduce(final MailPair pKey,
                   final Iterable<IntWritable> pValues,
                   final Context pContext)
        throws IOException, InterruptedException {
    int sum = 0;
    for (final IntWritable value : pValues) {
        sum += value.get();
    }

    BSONObject outDoc = BasicDBObjectBuilder.start()
        .add("f", pKey.from)
        .add("t", pKey.to)
        .get();

    BSONWritable pkeyOut = new BSONWritable(outDoc);
    pContext.write(pkeyOut, new IntWritable(sum));
}

Page 17: Hadoop - MongoDB Webinar June 2014

Example 1 - Java MapReduce (cont)

Read from MongoDB:

mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat
mongo.input.uri=mongodb://my-db:27017/enron.messages

Read from BSON:

mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat
mapred.input.dir=file:///tmp/messages.bson
  (or hdfs:///tmp/messages.bson, s3:///tmp/messages.bson)

Page 18: Hadoop - MongoDB Webinar June 2014

Example 1 - Java MapReduce (cont)

Write output to MongoDB:

mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat
mongo.output.uri=mongodb://my-db:27017/enron.results_out

Write output to BSON:

mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat
mapred.output.dir=file:///tmp/results.bson
  (or hdfs:///tmp/results.bson, s3:///tmp/results.bson)

Page 19: Hadoop - MongoDB Webinar June 2014

Results : Output Data

mongos> db.results_out.find({"_id.t": /^kenneth.lay/})
{ "_id" : { "t" : "[email protected]", "f" : "[email protected]" }, "count" : 1 }
{ "_id" : { "t" : "[email protected]", "f" : "[email protected]" }, "count" : 1 }
{ "_id" : { "t" : "[email protected]", "f" : "[email protected]" }, "count" : 2 }
{ "_id" : { "t" : "[email protected]", "f" : "[email protected]" }, "count" : 2 }
{ "_id" : { "t" : "[email protected]", "f" : "[email protected]" }, "count" : 4 }
{ "_id" : { "t" : "[email protected]", "f" : "[email protected]" }, "count" : 1 }
{ "_id" : { "t" : "[email protected]", "f" : "[email protected]" }, "count" : 1 }
...has more

Page 20: Hadoop - MongoDB Webinar June 2014

Example 2 - Hadoop Streaming

Let’s do the same Enron MapReduce job with Python instead of Java

$ pip install pymongo_hadoop

Page 21: Hadoop - MongoDB Webinar June 2014

Example 2 - Hadoop Streaming (cont)

Hadoop passes data to an external process via STDOUT/STDIN

[Diagram: Hadoop (JVM) streams map(k, v) calls over STDIN to an external Python / Ruby / JS interpreter, which writes its results back over STDOUT.]

def mapper(documents): . . .

Page 22: Hadoop - MongoDB Webinar June 2014

Example 2 - Hadoop Streaming (cont)

import sys

from pymongo_hadoop import BSONMapper

def mapper(documents):
    i = 0
    for doc in documents:
        i = i + 1
        from_field = doc['headers']['From']
        to_field = doc['headers']['To']
        recips = [x.strip() for x in to_field.split(',')]
        for r in recips:
            yield {'_id': {'f': from_field, 't': r}, 'count': 1}

BSONMapper(mapper)
print >> sys.stderr, "Done Mapping."

BSONMapper is the pymongo_hadoop layer that handles the translation between Hadoop Streaming and your mapper function.

Page 23: Hadoop - MongoDB Webinar June 2014

Example 2 - Hadoop Streaming (cont)

import sys

from pymongo_hadoop import BSONReducer

def reducer(key, values):
    print >> sys.stderr, "Processing from/to %s" % str(key)
    _count = 0
    for v in values:
        _count += v['count']
    return {'_id': key, 'count': _count}

BSONReducer(reducer)

Page 24: Hadoop - MongoDB Webinar June 2014

Surviving Hadoop: making MapReduce easier

Pig + Hive

writing MapReduce jobs from scratch can be clunky and cumbersome

Page 25: Hadoop - MongoDB Webinar June 2014

Example 3 - Mongo-Hadoop and Pig

Let’s do the same thing yet again, but this time using Pig

Pig is a powerful language that can generate sophisticated MapReduce workflows from simple scripts

Can perform JOIN, GROUP, and execute user-defined functions (UDFs)

Page 26: Hadoop - MongoDB Webinar June 2014

Example 3 - Mongo-Hadoop and Pig (cont)

Pig directives for loading data: BSONLoader and MongoLoader

Writing data out: BSONStorage and MongoInsertStorage

data = LOAD 'mongodb://localhost:27017/db.collection' using com.mongodb.hadoop.pig.MongoLoader;

STORE records INTO 'file:///output.bson' using com.mongodb.hadoop.pig.BSONStorage;

Page 27: Hadoop - MongoDB Webinar June 2014

Pig has its own special datatypes: Bags, Maps, and Tuples

Mongo-Hadoop Connector intelligently converts between Pig datatypes and MongoDB datatypes

Example 3 - Mongo-Hadoop and Pig (cont)

bags -> arrays
maps -> objects

Page 28: Hadoop - MongoDB Webinar June 2014

raw = LOAD 'hdfs:///messages.bson' using com.mongodb.hadoop.pig.BSONLoader('','headers:[]') ;

send_recip = FOREACH raw GENERATE $0#'From' as from, $0#'To' as to;

send_recip_filtered = FILTER send_recip BY to IS NOT NULL;

send_recip_split = FOREACH send_recip_filtered GENERATE from as from, TRIM(FLATTEN(TOKENIZE(to))) as to;

send_recip_grouped = GROUP send_recip_split BY (from, to);

send_recip_counted = FOREACH send_recip_grouped GENERATE group, COUNT($1) as count;

STORE send_recip_counted INTO 'file:///enron_results.bson' using com.mongodb.hadoop.pig.BSONStorage;

Example 3 - Mongo-Hadoop and Pig (cont)

Page 29: Hadoop - MongoDB Webinar June 2014

Hive with Mongo-Hadoop

Similar idea to Pig - process your data without needing to write MapReduce code from scratch

...but with SQL as the language of choice

Page 30: Hadoop - MongoDB Webinar June 2014

Hive with Mongo-Hadoop

Sample Data: db.users

db.users.find()
{ "_id": 1, "name": "Tom", "age": 28 }
{ "_id": 2, "name": "Alice", "age": 18 }
{ "_id": 3, "name": "Bob", "age": 29 }
{ "_id": 101, "name": "Scott", "age": 10 }
{ "_id": 104, "name": "Jesse", "age": 52 }
{ "_id": 110, "name": "Mike", "age": 32 }
...

First, declare the collection to be accessible in Hive:

CREATE TABLE mongo_users (id int, name string, age int)
STORED BY "com.mongodb.hadoop.hive.MongoStorageHandler"
WITH SERDEPROPERTIES ( "mongo.columns.mapping" = "_id,name,age" )
TBLPROPERTIES ( "mongo.uri" = "mongodb://localhost:27017/test.users");

Page 31: Hadoop - MongoDB Webinar June 2014

Hive with Mongo-Hadoop

. . . then you can run SQL on it, like a table:

SELECT name, age FROM mongo_users WHERE id > 100;

you can use GROUP BY:

SELECT age, COUNT(*) FROM mongo_users WHERE id > 100 GROUP BY age;

or JOIN multiple tables/collections together:

SELECT * FROM mongo_users T1 JOIN user_emails T2 ON (T1.id = T2.id);

(Hive supports a subset of SQL)

Page 32: Hadoop - MongoDB Webinar June 2014

Write the output of queries back into new tables:

INSERT OVERWRITE TABLE old_users SELECT id,name,age FROM mongo_users WHERE age > 100 ;

DROP TABLE mongo_users;

Dropping a table in Hive deletes the underlying collection in MongoDB

Use an EXTERNAL table when declaring your table to prevent the underlying collection from being dropped.

Page 33: Hadoop - MongoDB Webinar June 2014

Usage with Amazon Elastic MapReduce

Run mongo-hadoop jobs without needing to set up or manage your own Hadoop cluster.

Pig, Hive, and streaming work on EMR, too!

Logs get captured into S3 files

Page 34: Hadoop - MongoDB Webinar June 2014

Usage with Amazon Elastic MapReduce

First, make a "bootstrap" script that fetches the dependencies (the mongo-hadoop jar and the Java driver):

#!/bin/sh

wget -P /home/hadoop/lib http://central.maven.org/maven2/org/mongodb/mongo-java-driver/2.12.2/mongo-java-driver-2.12.2.jar

wget -P /home/hadoop/lib https://s3.amazonaws.com/mongo-hadoop-code/mongo-hadoop-core_1.1.2-1.1.0.jar

this will get executed on each node in the cluster that EMR builds for us.

(work is underway to update the mongo-hadoop artifacts in Maven)

Page 35: Hadoop - MongoDB Webinar June 2014

Example 4 - Usage with Amazon Elastic MapReduce

Put the bootstrap script, and all your code, into an S3 bucket where Amazon can see it.

s3cp ./bootstrap.sh s3://$S3_BUCKET/bootstrap.sh
s3mod s3://$S3_BUCKET/bootstrap.sh public-read

s3cp $HERE/../enron/target/enron-example.jar s3://$S3_BUCKET/enron-example.jar
s3mod s3://$S3_BUCKET/enron-example.jar public-read

Page 36: Hadoop - MongoDB Webinar June 2014

$ elastic-mapreduce --create --jobflow ENRON000 \
    --instance-type m1.xlarge \
    --num-instances 5 \
    --bootstrap-action s3://$S3_BUCKET/bootstrap.sh \
    --log-uri s3://$S3_BUCKET/enron_logs \
    --jar s3://$S3_BUCKET/enron-example.jar \
    --arg -D --arg mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat \
    --arg -D --arg mapred.input.dir=s3n://mongo-test-data/messages.bson \
    --arg -D --arg mapred.output.dir=s3n://$S3_BUCKET/BSON_OUT \
    --arg -D --arg mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat
    # (any additional parameters here)

Example 4 - Usage with Amazon Elastic MapReduce

. . .then launch the job from the command line, pointing to your S3 locations

Control the type and number of instances in the cluster.

Page 37: Hadoop - MongoDB Webinar June 2014

Example 4 - Usage with Amazon Elastic MapReduce

Easy to kick off a Hadoop job, without needing to manage a Hadoop cluster

Pig, Hive, and streaming work on EMR, too!

Logs get captured into S3 files

Page 38: Hadoop - MongoDB Webinar June 2014

Example 5 - New Feature: MongoUpdateWritable

In previous examples, we wrote job output data by inserting into a new collection.

. . . but we can also modify an existing output collection.

Works by applying MongoDB update modifiers: $push, $pull, $addToSet, $inc, $set, etc.

Can be used to do incremental MapReduce or to "join" two collections.

Page 39: Hadoop - MongoDB Webinar June 2014

Example 5 - MongoUpdateWritable

For example, let's say we have two collections:

{    "_id":  ObjectId("51b792d381c3e67b0a18d678"),    "sensor_id":  ObjectId("51b792d381c3e67b0a18d4a1"),    "value":  3328.5895416489802,    "timestamp":  ISODate("2013-­‐05-­‐18T13:11:38.709-­‐0400"),    "loc":  [-­‐175.13,51.658]}

{    "_id":  ObjectId("51b792d381c3e67b0a18d0ed"),    "name":  "730LsRkX",    "type":  "pressure",    "owner":  "steve",}

sensors

log events

refers to which sensor logged the event

For each owner, we want to calculate how many events were recorded for each type of sensor that logged it.

Page 40: Hadoop - MongoDB Webinar June 2014

For each owner, we want to calculate how many events were recorded for each type of sensor that logged it.

Plain English:

Bob's sensors for temperature have stored 1300 readings
Bob's sensors for pressure have stored 400 readings

Alice's sensors for humidity have stored 600 readings
Alice's sensors for temperature have stored 700 readings

etc...

Page 41: Hadoop - MongoDB Webinar June 2014

We do this in two stages.

Stage 1 - MapReduce on the sensors collection:

- read from the sensors collection in MongoDB
- map: for each sensor, emit {key: owner+type, value: _id}
- reduce: group data from map() under each key, output {key: owner+type, val: [list of _ids]}
- insert() the new records into the Results collection in MongoDB

(a rough Java sketch of this stage follows below)
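This sketch assumes the connector's Mapper/Reducer style from Example 1; the class names, the "owner type" key encoding, and the BSONWritable.getDoc() call are assumptions rather than the webinar's actual code, and the job would be wired to read the sensors collection and write the Results collection much like the earlier driver configuration:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.BSONObject;
import org.bson.BasicBSONObject;

import com.mongodb.hadoop.io.BSONWritable;

// Hedged sketch of Stage 1: group sensor _ids by owner + type.
public class SensorsByOwnerAndType {

    public static class Stage1Mapper
            extends Mapper<Object, BSONObject, Text, BSONWritable> {
        @Override
        public void map(Object key, BSONObject sensor, Context context)
                throws IOException, InterruptedException {
            // Key each sensor by "<owner> <type>", e.g. "alice pressure"
            String ownerAndType = sensor.get("owner") + " " + sensor.get("type");
            // Wrap the sensor's _id in a small BSON doc so it can travel as a Writable
            context.write(new Text(ownerAndType),
                          new BSONWritable(new BasicBSONObject("_id", sensor.get("_id"))));
        }
    }

    public static class Stage1Reducer
            extends Reducer<Text, BSONWritable, Text, BSONWritable> {
        @Override
        public void reduce(Text ownerAndType, Iterable<BSONWritable> values, Context context)
                throws IOException, InterruptedException {
            List<Object> sensorIds = new ArrayList<Object>();
            for (BSONWritable value : values) {
                sensorIds.add(value.getDoc().get("_id"));
            }
            // Output doc shape: { _id: "<owner> <type>", sensors: [ ...ids... ] }
            context.write(ownerAndType, new BSONWritable(new BasicBSONObject("sensors", sensorIds)));
        }
    }
}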

Page 42: Hadoop - MongoDB Webinar June 2014

After stage one, the output docs look like this; the "_id" is the sensor's owner and type, and "sensors" is the list of IDs of sensors with this owner and type:

{
  "_id": "alice pressure",
  "sensors": [
    ObjectId("51b792d381c3e67b0a18d475"),
    ObjectId("51b792d381c3e67b0a18d16d"),
    ObjectId("51b792d381c3e67b0a18d2bf"),
    …
  ]
}

Now we just need to count the total # of log events recorded for any sensors that appear in the list for each owner/type group.

Page 43: Hadoop - MongoDB Webinar June 2014

Stage 2 - MapReduce on the log events collection:

- read from the log events collection in MongoDB
- map: for each log event, emit {key: sensor_id, value: 1}
- reduce: group data from map() under each key; for each value in that key: update({sensors: key}, {$inc: {logs_count: 1}})
- update() the existing records in the Results collection in MongoDB

context.write(null, new MongoUpdateWritable(
    query,    // which documents to modify
    update,   // how to modify ($inc)
    true,     // upsert
    false));  // multi
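The query and update objects above are not shown in the slides. A hedged sketch of how the Stage 2 reduce could build them, assuming the sensor _id arrives as a hex string in a Text key (class and variable names are placeholders, not the webinar's actual code):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.bson.types.ObjectId;

import com.mongodb.BasicDBObject;
import com.mongodb.hadoop.io.MongoUpdateWritable;

// Hedged sketch of the Stage 2 reducer: add each sensor's event count to its owner/type group doc.
public class LogCountReducer
        extends Reducer<Text, IntWritable, NullWritable, MongoUpdateWritable> {

    @Override
    public void reduce(Text sensorIdHex, Iterable<IntWritable> ones, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (IntWritable one : ones) {
            count += one.get();  // total log events recorded by this sensor
        }

        // Match the result doc whose "sensors" array contains this sensor's _id...
        BasicDBObject query = new BasicDBObject("sensors", new ObjectId(sensorIdHex.toString()));
        // ...and bump its logs_count by the number of events just summed.
        BasicDBObject update = new BasicDBObject("$inc", new BasicDBObject("logs_count", count));

        context.write(NullWritable.get(), new MongoUpdateWritable(
            query,    // which documents to modify
            update,   // how to modify ($inc)
            true,     // upsert
            false));  // multi
    }
}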

Page 44: Hadoop - MongoDB Webinar June 2014

Example - MongoUpdateWritable

Result after stage 2

{    "_id":  "1UoTcvnCTz  temp",    "sensors":  [        ObjectId("51b792d381c3e67b0a18d475"),        ObjectId("51b792d381c3e67b0a18d16d"),        ObjectId("51b792d381c3e67b0a18d2bf"),        …    ],    "logs_count":  1050616}

The logs_count field is now populated with the correct count.

Page 45: Hadoop - MongoDB Webinar June 2014

New Features in v1.2 and beyond

Continually improving Hive support

Performance Improvements - Lazy BSON

Support for multi-collection input sources

API for adding custom splitter implementations

and more

Primarily focusing on Hive, but Pig is next; the artifacts in Maven Central are also being updated.

Page 46: Hadoop - MongoDB Webinar June 2014

Recap

Mongo-Hadoop - use Hadoop to do massive computations on big data sets stored in MongoDB/BSON

Tools and APIs make it easier: Streaming, Pig, Hive, EMR, etc.

MongoDB becomes a Hadoop-enabled filesystem

Page 47: Hadoop - MongoDB Webinar June 2014

Questions?

Examples can be found on GitHub:

https://github.com/mongodb/mongo-hadoop/tree/master/examples

Page 48: Hadoop - MongoDB Webinar June 2014

MongoDB World
New York City, June 23-25

Save 25% with code 25JustinLee

Register at world.mongodb.com