analytics with mongodb aggregation framework and hadoop connector

Post on 26-Jan-2015

129 Views

Category:

Technology

0 Downloads

Preview:

Click to see full reader

DESCRIPTION

What are the Big Data tools in and around MongoDB.

TRANSCRIPT

Analytics with MongoDBalone and with Hadoop Connector

Solution Architect, MongoDB

Henrik Ingo

@h_ingo

The Science in Data Science

• Collect data

• Explore the data, use visualization

• Use math

• Make predictions

• Test predictions– Collect even more data

• Repeat...

Why MongoDB?

When MongoDB?

5 NoSQL categories

Key Value Wide Column Document

Graph Map Reduce

Redis Cassandra

Neo4j Hadoop

MongoDB and Enterprise IT Stack

EDWHadoop

Ma

na

ge

me

nt

& M

on

ito

rin

gS

ecu

rity &

Au

ditin

g

RDBMS

CRM, ERP, Collaboration, Mobile, BI

OS & Virtualization, Compute, Storage, Network

RDBMS

Applications

Infrastructure

Data Management

Online Data Offline Data

How do we do it with MongoDB?

Collect data

Exponential Data Growth

http://www.worldwidewebsize.com/

Volume Velocity Variety

Volume Velocity Variety

Data Sources

Asynchronous writes

Upserts avoid unnecessary reads

Writes buffered in RAM and flushed to disk in

bulk

Data Sources

Data Sources

Data Sources

Spread writes over multiple shards

RDBMSMongoDB

{

_id : ObjectId("4c4ba5e5e8aabf3"),

employee_name: "Dunham, Justin",

department : "Marketing",

title : "Product Manager, Web",

report_up: "Neray, Graham",

pay_band: “C",

benefits : [

{ type : "Health",

plan : "PPO Plus" },

{ type : "Dental",

plan : "Standard" }

]

}

Volume Velocity Variety

Visualization

Visualization

d3js.org, …

Use math

Data Processing in MongoDB

• Pre-aggregated documents

• Aggregation Framework

• Map/Reduce

• Hadoop Connector

Pre-aggregated documents

Design Pattern

Pre-Aggregation

Data for

URL /

Date

{_id: "20101010/site-1/apache_pb.gif",metadata: {

date: ISODate("2000-10-10T00:00:00Z"),site: "site-1",page: "/apache_pb.gif" },

daily: 5468426,hourly: {

"0": 227850,"1": 210231,..."23": 20457 },

minute: {"0": 3612,"1": 3241,..."1439": 2819 }

}

Pre-Aggregation

Data for

URL /

Date

query = { '_id': "20101010/site-1/apache_pb.gif" }

update = { '$inc': {'hourly.12' : 1,'minute.739': 1 } }

db.stats.daily.update(query, update, upsert=True)

Aggregation framework

Dynamic Queries

Find all logs for a

URL

db.logs.find( { ‘path’ : ‘/index.html’ } )

Find all logs for a

time range

db.logs.find( {‘time’ : {

‘$gte’: new Date(2013, 0),‘$lt’: new Date(2013, s1) }

} )

Find all logs for a

host over a range of

dates

db.logs.find( { ‘host’ : ‘127.0.0.1’,‘time’ : {

‘$gte’: new Date(2013, 0),‘$lt’: new Date(2013, 1) }

} )

Aggregation Framework

Requests

per day by

URL

db.logs.aggregate( [{ '$match': {

'time': {'$gte': new Date(2013, 0),'$lt': new Date(2013, 1) } } },

{ '$project': {'path': 1,'date': {

'y': { '$year': '$time' },'m': { '$month': '$time' },'d': { '$dayOfMonth': '$time' } } } },

{ '$group': {'_id': {'p': '$path','y': '$date.y','m': '$date.m','d': '$date.d' },

'hits': { '$sum': 1 } } },])

Aggregation Framework

