Building a Scalable Distributed Stats Infrastructure with Storm and KairosDB
DESCRIPTION

Many startups collect and display stats and other time-series data for their users. A supposedly simple NoSQL option such as MongoDB is often chosen to get started... which soon becomes 50 distributed replica sets as volume increases. This talk describes how we designed a scalable distributed stats infrastructure from the ground up. KairosDB, a rewrite of OpenTSDB built on top of Cassandra, provides a solid foundation for storing time-series data. Unfortunately, though, it has some limitations: millisecond time granularity and a lack of atomic upsert operations, which make counting (critical to any stats infrastructure) a challenge. Additionally, running KairosDB atop Cassandra inside AWS brings its own set of challenges, such as managing Cassandra seeds and AWS security groups as you grow or shrink your Cassandra ring. In this deep-dive talk, we explore how we've used a mix of open-source and in-house tools to tackle these challenges and build a robust, scalable, distributed stats infrastructure.

TRANSCRIPT

1 Stat, 2 Stat, 3 Stat… A Trillion
Cody A. Ray, DevOps @ BrightTag

Outline
1. Initial Attempt: MongoDB
2. Ideal Stats System: KairosDB?
3. Making KairosDB Work for Us

What Kind of Stats? Counting!
● sum, min, max, etc.
● any recurrence relation: yₙ = f(x, y₀, …, yₙ₋₁), e.g. a running sum yₙ = yₙ₋₁ + x

The First Pass: MongoDB
● JSON Documents, Schema-less, Flexible
● Aggregation Pipeline, MapReduce
● Master-Slave Replication
● Atomic Operators!

[Diagram: the classic lost-update race (http://fearlessdeveloper.com/race-condition-java-concurrency/). Two threads each read counter = 0, each increments its local value by 1, and each writes 1 back; the final counter is 1, an incorrect value.]
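MongoDB's atomic operators avoid exactly this read-modify-write race, since the increment is applied in a single step on the server. A minimal sketch using the MongoDB Java driver; the database, collection, and field names are illustrative, not from the talk:

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoClients;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;
import com.mongodb.client.model.Updates;
import org.bson.Document;

public class AtomicCounter {
    public static void main(String[] args) {
        try (MongoClient client = MongoClients.create("mongodb://localhost:27017")) {
            MongoCollection<Document> counters =
                    client.getDatabase("stats").getCollection("counters");

            // $inc is executed atomically on the server, so concurrent
            // increments cannot overwrite each other; upsert(true) creates
            // the counter document if it does not exist yet.
            counters.updateOne(
                    Filters.eq("_id", "page.views"),
                    Updates.inc("count", 1),
                    new UpdateOptions().upsert(true));
        }
    }
}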




Simple, Right? What’s the Problem?
Only 3500 writes/second! (m1.large)
up to 7000 wps (with m1.xlarge)

Scale Horizontally?

Redundancy → Mongo Explosion!!!


Feel the Pain
● Scale 3x. 3x != x. Big-O be damned.
● Managing 50+ Mongo replica sets globally
● Tens of thousands of dollars “wasted” each year

Ideal Stats System?
● Linearly scalable time-series database
● Store arbitrary metrics and metadata
● Support aggregations and other complex queries
● Bonus points for:
  o good for storing both application and system metrics
  o Graphite web integration

Enter KairosDB
● “fast distributed scalable time series” db
● General metric storage and retrieval
● Based upon Cassandra:
  o linearly scalable
  o tuned for fast writes
  o eventually consistent, tunable replication

Adding Data
[ { "name": "archive_file_tracked", "datapoints": [[1359788400000, 123]], "tags": { "host": "server1", "data_center": "DC1" } }]

Querying Data
{ "start_absolute": 1357023600000, "end_absolute": 1357455600000 "metrics": [{ "name": "abc.123", "tags": { "host": ["foo", "foo2"], "type": ["bar"] }, "aggregators": [{ "name": "sum", "sampling": { "value": 10, "unit": "minutes" }}]}]}

The Catch(es)
● Lack of atomic operations
  o + millisecond time granularity
● Bad support for high-cardinality “tags”
● Headache managing Cassandra in AWS


Cassandra on AWS

Agathon: BrightTag's tool for managing Cassandra seeds and AWS security groups as the ring grows or shrinks

The Catch(es)
● Lack of atomic operations
  o + millisecond time granularity
● Bad support for high-cardinality “tags”
● Headache managing Cassandra in AWS

Cassandra Schema
http://prezi.com/ajkjic0jdws3/kairosdb-cassandra-schema/


Custom Data
[ { "name": "archive_file_tracked", "datapoints": [[1359788400000, "value,metadata,...", "string"]], "tags": { "host": "server1", "data_center": "DC1" } }]
https://github.com/proofpoint/kairosdb/tree/feature/custom_data


The Catch(es)
● Lack of atomic operations
  o + millisecond time granularity
● Bad support for high-cardinality “tags”
● Headache managing Cassandra in AWS

Pieces of the Solution
● Shard the data
  o avoids concurrency race conditions
● Pre-aggregation
  o solves the time-granularity issue
● Stream processing, exactly-once semantics

Queue/Worker Stream Processing
https://www.youtube.com/watch?v=bdps8tE0gYo

Enter Storm/Trident

[Architecture diagram: the STATS PIPELINE. App servers ship events via suro into Kafka brokers; Storm workers, coordinated by ZooKeeper, consume the stream and write to both the Mongos (Stats 1.5) and the Kairoses (Stats 2.0).]

[Topology diagram, layer by layer:
● Kafka Layer: Kafka brokers, each hosting a partition
● → round-robin(?) → Spout Layer: Kafka spouts
● → shuffle() → Transform Layer: transforms
● → groupBy(timeRange, metric, tags) → Persistence Layer: 30s and 30m writer bolts
● → round-robin (haproxy) → KairosDB Layer: KairosDB cluster]
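In Trident terms, those layers map onto a short topology definition. A rough sketch under stated assumptions: kafkaSpout stands in for a storm-kafka Trident spout, kairosStateFactory for a KairosDB-backed StateFactory, and ParseEvent is a hypothetical function that decodes raw events into fields:

import storm.trident.TridentTopology;
import storm.trident.operation.builtin.Sum;
import backtype.storm.tuple.Fields;

// Assumed: kafkaSpout and kairosStateFactory are built elsewhere.
TridentTopology topology = new TridentTopology();
topology.newStream("stats", kafkaSpout)            // Spout Layer
        .shuffle()                                 // spread tuples across transform tasks
        .each(new Fields("bytes"),
              new ParseEvent(),                    // hypothetical decode function
              new Fields("timeRange", "metric", "tags", "value"))
        .groupBy(new Fields("timeRange", "metric", "tags"))
        .persistentAggregate(kairosStateFactory,   // Persistence Layer: writes to KairosDB
                             new Fields("value"),
                             new Sum(),            // Trident's built-in combiner
                             new Fields("sum"));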






Pieces of the Solution
● Shard the data
  o avoids concurrency race conditions
● Pre-aggregation
  o solves the time-granularity issue
● Stream processing, exactly-once semantics


Non-Transactional: 123 (value only)
Transactional: “[9, 123]” (txid, value)
Opaque Transactional: “[9, 123, 120]” (txid, value, previous value)

Transactional Tags

[
  {
    "name": "archive_file_tracked",
    "datapoints": [[1359788400000, 123]],
    "tags": {
      "txid": 9,
      "prev": 120
    }
  }
]
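Storing txid and the previous value alongside the count is what makes this state opaque-transactional: when a batch is replayed, the writer recomputes from the known-good previous value instead of double-counting. A minimal sketch of that update rule, with illustrative names (Trident ships an equivalent OpaqueValue type):

public class OpaqueUpdate {
    static class OpaqueState {
        final long txid, curr, prev;
        OpaqueState(long txid, long curr, long prev) {
            this.txid = txid; this.curr = curr; this.prev = prev;
        }
    }

    static OpaqueState apply(OpaqueState stored, long batchTxid, long delta) {
        if (stored == null) {
            return new OpaqueState(batchTxid, delta, 0); // first write for this key
        }
        if (stored.txid == batchTxid) {
            // Same batch replayed: the earlier attempt may have partially
            // applied, so recompute from the known-good previous value.
            return new OpaqueState(batchTxid, stored.prev + delta, stored.prev);
        }
        // New batch: roll the current value into prev and apply on top of it.
        return new OpaqueState(batchTxid, stored.curr + delta, stored.curr);
    }
}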




The Catch(es)
● Lack of atomic operations
  o + millisecond time granularity
● Bad support for high-cardinality “tags”
● Headache managing Cassandra in AWS

Does It Work?
… the counts still match! (whew)

Average latency remains < 10 seconds

Stats 1.0 vs Stats 1.5 Performance
Replacing 9 Mongo replica sets with 2

The following slides weren’t presented at Gluecon. You may find them interesting anyway. :)

Trident → Storm Topology Compilation

[Diagram: the logical Trident flow (spout → each → shuffle → each → groupBy + persistent aggregate → TridentState, followed by further each steps and a second groupBy + persistent aggregate → TridentState) is compiled into a physical Storm topology of a spout and bolts.]



Tuning Rules
1. Number of workers should be a multiple of number of machines
2. Number of partitions should be a multiple of spout parallelism
3. Parallelism should be a multiple of number of workers
4. Persistence parallelism should be equal to the number of workers
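Applied to the Storm API, the rules reduce to picking a few multiples up front. A sketch with illustrative numbers for a four-machine cluster and sixteen Kafka partitions:

import backtype.storm.Config;

public class TuningExample {
    public static Config tunedConfig() {
        int machines = 4;            // cluster size (assumed)
        int workers = 2 * machines;  // rule 1: a multiple of the machine count
        // rule 2: give the spout parallelismHint(8), so the 16 Kafka
        //         partitions are a multiple of spout parallelism
        // rule 3: give transforms parallelismHint(16), a multiple of workers
        // rule 4: give persistentAggregate parallelismHint(workers)
        Config conf = new Config();
        conf.setNumWorkers(workers);
        return conf;
    }
}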

[Diagram: a persistent aggregate consuming a batch from Kafka. Tuples with values keyed by (ts1, metric1), (ts2, metric2), (ts2, metric3), (ts2, metric1), (ts4, metric2), (ts3, metric4) are grouped by (ts, metric); for each group the state multi gets the current values, runs the reducer/combiner over them together with the batch's values, and multi puts the updated values back.
Source: http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/]


[Diagram: inside the reducer/combiner step. For each key, e.g. (ts1, metric1), (ts2, metric2), (ts2, metric3), the combiner merges the values from the batch with the value fetched from the underlying persistent state via multi get, and the merged results are written back in a single multi put.
Source: http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/]
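Trident exposes exactly this multi get / reduce / multi put contract through its IBackingMap interface, so a KairosDB-backed state only needs to implement batched reads and writes. A sketch; the two KairosDB calls are placeholders, not a real client API:

import java.util.ArrayList;
import java.util.List;
import storm.trident.state.OpaqueValue;
import storm.trident.state.map.IBackingMap;

public class KairosBackingMap implements IBackingMap<OpaqueValue<Long>> {
    @Override
    public List<OpaqueValue<Long>> multiGet(List<List<Object>> keys) {
        // One batched read per group of (ts, metric) keys in the batch.
        List<OpaqueValue<Long>> values = new ArrayList<OpaqueValue<Long>>();
        for (List<Object> key : keys) {
            values.add(readFromKairos(key)); // placeholder: REST query by ts/metric
        }
        return values;
    }

    @Override
    public void multiPut(List<List<Object>> keys, List<OpaqueValue<Long>> vals) {
        // One batched write after the reducer/combiner has run.
        for (int i = 0; i < keys.size(); i++) {
            writeToKairos(keys.get(i), vals.get(i)); // placeholder: datapoint POST
        }
    }

    private OpaqueValue<Long> readFromKairos(List<Object> key) {
        return null; // elided: query KairosDB and rebuild (txid, value, prev) from tags
    }

    private void writeToKairos(List<Object> key, OpaqueValue<Long> val) {
        // elided: write the value plus the txid/prev tags shown earlier
    }
}

A map like this is typically wrapped in Trident's OpaqueMap to get the opaque-transactional read-repair behavior described above.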