mongodb: intro & application for big data
DESCRIPTION
TRANSCRIPT
Intro & Application for Big Data
Rakuten Technology Conference 2011
doryokujin
・Takahiro Inoue (age 26)
・twitter: doryokujin
・Data Scientist
・Leader of MongoDB JP
・NoSQL, Hadoop, DataMining
About Me
1. MongoDB Features
1.1 User Friendly
1.2 Scalability
2. MongoDB & BigData
Agenda
News: #mongotokyo2012
http://www.10gen.com/events/mongo-tokyo-2012
1. MongoDB Features
http://blog.nahurst.com/visual-guide-to-nosql-systems
・Document-oriented storage » JSON-style documents with dynamic schemas offer simplicity and power.
・Full Index Support » Index on any attribute, just like you're used to.
・Querying » Rich, document-based queries.
・Replication & High Availability » Mirror across LANs and WANs for scale and peace of mind.
・Auto-Sharding » Scale horizontally without compromising functionality.
・Map/Reduce » Flexible aggregation and data processing.
MongoDB Features
User Friendly & Scalability (http://www.mongodb.org/)
1.1. User Friendly
{
"_id" : ObjectId("4dcd3ebc9278000000005158"),
"timestamp" : ISODate("2011-05-13T14:22:46.777Z"),
"binary" : BinData(0,""),
"string" : "abc",
"number" : 3,
"subobj" : {"subA": 1, "subB": { "subsubC": 2 }},
"array" : [1, 2, 3],
"dbref" : [_id1, _id2, _id3]
}
Document
{
"_id" : ObjectId("4dcd3ebc9278000000005158"),
"nickname" : "doryokujin"
},{
"_id" : ObjectId("4dcd3ebc9278000000005159"),
"firstname" : "Takahiro",
"lastname" : "Inoue",
"mail" : "[email protected]",
"twitter" : "@doryokujin"
},...
Schema Free
{
"_id" : ObjectId("4dcd3ebc9278000000005158"),
"timestamp" : ISODate("2011-05-13T14:22:46.777Z"),
"binary" : BinData(0,""),
"string" : "abc",
"number" : 3,
"subobj" : {"subA": 1, "subB": 2 },
"array" : [1, 2, 3],
}
padding
Document (Insert)
{
"_id" : ObjectId("4dcd3ebc9278000000005158"),
"timestamp" : ISODate("2011-05-13T14:22:46.777Z"),
"binary" : BinData(0,""),
"string" : "def",
"number" : 4,
"subobj" : {"subA": 1, "subB": 2 },
"array" : [1, 2, 3, 4, 5, 6],
"newkey" : "In-place"
}
{ $inc : {"number": 1} }
{ $set : {"string": "def"} }
{ $pull : {"subobj": {"subB": 2 } } }
{ $addToSet : { "array" : { $each : [ 4 , 5 , 6 ] } } }
{ $set : {"newkey": "In-place"} }
Document (Update)
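The in-place update operators above can be illustrated without a running mongod. This is a minimal sketch (not MongoDB itself) that simulates `$inc`, `$set`, and `$addToSet` with `$each` on a plain Python dict, reproducing the transformation from the inserted document to the updated one:

```python
def apply_update(doc, update):
    """Apply a small subset of MongoDB update operators to `doc` in place."""
    for field, amount in update.get("$inc", {}).items():
        doc[field] = doc.get(field, 0) + amount
    for field, value in update.get("$set", {}).items():
        doc[field] = value
    for field, spec in update.get("$addToSet", {}).items():
        # $addToSet may take a plain value or an { $each: [...] } wrapper
        values = spec["$each"] if isinstance(spec, dict) and "$each" in spec else [spec]
        arr = doc.setdefault(field, [])
        for v in values:
            if v not in arr:          # $addToSet never adds duplicates
                arr.append(v)
    return doc

doc = {"string": "abc", "number": 3, "array": [1, 2, 3]}
apply_update(doc, {"$inc": {"number": 1}})
apply_update(doc, {"$set": {"string": "def", "newkey": "In-place"}})
apply_update(doc, {"$addToSet": {"array": {"$each": [4, 5, 6]}}})
```

After these three updates `doc` matches the result document on the slide: `number` is 4, `string` is "def", `array` is [1, 2, 3, 4, 5, 6], and `newkey` has been added.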
MySQL                                            MongoDB
SELECT a,b FROM users                          → db.users.find({}, { a: 1, b: 1 })
SELECT * FROM users                            → db.users.find()
SELECT * FROM users WHERE age=33               → db.users.find({ age: 33 })
SELECT a,b FROM users WHERE age=33             → db.users.find({ age: 33 }, { a: 1, b: 1 })
SELECT * FROM users WHERE age=33 ORDER BY name → db.users.find({ age: 33 }).sort({ name: 1 })
MySQL <--> MongoDB (Query)
MySQL                                            MongoDB
SELECT * FROM users WHERE age>33               → db.users.find({ age: { $gt: 33 } })
SELECT * FROM users WHERE age!=33              → db.users.find({ age: { $ne: 33 } })
SELECT * FROM users WHERE name LIKE "%Joe%"    → db.users.find({ name: /Joe/ })
SELECT * FROM users WHERE name LIKE "Joe%"     → db.users.find({ name: /^Joe/ })
SELECT * FROM users WHERE age>33 AND age<=40   → db.users.find({ age: { $gt: 33, $lte: 40 } })
MySQL <--> MongoDB (Query)
MySQL                                            MongoDB
SELECT * FROM users ORDER BY name DESC         → db.users.find().sort({ name: -1 })
SELECT * FROM users WHERE a=1 and b='q'        → db.users.find({ a: 1, b: 'q' })
SELECT * FROM users LIMIT 10 OFFSET 20         → db.users.find().limit(10).skip(20)
SELECT * FROM users WHERE a=1 or b=2           → db.users.find({ $or: [ { a: 1 }, { b: 2 } ] })
SELECT * FROM users LIMIT 1                    → db.users.findOne()
MySQL <--> MongoDB (Query)
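The correspondences above are mechanical. As a toy sketch (the helper names `condition` and `and_` are invented for illustration, not part of any driver), here is how a single SQL comparison maps onto a MongoDB filter document:

```python
# SQL comparison operators and their MongoDB filter-document equivalents.
OPS = {">": "$gt", ">=": "$gte", "<": "$lt", "<=": "$lte", "!=": "$ne"}

def condition(field, op, value):
    """Build the MongoDB filter document for one `field op value` test."""
    if op == "=":
        return {field: value}            # plain equality needs no operator
    return {field: {OPS[op]: value}}

def and_(*conds):
    """Combine conditions by merging fields (MongoDB's implicit AND)."""
    merged = {}
    for c in conds:
        for field, spec in c.items():
            if field in merged and isinstance(merged[field], dict) and isinstance(spec, dict):
                merged[field].update(spec)   # e.g. age>33 AND age<=40
            else:
                merged[field] = spec
    return merged
```

For example, `and_(condition("age", ">", 33), condition("age", "<=", 40))` produces `{"age": {"$gt": 33, "$lte": 40}}`, the same filter shown in the table above.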
{
"_id" : ObjectId("4dcd3ebc9278000000005158"),
"timestamp" : ISODate("2011-05-13T14:22:46.777Z"),
"binary" : BinData(0,""),
"string" : "abc",
"number" : 3,
"subobj" : {"subA": 1, "subB": 2 },
"array" : [1, 2, 3],
}
db.coll.find({"number": 3})
db.coll.find({"number": {$gt: 1}})
db.coll.find({"subobj.subA": 1})
db.coll.find({"subobj.subB": {$exists: true} })
db.coll.find({"string": "abc"})
db.coll.find({ "string" : /^a.*$/i })
db.coll.find({"array": {$all:[1, 2]} });
db.coll.find({"array": {$in:[2, 4, 6]} });
Document (Query)
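To make the query semantics above concrete, here is an in-memory sketch that evaluates a small subset of MongoDB-style filters (dotted paths like "subobj.subA", plus `$gt`, `$exists`, `$all`, `$in`) against plain dicts. It is an illustration of the matching rules, not MongoDB's real matcher:

```python
def get_path(doc, path):
    """Resolve a dotted path; return (value, present_flag)."""
    for part in path.split("."):
        if not isinstance(doc, dict) or part not in doc:
            return None, False
        doc = doc[part]
    return doc, True

def matches(doc, query):
    for path, cond in query.items():
        value, present = get_path(doc, path)
        if not isinstance(cond, dict):       # plain equality
            if not (present and value == cond):
                return False
            continue
        for op, arg in cond.items():
            if op == "$gt":
                if not (present and value > arg):
                    return False
            elif op == "$exists":
                if present != arg:
                    return False
            elif op == "$all":               # array contains all of arg
                if not (present and all(x in value for x in arg)):
                    return False
            elif op == "$in":                # value (or any element) is in arg
                candidates = value if isinstance(value, list) else [value]
                if not (present and any(x in arg for x in candidates)):
                    return False
    return True

doc = {"string": "abc", "number": 3,
       "subobj": {"subA": 1, "subB": 2}, "array": [1, 2, 3]}
```

With this `doc`, every `find` filter listed on the slide matches, e.g. `matches(doc, {"subobj.subA": 1})` and `matches(doc, {"array": {"$in": [2, 4, 6]}})` are both true.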
・JavaScript Base Commands
・Schema Free: JSON-style format
・Index on any Attribute
・Many Kinds of Queries
→ These features are very important for analytics: quick document access (aggregation) and viewing the data from many points of view.
Summary
1.2. Scalability
Single Server
[Diagram: a single mongod holds DatabaseA (CollectionA, CollectionB, CollectionC, CollectionD) and DatabaseB, each collection containing many documents]
Replica Set
[Diagram: one Primary asynchronously syncing to two Secondaries]
・At any time there is exactly one Primary, or none (during failover)
・Auto Failover
・Replication is asynchronous
[Auto Failover]
1. Current Primary: Member1 (Primary) syncs to Member2, Member3, Member4 (Secondaries)
2. Primary Down: Member1 goes down
3. Election Time: the remaining Secondaries negotiate to elect a new Primary
4. New Primary: Member2 becomes Primary and syncs to Member3, Member4
Replica Set
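The failover sequence above can be sketched in a few lines. This is a deliberate simplification (real replica-set elections also involve priorities, heartbeats, and oplog positions); here a strict majority of reachable members simply elects the one with the freshest data:

```python
def elect_primary(members):
    """members: list of (name, alive, optime) tuples.
    Return the name of the new primary, or None if no majority is reachable."""
    alive = [m for m in members if m[1]]
    if len(alive) <= len(members) // 2:      # need a strict majority to elect
        return None
    # elect the reachable member with the most recent oplog position
    return max(alive, key=lambda m: m[2])[0]

members = [
    ("Member1", False, 120),   # the old primary has gone down
    ("Member2", True, 118),
    ("Member3", True, 117),
    ("Member4", True, 118),
]
```

With these (invented) optimes, `elect_primary(members)` returns "Member2", mirroring step 4 above; if a majority of members were down, no primary would be elected and the set would stay read-only.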
Sharding
[Diagram: CollectionA in DatabaseA on mongod 1 is split; its documents are divided across Shard1 (mongod 1), Shard2 (mongod 2) and Shard3 (mongod 3)]
Sharding Architecture
config Servers (Shard Configuration)
mongos Servers (Routers)
Shard Servers (Data)
Chunks: [ a, f ), [ f, k ), [ k, n ), [ n, o ), [ o, t ), [ t, } ) spread across shard1, shard2, shard3
Cluster
・Clients always access the cluster through mongos
・Queries are sent to, and data fetched from, only the shard(s) that hold the relevant data
・mongos obtains shard information from the config servers
・Each shard has a single write target: its primary (mongos selects the primary automatically)
Sharding Architecture
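The routing job mongos does can be sketched as a lookup over sorted chunk ranges. Each chunk is a half-open shard-key range [min, max) assigned to a shard; the assignment of two chunks per shard below is an assumption based on the diagram, and "z"-range keys stand in for the maxKey bound written as "}":

```python
import bisect

# (range start, owning shard) pairs, sorted by range start.
CHUNKS = [("a", "shard1"), ("f", "shard1"),
          ("k", "shard2"), ("n", "shard2"),
          ("o", "shard3"), ("t", "shard3")]

def route(shard_key):
    """Find the chunk whose [min, max) range contains shard_key."""
    starts = [start for start, _ in CHUNKS]
    i = bisect.bisect_right(starts, shard_key) - 1
    if i < 0:
        raise KeyError("shard key below the minimum chunk bound")
    return CHUNKS[i][1]
```

For example `route("m")` falls in the [ k, n ) chunk and goes to shard2; this is why mongos can send a query to only the shard(s) that hold the relevant data.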
MapReduce
Translate SQL to MongoDB MapReduce
Exec on all Shard Servers
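The map/reduce flow described here runs map() and reduce() on every shard, then re-reduces the partial results. A minimal sketch, using a count-per-user aggregation as a stand-in example (MongoDB's real mapReduce takes JavaScript functions; this just shows the data flow):

```python
from collections import defaultdict

def map_fn(doc):
    yield doc["user"], 1            # emit(key, value)

def reduce_fn(key, values):
    return sum(values)

def map_reduce(docs):
    """Run map over all docs, group by key, reduce each group."""
    groups = defaultdict(list)
    for doc in docs:
        for k, v in map_fn(doc):
            groups[k].append(v)
    return {k: reduce_fn(k, vs) for k, vs in groups.items()}

# Each shard computes a partial result ...
shard1 = map_reduce([{"user": "a"}, {"user": "b"}, {"user": "a"}])
shard2 = map_reduce([{"user": "a"}])

# ... and the partials are reduced again (reduce must be re-applicable
# to its own output for the sharded result to be correct).
merged = defaultdict(list)
for partial in (shard1, shard2):
    for k, v in partial.items():
        merged[k].append(v)
final = {k: reduce_fn(k, vs) for k, vs in merged.items()}
```

The requirement that reduce be idempotent over its own output is exactly what makes "exec on all shard servers" safe.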
MongoDB & Big Data
・Large disk and memory are required
・Indexes tend to become large
・Journaling needs about 3GB of disk
・A Replica Set's oplog takes about 5% of the disk by default
・Total capacity scales as (number of shard servers) × (replica-set size)
Bad: Need Large Storages
・Unstable components: mongos, config servers
・Long-held distributed lock when inserting large amounts of data
・Data imbalance (bias) between shards
・Consistency problems (e.g. double counting)
Bad: Unstable Sharding
Don’t use Mongo as a
master data storage
or raw data storage
Use Mongo as an
aggregated data storage
or a specific data storage (for analytics)
Store data that is updatable
(in place), or data whose
future use you don’t yet know
[Diagram: a non-sharded Replica Set spanning DC1 and DC2: Primary, Secondaries, and a slave-delayed Secondary; backups taken with --journal]
・Slave Delay in preparation for user error
・Every server: large HDD (500GB~), large memory (16GB~)
・No sharding, Replica Set only; master data on Amazon S3
Safe & Practical Architecture
Case 1. Our interest is the aggregated (result) data,
not the raw data
Ex.1) First aggregation is done by Hadoop, second aggregation by MongoDB
Data Analytic Flow
1. Data Gathering
2. Data Pre-processing
3. Data Analysis
4. Result Data Storage
5. Data Mining
6. Data Sharing
Temporary raw data storage is HDFS, not MongoDB
We only need the result data to discover new features
・From text logs (large): access logs, action logs
・From MySQL (small): payment, registration, save data
・From other NoSQL (middle): e.g. Cassandra
1. Data Gathering → Amazon S3 / HDFS
2. Data Pre-processing → 3. Data Analysis → 4. Result Data Storage
5. Data Sharing / 6. Data Mining
Data Analytic Flow
First Aggregation
Second Aggregation
Reduction
Data to be analyzed
Graph Mining
Web UI
Data Mining
5. Data Sharing
6. Data Mining
Scientific Python
Data Analytic Flow
Case2. We can change
from Batch Processing
to Streaming
Sadayuki Furuhashi
Fluent
@frsyuki
The Event Collector Service
Treasure Data, Inc.
Structured logging
Pluggable architecture
Reliable forwarding
Ex.2) Fluent
http://www.scribd.com/doc/70897187/Fluent-event-collector-update
What’s Fluent?
"2011-04-01 host1 myapp: message size=12MB user=me"
2011-04-01 myapp.message { "on_host": "host1", "combined": true, "size": 12000000, "user": "me" }
What’s Fluent?
2011-04-01  myapp.message  { "on_host": "host1", "combined": true, "size": 12000000, "user": "me" }
   time          tag                                record
http://www.scribd.com/doc/70897187/Fluent-event-collector-update
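The transformation shown above, from a raw one-line log to a structured (time, tag, record) event, can be sketched as follows. The parsing rules here are invented for illustration (Fluent's real parsers are pluggable), and unlike the slide's record this sketch keeps "12MB" as a raw string rather than normalizing it to 12000000:

```python
def parse(line):
    """Turn a raw one-line log into a structured (time, tag, record) event."""
    head, body = line.split(": ", 1)
    date, host, app = head.split()
    words = body.split()
    record = {"on_host": host}
    for pair in words[1:]:              # "size=12MB" -> ("size", "12MB")
        key, value = pair.split("=")
        record[key] = value
    return {"time": date, "tag": app + "." + words[0], "record": record}

event = parse("2011-04-01 host1 myapp: message size=12MB user=me")
```

The resulting event carries the tag "myapp.message" and a JSON-like record, which is exactly the shape that routes cleanly into downstream stores.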
Log collection
Fluent / Apps
Client library
Apache log
・HTTP + JSON (in_http)
・TCP + MessagePack (in_tcp)
・Follow growth of files (in_tail)
http://www.scribd.com/doc/70897187/Fluent-event-collector-update
Case 2-1. In fact, we use only
30% of all data; the other 70% is
unnecessary
App Server  App Server  App Server  App Server
log  log  log  log
Filtering  Filtering  Filtering  Filtering
Relay Server
Relay Server
Analyze Server: Handle Filtered Data
Fluentd parses each record as structured data and serializes it with MessagePack
Fluentd can output each record as JSON to a Mongo collection: well suited for Mongo!
Fluent Plugin Mongo
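The filtering step described above, dropping the ~70% of events we never analyze before forwarding them, can be sketched as a simple tag allow-list (the tag names below are invented for illustration; in Fluentd this would be done with filter/match plugins):

```python
# Tags we actually analyze; everything else is dropped at the app server.
WANTED_TAGS = {"myapp.payment", "myapp.registration"}

def filter_events(events):
    """Keep only the events we analyze; drop the rest before forwarding."""
    return [e for e in events if e["tag"] in WANTED_TAGS]

events = [
    {"tag": "myapp.payment", "record": {"user": "a"}},
    {"tag": "myapp.debug", "record": {}},              # dropped
    {"tag": "myapp.registration", "record": {"user": "b"}},
]
kept = filter_events(events)
```

Filtering at the edge means the relay and analyze servers, and Mongo behind them, only ever see the 30% of events that matter.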
Case 2-2. We don't need to wait
for records to reach the local
server before aggregating them
[Diagram: each App Server aggregates its own log stream; Relay Servers aggregate the partial results; the Analyze Server shuffles by key (key1, key2, key3) and aggregates per day, hour, minute, and second into AggregatedData]
Fluent Plugin Aggregation
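The streaming aggregation idea above, each server keeps small per-window counters that can be merged upstream instead of batch-scanning raw logs later, can be sketched like this (the event shapes are invented for illustration):

```python
from collections import Counter

def aggregate(events):
    """Count events per (minute, tag) incrementally as they stream in."""
    counts = Counter()
    for e in events:
        minute = e["time"][:16]          # "2011-04-01 12:34"
        counts[(minute, e["tag"])] += 1
    return counts

def merge(*partials):
    """Relay / analyze servers merge partial counters by summing."""
    total = Counter()
    for p in partials:
        total.update(p)
    return total

# Two app servers aggregate locally; a relay merges their partials.
app1 = aggregate([{"time": "2011-04-01 12:34:56", "tag": "myapp.message"},
                  {"time": "2011-04-01 12:34:59", "tag": "myapp.message"}])
app2 = aggregate([{"time": "2011-04-01 12:34:10", "tag": "myapp.message"}])
total = merge(app1, app2)
```

Because counters merge by simple addition, the same scheme works per day, hour, minute, or second, and no server ever has to hold the raw log.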
Summary & Comment
・MongoDB has many user-friendly features
・It also has scalability, but sharding is still unstable
・Don't use it as a master or raw data storage; this is not critical, because in many cases storing aggregated data is sufficient
・Find good partners (tools) such as Hadoop and Fluent
・MongoDB & Big Data & Partners → very strong tools for analytics
Thank You !!