{‘ok’: 1, ‘result’: [ { '_id': {'p':’/index.html’,'y': 2013,'m': 1,'d': 1 }, 'hits’: 124 },{ '_id': {'p':’/index.html’,'y': 2013,'m': 1,'d': 2 }, 'hits’: 245 },{ '_id': {'p':’/index.html’,'y': 2013,'m': 1,'d': 3 }, 'hits’: 322 },{ '_id': {'p':’/index.html’,'y': 2013,'m': 1,'d': 4 }, 'hits’: 175 },{ '_id': {'p':’/index.html’,'y': 2013,'m': 1,'d': 5 }, 'hits’: 94 }

]}

Aggregation Framework Benefits

• Real-time

• Simple yet powerful interface

• Scale-out

• Declared in JSON, executes in C++

• Runs inside MongoDB on local data

Map Reduce in MongoDB

MongoDB Map/Reduce

Map Reduce – Map PhaseGenerate hourly

rollups from log

data

var map = function() {

var key = {

p: this.path,

d: new Date(

this.ts.getFullYear(),

this.ts.getMonth(),

this.ts.getDate(),

this.ts.getHours(),

0, 0, 0) };

emit( key, { hits: 1 } );

}

Map Reduce – Reduce PhaseGenerate hourly

rollups from log

data

var reduce = function(key, values) {

var r = { hits: 0 };

values.forEach(function(v) {

r.hits += v.hits;

});

return r;

}

)

Map Reduce - Execution

query = { 'ts': {'$gte': new Date(2013, 0, 1),'$lte': new Date(2013, 0, 31) } }

db.logs.mapReduce( map, reduce, { ‘query’: query,‘out’: {

‘reduce’ : ‘stats.monthly’ }} )

MongoDB Map/Reduce Benefits

• Runs inside MongoDB

• Sharding supported

• JavaScript– Pro: functionality, expressiveness

– Con: overhead

• Input can be a collection or query!

• Output directly to document or collection

• Easy, when you don’t want overhead of Hadoop

Hadoop Connector

MongoDB with Hadoop

MongoDB with Hadoop

MongoDB

MongoDB with Hadoop

How it works

• Adapter examines MongoDB input collection and calculates a set of splits from data

• Each split is assigned to a Hadoop node

• In parallel hadoop pulls data from splits on MongoDB (or BSON) and starts processing locally

• Hadoop merges results and streams output back to MongoDB (or BSON) output collection

mongo.job.input.format=com.mongodb.hadoop.MongoInputFormat

mongo.input.uri=mongodb://my-db:27017/enron.messages

mongo.job.input.format=com.mongodb.hadoop.BSONFileInputFormat

mapred.input.dir= file:///tmp/messages.bson

mapred.input.dir= hdfs:///tmp/messages.bson

mapred.input.dir= s3:///tmp/messages.bson

(or BSON)Read From MongoDB

mongo.job.output.format=com.mongodb.hadoop.MongoOutputFormat

mongo.output.uri=mongodb://my-db:27017/enron.results_out

mongo.job.output.format=com.mongodb.hadoop.BSONFileOutputFormat

mapred.output.dir= file:///tmp/results.bson

mapred.output.dir= hdfs:///tmp/results.bson

mapred.output.dir= s3:///tmp/results.bson

(or BSON)Write To MongoDB

{

"_id" : ObjectId("4f2ad4c4d1e2d3f15a000000"),

"body" : "Here is our forecast\n\n ",

"filename" : "1.",

"headers" : {

"From" : "phillip.allen@enron.com",

"Subject" : "Forecast Info",

"X-bcc" : "",

"To" : "tim.belden@enron.com",

"X-Origin" : "Allen-P",

"X-From" : "Phillip K Allen",

"Date" : "Mon, 14 May 2001 16:39:00 -0700 (PDT)",

"X-To" : "Tim Belden ",

"Message-ID" : "<18782981.1075855378110.JavaMail.evans@thyme>",

"Content-Type" : "text/plain; charset=us-ascii",

"Mime-Version" : "1.0"

}

}

