TRANSCRIPT
Using Riak for Events storage and analysis at Booking.com
Damien Krotkine
• Software Engineer at Booking.com
• github.com/dams
• @damsieboy
• dkrotkine
KEY FIGURES
• 600,000 hotels
• 212 countries
• 800,000 room nights every 24 hours
• 43 million+ guest reviews
• 155+ offices worldwide
• 8,600 people
• not a small website…
INTRODUCTION
[Diagram: frontends (www, api, mobi) and backends (web, mobi, api, databases, caches, load balancers, availability cluster, etc.) all send events to the events storage. Events: info about subsystems status.]
WHAT IS AN EVENT?
EVENT STRUCTURE
• Provides info about subsystems
• Data
• Deep HashMap
• Timestamp
• Type + Subtype
• The rest: specific data
• Schema-less
{
    timestamp => 12345,
    type      => 'WEB',
    subtype   => 'app',
    dc        => 1,
    action    => {
        is_normal_user => 1,
        pageview_id    => '188a362744c301c2',
        # ...
    },
    tuning    => {
        the_request => 'GET /display/...',
        bytes_body  => 35,
        wallclock   => 111,
        nr_warnings => 0,
        # ...
    },
    # ...
}
{
    type      => 'FAV',
    subtype   => 'fav',
    timestamp => 1401262979,
    dc        => 1,
    tuning    => {
        flatav => {
            cluster       => '205',
            sum_latencies => 21,
            role          => 'fav',
            num_queries   => 7,
        },
    },
}
EVENTS FLOW PROPERTIES
• Read-only
• Schema-less
• Continuous, sequential, timed
• 15K events per second
• 1.25 billion events per day
• Peak at 70 MB/s, min at 25 MB/s
• 100 GB per hour
USAGE
ASSESS THE NEEDS
• Before thinking about storage
• Think about the usage
USAGE
1. GRAPHS
2. DECISION MAKING
3. SHORT TERM ANALYSIS
4. A/B TESTING
GRAPHS
• Graph in real time (a few seconds of lag)
• Graph as many systems as possible
• General platform health check
GRAPHS
DASHBOARDS
META GRAPHS
USAGE
1. GRAPHS
2. DECISION MAKING
3. SHORT TERM ANALYSIS
4. A/B TESTING
DECISION MAKING
• Strategic decisions (use facts)
• Long term or short term
• Technical / non-technical reporting
USAGE
1. GRAPHS
2. DECISION MAKING
3. SHORT TERM ANALYSIS
4. A/B TESTING
SHORT TERM ANALYSIS
• From 10 sec ago -> 8 days ago
• Code deployment checks and rollback
• Anomaly Detector
USAGE
1. GRAPHS
2. DECISION MAKING
3. SHORT TERM ANALYSIS
4. A/B TESTING
A/B TESTING
• Our core philosophy: use facts
• It means: do A/B testing
• Concept of Experiments
• Events provide data to compare
EVENT AGGREGATION
EVENT AGGREGATION
• Group events
• Granularity we need: second
SERIALIZATION
• JSON didn’t work for us (slow, big, lack features)
• Created Sereal in 2012
• "Sereal, a new, binary data serialization format that provides high-performance, schema-less serialization"
• https://github.com/Sereal/Sereal
[Diagram: many individual events streaming into the events storage.]
LOGGER
[Diagram: a LOGGER collects the streams of events emitted by web, api, and dbs; every second it gathers a 1-second batch, reserializes and compresses it, and sends it to the events storage. Many loggers (LOGGER … LOGGER) run in parallel.]
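The logger's per-second batching can be sketched as follows (a Python sketch, with `pickle` + `zlib` standing in for Sereal; the function and field names are illustrative, not Booking.com's code):

```python
import pickle
import zlib
from collections import defaultdict

def aggregate_second(events):
    """Group one second's events by (type, subtype), then
    serialize and compress each group into a single blob."""
    groups = defaultdict(list)
    for ev in events:
        groups[(ev["type"], ev["subtype"])].append(ev)
    # One compressed blob per type/subtype, ready to push to storage.
    return {k: zlib.compress(pickle.dumps(v)) for k, v in groups.items()}

events = [
    {"type": "WEB", "subtype": "app", "timestamp": 12345, "wallclock": 111},
    {"type": "WEB", "subtype": "app", "timestamp": 12345, "wallclock": 95},
    {"type": "FAV", "subtype": "fav", "timestamp": 12345, "num_queries": 7},
]
blobs = aggregate_second(events)
print(sorted(blobs.keys()))  # [('FAV', 'fav'), ('WEB', 'app')]
```

Each compressed blob then becomes one value pushed to the events storage.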
STORAGE
WHAT WE WANT
• Storage security
• Mass write performance
• Mass read performance
• Easy administration
• Very scalable
WE CHOSE RIAK
• Security: cluster, distributed, very robust
• Good and predictable read / write performance
• The easiest to setup and administrate
• Advanced features (MapReduce, triggers, 2i, CRDTs …)
• Riak Search
• Multi Datacenter Replication
CLUSTER
• Commodity hardware
• All nodes serve data
• Data replication
• Gossip between nodes
• No master
• Distributed system
[Diagram: a ring of servers; hash(key) determines where on the ring a key lives.]
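A minimal consistent-hashing sketch of that mapping (simplified: a 160-bit SHA-1 ring split into equal partitions, with owners assigned round-robin; real Riak learns the ring state from gossip, and the node names here are hypothetical):

```python
import hashlib

RING_SIZE = 2 ** 160  # SHA-1 gives a 160-bit ring, as in Riak

def ring_position(bucket, key):
    """Hash bucket + key onto the ring as a big integer."""
    digest = hashlib.sha1((bucket + key).encode()).digest()
    return int.from_bytes(digest, "big")

def primary_nodes(bucket, key, nodes, n_val=3, num_partitions=64):
    """Map the hash to a partition, then take the owners of
    n_val successive partitions (round-robin ownership)."""
    part = ring_position(bucket, key) * num_partitions // RING_SIZE
    return [nodes[(part + i) % num_partitions % len(nodes)]
            for i in range(n_val)]

nodes = ["riak1", "riak2", "riak3", "riak4"]
print(primary_nodes("metadata", "1428415043-1", nodes))
```

The same key always hashes to the same partition, so every client agrees on which nodes hold a given value.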
KEY VALUE STORE
• Namespaces: bucket
• Values: opaque or CRDTs
RIAK: ADVANCED FEATURES
• MapReduce
• Secondary indexes (2i)
• Riak Search
• Multi DataCenter Replication
MULTI-BACKEND FOR STORAGE
• Bitcask
• Eleveldb
• Memory
BACKEND: BITCASK
• Log-based storage backend
• Append-only files (AOF files)
• Advanced expiration
• Predictable performance (1 disk-seek max)
• Perfect for sequential data
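Bitcask's model can be illustrated with a toy append-only store: writes append to a log file and update an in-memory "keydir" mapping key → (offset, length), so any read costs at most one seek. (A sketch only, not Bitcask itself: no hint files, CRCs, or compaction.)

```python
import os
import tempfile

class TinyBitcask:
    """Toy append-only store in the spirit of Bitcask."""
    def __init__(self, path):
        self.keydir = {}            # key -> (offset, length); latest write wins
        self.f = open(path, "ab+")

    def put(self, key, value):
        self.f.seek(0, os.SEEK_END)
        offset = self.f.tell()
        self.f.write(value)         # append-only: never overwrite in place
        self.f.flush()
        self.keydir[key] = (offset, len(value))

    def get(self, key):
        offset, length = self.keydir[key]
        self.f.seek(offset)         # at most one seek per read
        return self.f.read(length)

fd, path = tempfile.mkstemp()
os.close(fd)
db = TinyBitcask(path)
db.put(b"k1", b"hello")
db.put(b"k1", b"world")  # supersedes the old record; compaction reclaims it
print(db.get(b"k1"))     # b'world'
```

Superseded and expired records are reclaimed later by merging the log files, which is why disk space is released in batches rather than continuously.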
CLUSTER CONFIGURATION
DISK SPACE NEEDED
• 8 days of retention
• 100 GB per hour
• Replication factor 3
• 100 GB × 24 h × 8 days × 3 = 57,600 GB
• Need ~60 TB
HARDWARE
• 12, then 16 nodes (soon 24)
• 12 CPU cores (Xeon 2.5 GHz)
• 192 GB RAM
• Network: 1 Gbit/s
• 8 TB per node (RAID 6)
• Cluster total space: 128 TB
DATA DESIGN
[Diagram: loggers aggregate 1-second batches of events from web, api, and dbs into the events storage.]
• 1 blob per EPOCH / DC / CELL / TYPE / SUBTYPE
• 500 KB max chunks
DATA
• Bucket name: "data"
• Key: "12345:1:cell0:WEB:app:chunk0"
• Value: list of events (hashmaps), serialized & compressed
• 200 keys per second
METADATA
• Bucket name: "metadata"
• Key: <epoch>-<dc>, e.g. "1428415043-2"
• Value: list of data keys:
  [ "1428415043:1:cell0:WEB:app:chunk0",
    "1428415043:1:cell0:WEB:app:chunk1",
    …
    "1428415043:4:cell0:EMK::chunk3" ]
• Stored as pipe-separated values (PSV)
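The key scheme and the PSV metadata value can be sketched like this (a Python sketch; the values are the slide's examples):

```python
def data_key(epoch, dc, cell, type_, subtype, chunk):
    """Build a data-bucket key: epoch:dc:cell:type:subtype:chunkN."""
    return f"{epoch}:{dc}:{cell}:{type_}:{subtype}:chunk{chunk}"

def metadata_entry(epoch, dc, data_keys):
    """Key '<epoch>-<dc>', value = data keys joined as PSV."""
    return f"{epoch}-{dc}", "|".join(data_keys)

keys = [data_key(1428415043, 1, "cell0", "WEB", "app", i) for i in range(2)]
mkey, mval = metadata_entry(1428415043, 1, keys)
print(mkey)  # 1428415043-1
print(mval)  # 1428415043:1:cell0:WEB:app:chunk0|1428415043:1:cell0:WEB:app:chunk1
```

One metadata read is then enough to discover every data key written for a given second in a given DC.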
WRITE DATA
PUSH DATA IN
• In each DC, in each cell, loggers push to Riak
• Use ProtoBuf
• Every second:
  • Push data values to Riak, async
  • Wait for success
  • Push metadata
JAVA
Bucket DataBucket = riakClient.fetchBucket("data").execute();
DataBucket.store("12345:1:cell0:WEB:app:chunk0", Data1).execute();
DataBucket.store("12345:1:cell0:WEB:app:chunk1", Data2).execute();
DataBucket.store("12345:1:cell0:WEB:app:chunk2", Data3).execute();

Bucket MetaDataBucket = riakClient.fetchBucket("metadata").execute();
MetaDataBucket.store("12345-1", metaData).execute();
riakClient.shutdown();
Perl
my $client = Riak::Client->new(…);
$client->put(data => '12345:1:cell0:WEB:app:chunk0', $data1);
$client->put(data => '12345:1:cell0:WEB:app:chunk1', $data2);
$client->put(data => '12345:1:cell0:WEB:app:chunk2', $data3);
$client->put(metadata => '12345-1', $metadata, 'text/plain');
READ DATA
READ ONE SECOND
• For one second (a given epoch):
• Request metadata for "<epoch>-<dc>"
• Parse the value
• Filter out unwanted types / subtypes
• Fetch the keys from the "data" bucket
Perl
my $client = Riak::Client->new(…);
my @array = split '\|', $client->get(metadata => '1428415043-1');
my @filtered_array = grep { /WEB/ } @array;
$client->get(data => $_) foreach @filtered_array;
READ AN INTERVAL
• For an interval epoch1 -> epoch2
• Generate the list of epochs
• Fetch in parallel
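A sketch of the interval read, with a hypothetical `fetch_second` standing in for the metadata + data reads shown above:

```python
from concurrent.futures import ThreadPoolExecutor

def fetch_second(epoch, dc=1):
    """Stand-in for: read metadata '<epoch>-<dc>', filter its keys,
    and fetch the matching data blobs. Here it just returns the epoch."""
    return epoch

def fetch_interval(epoch1, epoch2, workers=8):
    """Generate every epoch in [epoch1, epoch2] and fetch in parallel."""
    epochs = range(epoch1, epoch2 + 1)  # one request set per second
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fetch_second, epochs))

print(fetch_interval(1428415043, 1428415047))
# [1428415043, 1428415044, 1428415045, 1428415046, 1428415047]
```

`pool.map` preserves order, so results come back in epoch order even though the fetches overlap.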
RIAK CLUSTER
[Graphs over one day: CPU, disk IOPS, disk I/O %, disk space reclaimed.]
REAL TIME PROCESSING OUTSIDE OF RIAK
STREAMING
• Fetch 1 second every second
• Or a range ( ex: last 10 min )
• Fetch all epochs from Riak
• Use data on the client side
EXAMPLES
• Streaming => Graphite ( every sec )
• Streaming => Anomaly Detector ( last 2 min )
• Streaming => Experiment analysis ( last day )
• Every minute => Hadoop
• Manual request => test, debug, investigate
• Batch fetch => ad hoc analysis
• => Huge number of read requests
[Diagram: the events storage feeds the graphite cluster, the anomaly detector, the experiment cluster, the hadoop cluster, mysql analysis, and manual requests, at about 50 MB/s each.]
THIS IS REALTIME
• 1 second of data
• Stored in < 1 sec
• Available after < 1 sec
• Issue: network saturation
REAL TIME PROCESSING INSIDE RIAK
THE IDEA
• Instead of
• Fetch data,
• Crunch data (ex: average),
• Produce a small result
• Do
• Bring code to data
• Crunch data on Riak
• Fetch the result
WHAT TAKES TIME
• Takes a lot of time
• Fetching data out: network issue
• Decompressing: CPU time issue
• Takes almost no time
• Crunching data
MAPREDUCE
• Input: epoch-dc
• Map1: metadata keys => data keys
• Map2: data crunching
• Reduce: aggregate
• Realtime: OK
• network usage: OK
• CPU time: NOT OK
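The shape of that MapReduce pipeline can be sketched in plain Python over mocked stores (real Riak MapReduce phases are Erlang or JavaScript functions running on the nodes; the latency values here are made up):

```python
# Mock stores: metadata key -> PSV of data keys; data key -> latency list
metadata = {"12345-1": "12345:1:cell0:WEB:app:chunk0|12345:1:cell0:WEB:app:chunk1"}
data = {
    "12345:1:cell0:WEB:app:chunk0": [10, 20, 30],
    "12345:1:cell0:WEB:app:chunk1": [40, 50],
}

def map1(meta_key):
    """Map1: expand a metadata key into its data keys."""
    return metadata[meta_key].split("|")

def map2(data_key):
    """Map2: crunch one blob into a partial (sum, count)."""
    values = data[data_key]
    return (sum(values), len(values))

def reduce_(partials):
    """Reduce: aggregate partials into an average."""
    total, count = map(sum, zip(*partials))
    return total / count

partials = [map2(k) for k in map1("12345-1")]
print(reduce_(partials))  # 30.0
```

Only the small partials and the final average leave the nodes, which is what saves network; the CPU cost of decompressing every blob stays, which is why CPU time was the blocker.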
HOOKS
• Every time metadata is written
• Post-Commit hook triggered
• Crunch data on the nodes
[Diagram: on each node host, the RIAK service's post-commit hook sends the metadata key's data keys over a socket to a local REST service, which decompresses the data, processes all tasks, and sends the result back for storage.]
HOOK CODE
metadata_stored_hook(RiakObject) ->
    Key = riak_object:key(RiakObject),
    _Bucket = riak_object:bucket(RiakObject),
    [ Epoch, _DC ] = binary:split(Key, <<"-">>),
    MetaData = riak_object:get_value(RiakObject),
    DataKeys = binary:split(MetaData, <<"|">>, [ global ]),
    %% Target the REST companion on this very node, so data stays local
    {ok, HostnameStr} = inet:gethostname(),
    send_to_REST(Epoch, list_to_binary(HostnameStr), DataKeys),
    ok.

send_to_REST(Epoch, Hostname, DataKeys) ->
    Method = post,
    URL = "http://" ++ binary_to_list(Hostname)
          ++ ":5000?epoch=" ++ binary_to_list(Epoch),
    HTTPOptions = [ { timeout, 4000 } ],
    Options = [ { body_format, string },
                { sync, false },
                { receiver, fun(_ReplyInfo) -> ok end } ],
    Body = iolist_to_binary(mochijson2:encode(DataKeys)),
    httpc:request(Method, {URL, [], "application/json", Body},
                  HTTPOptions, Options),
    ok.
REST SERVICE
• In Perl, using PSGI (WSGI-like), Starman, preforks
• Allows writing data crunchers in Perl
• Also supports loading code on demand
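A minimal sketch of such a companion service, using Python's stdlib `http.server` rather than Perl/PSGI; the `?epoch=` query parameter and the JSON body of data keys match what the hook above sends, but the handler logic is illustrative:

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urlparse, parse_qs

class CruncherHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        # The hook POSTs to ?epoch=<epoch> with a JSON array of data keys
        epoch = parse_qs(urlparse(self.path).query).get("epoch", [""])[0]
        length = int(self.headers.get("Content-Length", 0))
        data_keys = json.loads(self.rfile.read(length) or b"[]")
        # A real service would fetch each key locally, decompress once,
        # and run every registered task on the decoded events.
        reply = json.dumps({"epoch": epoch, "keys": len(data_keys)}).encode()
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(reply)

    def log_message(self, *args):  # keep the sketch quiet
        pass

# To serve on the port the hook targets:
# HTTPServer(("0.0.0.0", 5000), CruncherHandler).serve_forever()
```

Because the hook fires the request asynchronously with a short timeout, a slow or crashed companion never blocks the Riak write path.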
ADVANTAGES
• CPU usage and execution time can be capped
• Data is local to processing
• Two systems are decoupled
• REST service written in any language
• Data processing done all at once
• Data is decompressed only once
DISADVANTAGES
• Only for incoming data (streaming), not old data
• Can’t easily use cross-second data
• What if the companion service goes down?
FUTURE
• Use this companion to generate optional small values
• Use Riak Search to index and search those
THE BANDWIDTH PROBLEM
• PUT - bad case
• n_val = 3
• inside usage = 3 x outside usage
• PUT - good case
• n_val = 3
• inside usage = 2 x outside usage
• GET - bad case
• inside usage = 3 x outside usage
• GET - good case
• inside usage = 2 x outside usage
• Network usage (PUT and GET), with 16 nodes:
  • bad case 13/16 of the time, good case 3/16
  • 3 × 13/16 + 2 × 3/16 = 2.81
  • plus gossip
  • => inside network > 3 × outside network
• Usually it’s not a problem
• But in our case: big values, constant PUTs, lots of GETs
• Sadly, only 1 Gbit/s
• => network bandwidth issue
THE BANDWIDTH SOLUTIONS
THE BANDWIDTH SOLUTIONS
1. Optimize GET for network usage, not speed
2. Don’t choose a node at random
• GET - bad case
• n_val = 1
• inside usage = 1 x outside
• GET - good case
• n_val = 1
• inside usage = 0 x outside
WARNING
• Possible only because data is read-only
• Data has internal checksum
• No conflict possible
• Corruption detected
RESULT
• Practical network usage halved!
THE BANDWIDTH SOLUTIONS
1. Optimize GET for network usage, not speed
2. Don’t choose a node at random
• bucket = "metadata"
• key = "12345"

Hash = hashFunction(bucket + key)
RingStatus = getRingStatus()
PrimaryNodes = Fun(Hash, RingStatus)
THE IDEA
• Do the hashing on the client
• By default: "chash_keyfun": {"mod": "riak_core_util", "fun": "chash_std_keyfun"}
• Look at the source on GitHub: riak_core/src/chash.erl
THE HASH FUNCTION
• Easy to re-implement client-side
• SHA-1 as a BigInt
• Example: sha1(bucket + key) = 2450
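Reproducing the hash client-side is a few lines of Python (a sketch of the idea; a faithful port must match the exact byte encoding used in riak_core/src/chash.erl):

```python
import hashlib

def chash(bucket, key):
    """SHA-1 of bucket + key, read as a big-endian integer:
    a BigInt position on Riak's 2^160 ring."""
    digest = hashlib.sha1((bucket + key).encode("utf-8")).digest()
    return int.from_bytes(digest, "big")

h = chash("metadata", "12345")
print(h < 2 ** 160)  # True: SHA-1 digests are 160 bits
```

With the same position computed on the client as on the cluster, the client can pick the node that actually holds the data.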
RING STATUS
• If hash = 2450 => nodes 16, 17, 18

$ curl -s -XGET http://$host:8098/riak_preflists/myring
{
  "0": "[email protected]",
  "5708990770823839524233143877797980545530986496": "[email protected]",
  "11417981541647679048466287755595961091061972992": "[email protected]",
  "17126972312471518572699431633393941636592959488": "[email protected]",
  …etc…
}
WARNING
• Possible only if:
  • The nodes list is monitored
  • In case of a failed node, default to random
  • Data is requested in a uniform way
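Putting the pieces together, a sketch of the client-side node choice with the fallback to random (the ring boundaries and node names below are hypothetical; the hash 2450 is the slide's example):

```python
import bisect
import random

# Hypothetical ring status: partition boundary (BigInt) -> owning node
ring = {0: "node-16", 1000: "node-17", 2000: "node-18", 3000: "node-19"}
boundaries = sorted(ring)

def node_for(hash_value, healthy):
    """Pick the owner of the partition the hash falls into;
    if that node is down, default to a random healthy node."""
    i = bisect.bisect_right(boundaries, hash_value) - 1
    node = ring[boundaries[i]]
    return node if node in healthy else random.choice(sorted(healthy))

print(node_for(2450, healthy={"node-16", "node-17", "node-18"}))  # node-18
```

Asking the replica that owns the data directly avoids the extra internal hop a randomly chosen coordinator would add.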
RESULT
• Network usage reduced even more!
• Especially for GETs
CONCLUSION
CONCLUSION
• We used only Riak Open Source
• No training, self-taught, small team
• Riak is a great solution
• Robust, fast, scalable, easy
• Very flexible and hackable
• Helps us continue scaling
Q&A
@damsieboy