Document Example

Graph Sketch

{"_id": {"t":"bob@enron.com", "f":"alice@enron.com"}, "count" : 14}

{"_id": {"t":"bob@enron.com", "f":"eve@enron.com"}, "count" : 9}

{"_id": {"t":"alice@enron.com", "f":"charlie@enron.com"}, "count" : 99}

{"_id": {"t":"charlie@enron.com", "f":"bob@enron.com"}, "count" : 48}

{"_id": {"t":"eve@enron.com", "f":"charlie@enron.com"}, "count" : 20}

Receiver Sender Pairs

@Override

public void map(NullWritable key, BSONObject val, final Context context){

BSONObject headers = (BSONObject)val.get("headers"); if(headers.containsKey("From") && headers.containsKey("To")){

String from = (String)headers.get("From"); String to = (String) headers.get("To"); String[] recips = to.split(",");

for(int i=0;i<recips.length;i++){

String recip = recips[i].trim();

context.write(new MailPair(from, recip), new IntWritable(1));

}

}

}

Map Phase – each document get’s through mapper function

public void reduce(final MailPair pKey, final Iterable<IntWritable> pValues, final Context pContext ){

int sum = 0;

for ( final IntWritable value : pValues ){

sum += value.get();

}

BSONObject outDoc = new BasicDBObjectBuilder().start()

.add( "f" , pKey.from)

.add( "t" , pKey.to )

.get();BSONWritable pkeyOut = new BSONWritable(outDoc);

pContext.write( pkeyOut, new IntWritable(sum) ); }

Reduce Phase – output Maps are grouped by key and passed to Reducer

mongos> db.streaming.output.find({"_id.t": /^kenneth.lay/})

{ "_id" : { "t" : "kenneth.lay@enron.com",

"f" : "15126-1267@m2.innovyx.com" }, "count" : 1 }

{ "_id" : { "t" : "kenneth.lay@enron.com",

"f" : "2586207@www4.imakenews.com" }, "count" : 1 }

{ "_id" : { "t" : "kenneth.lay@enron.com",

"f" : "40enron@enron.com" }, "count" : 2 }

{ "_id" : { "t" : "kenneth.lay@enron.com",

"f" : "a..davis@enron.com" }, "count" : 2 }

{ "_id" : { "t" : "kenneth.lay@enron.com",

"f" : "a..hughes@enron.com" }, "count" : 4 }

{ "_id" : { "t" : "kenneth.lay@enron.com",

"f" : "a..lindholm@enron.com" }, "count" : 1 }

{ "_id" : { "t" : "kenneth.lay@enron.com",

"f" : "a..schroeder@enron.com" }, "count" : 1 }

Query Data

Hadoop Connector Benefits

• Full multi-core parallelism to process MongoDB data

• mongo.input.query

• Full integration w/ Hadoop and JVM ecosystem

• Mahout, et.al.

• Can be used on Amazon Elastic MapReduce

• Read and write backup files to local, HDFS and S3

• Vanilla Java MapReduce, Hadoop Streaming, Pig, Hive

Make predictions & test

A/B testing

• Hey, it looks like teenage girls clicked a lot on that ad with a pink background...

• Hypothesis: Given otherwise the same ad, teenage girls are more likely to click on ads with pink

backgrounds than white

• Test 50-50 pink vs white ads

• Collect click stream stats in MongoDB or Hadoop

• Analyze results

Recommendations – social filtering

• ”Customers who bought this book also bought”

• Computed offline / nightly

• As easy as it sounds! google it: Amazon item-to-item algorithm

Personalization

• ”Even if you are a teenage girl, you seem to be 60% more likely to click on blue ads than pink.”

• User specific recommendations a hybrid of offline & online recommendations

• User profile in MongoDB

• May even be updated real time

Solution Architect, MongoDB

Henrik Ingo

@h_ingo

Questions?

top related