Building a Scalable Distributed Stats Infrastructure with Storm and KairosDB


Posted on 19-Aug-2014


Category: Engineering



DESCRIPTION

Building a Scalable Distributed Stats Infrastructure with Storm and KairosDB. Many startups collect and display stats and other time-series data for their users. A supposedly-simple NoSQL option such as MongoDB is often chosen to get started... which soon becomes 50 distributed replica sets as volume increases. This talk describes how we designed a scalable distributed stats infrastructure from the ground up. KairosDB, a rewrite of OpenTSDB built on top of Cassandra, provides a solid foundation for storing time-series data. Unfortunately, though, it has some limitations: millisecond time granularity and a lack of atomic upsert operations, which make counting (critical to any stats infrastructure) a challenge. Additionally, running KairosDB atop Cassandra inside AWS brings its own set of challenges, such as managing Cassandra seeds and AWS security groups as you grow or shrink your Cassandra ring. In this deep-dive talk, we explore how we've used a mix of open-source and in-house tools to tackle these challenges and build a robust, scalable, distributed stats infrastructure.

TRANSCRIPT

  • 1 Stat, 2 Stat, 3 Stat... A Trillion. Cody A. Ray, Dev-Ops @ BrightTag
  • Outline 1. Initial Attempt: MongoDB 2. Ideal Stats System: KairosDB? 3. Making KairosDB Work for Us
  • What Kind of Stats? Counting! sum, min, max, etc. Any recurrence relation: y_n = f(x, y_0, ..., y_{n-1})
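All of these counting stats fit that recurrence: each new state y_n is computed from the previous state and the next input. A minimal Python sketch (the `running` helper name is illustrative, not from the talk):

```python
from functools import reduce

def running(f, y0, xs):
    """Fold an aggregation function f over a stream of values,
    starting from initial state y0: y_n = f(y_{n-1}, x_n)."""
    return reduce(f, xs, y0)

data = [3, 1, 4, 1, 5]
total = running(lambda y, x: y + x, 0, data)    # sum -> 14
minimum = running(min, float("inf"), data)      # min -> 1
maximum = running(max, float("-inf"), data)     # max -> 5
```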
  • The First Pass: MongoDB. JSON documents, schema-less, flexible; aggregation pipeline, MapReduce; master-slave replication; atomic operators!
  • The classic race condition (http://fearlessdeveloper.com/race-condition-java-concurrency/): two writers each read counter = 0, each increment the value by 1 locally, and each write the value back as counter = 1, leaving the incorrect final value counter = 1 instead of 2.
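The lost-update interleaving on that slide can be made concrete. This sketch simulates the race deterministically (no real threads) so the incorrect final value is reproducible:

```python
# Deterministic illustration of the lost-update race:
# two writers both read the counter before either writes it back.
counter = 0

def read():
    return counter

def write(value):
    global counter
    counter = value

a = read()      # writer A reads 0
b = read()      # writer B also reads 0
write(a + 1)    # A writes 1
write(b + 1)    # B writes 1 -- A's increment is lost

print(counter)  # 1, not the expected 2
```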
  • Simple, Right? What's the Problem? Only 3,500 writes/second (m1.large), up to 7,000 wps (m1.xlarge)
  • Scale Horizontally?
  • Redundancy Mongo Explosion!!!
  • Feel the Pain: Scale 3x. 3x != x, Big-O be damned. Managing 50+ Mongo replica sets globally; tens of thousands of dollars wasted each year
  • Ideal Stats System? Linearly scalable time-series database. Store arbitrary metrics and metadata. Support aggregations and other complex queries. Bonus points for: good for storing both application and system metrics; Graphite web integration
  • Enter KairosDB: fast, distributed, scalable time-series db. General metric storage and retrieval. Based upon Cassandra: linearly scalable; tuned for fast writes; eventually consistent, tunable replication
  • Adding Data [ { "name": "archive_file_tracked", "datapoints": [[1359788400000, 123]], "tags": { "host": "server1", "data_center": "DC1" } } ]
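Datapoints like the example above are submitted to KairosDB over HTTP as a JSON array (POST to /api/v1/datapoints on the KairosDB host; host and port are deployment-specific). A small sketch of building that payload (the `datapoint_payload` helper is illustrative):

```python
import json

def datapoint_payload(name, timestamp_ms, value, tags):
    """Build the JSON body for KairosDB's add-datapoints endpoint:
    POST http://<kairos-host>:8080/api/v1/datapoints"""
    return [{
        "name": name,
        "datapoints": [[timestamp_ms, value]],
        "tags": tags,
    }]

payload = datapoint_payload(
    "archive_file_tracked", 1359788400000, 123,
    {"host": "server1", "data_center": "DC1"},
)
body = json.dumps(payload)  # ready to POST
```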
  • Querying Data { "start_absolute": 1357023600000, "end_absolute": 1357455600000, "metrics": [{ "name": "abc.123", "tags": { "host": ["foo", "foo2"], "type": ["bar"] }, "aggregators": [{ "name": "sum", "sampling": { "value": 10, "unit": "minutes" }}]}]}
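Queries like this are POSTed to KairosDB's /api/v1/datapoints/query endpoint. A sketch of building the same query programmatically (the `sum_query` helper is illustrative):

```python
def sum_query(metric, start_ms, end_ms, tags, minutes):
    """Build a KairosDB query that sums a metric over fixed sampling windows."""
    return {
        "start_absolute": start_ms,
        "end_absolute": end_ms,
        "metrics": [{
            "name": metric,
            "tags": tags,
            "aggregators": [{
                "name": "sum",
                "sampling": {"value": minutes, "unit": "minutes"},
            }],
        }],
    }

query = sum_query("abc.123", 1357023600000, 1357455600000,
                  {"host": ["foo", "foo2"], "type": ["bar"]}, 10)
```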
  • The Catch(es): Lack of atomic operations (plus millisecond time granularity). Bad support for high-cardinality tags. Headache managing Cassandra in AWS.
  • Cassandra on AWS
  • Agathon
  • The Catch(es): Lack of atomic operations (plus millisecond time granularity). Bad support for high-cardinality tags. Headache managing Cassandra in AWS.
  • Cassandra Schema http://prezi.com/ajkjic0jdws3/kairosdb-cassandra-schema/
  • Custom Data [ { "name": "archive_file_tracked", "datapoints": [[1359788400000, "value,metadata,...", "string"]], "tags": { "host": "server1", "data_center": "DC1" } } ] https://github.com/proofpoint/kairosdb/tree/feature/custom_data
  • The Catch(es): Lack of atomic operations (plus millisecond time granularity). Bad support for high-cardinality tags. Headache managing Cassandra in AWS.
  • Pieces of the Solution: Shard the data (avoids concurrency race conditions). Pre-aggregation (solves the time-granularity issue). Stream processing with exactly-once semantics.
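The pre-aggregation piece can be sketched simply: truncate each event's millisecond timestamp to a fixed window (30s here, matching the writer bolts later in the deck) and combine within each (window, metric) group before anything is written. A minimal sketch, assuming sum is the aggregation:

```python
from collections import defaultdict

WINDOW_MS = 30_000  # 30-second pre-aggregation window

def bucket(ts_ms):
    """Truncate a millisecond timestamp to the start of its 30s window."""
    return ts_ms - ts_ms % WINDOW_MS

def pre_aggregate(events):
    """Sum raw (ts_ms, metric, value) events into (window, metric) buckets,
    sidestepping millisecond granularity and per-write contention."""
    totals = defaultdict(int)
    for ts_ms, metric, value in events:
        totals[(bucket(ts_ms), metric)] += value
    return dict(totals)

events = [(1_000, "hits", 1), (15_000, "hits", 2), (31_000, "hits", 5)]
agg = pre_aggregate(events)  # {(0, 'hits'): 3, (30000, 'hits'): 5}
```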
  • Queue/Worker Stream Processing https://www.youtube.com/watch?v=bdps8tE0gY
  • Enter Storm/Trident
  • [Diagram: Stats 1.5 vs Stats 2.0 pipelines. App Server 1 and App Server 2 ship events via suro; Kafka, ZooKeeper, and Storm clusters feed Mongo (Stats 1.5) and KairosDB (Stats 2.0).]
  • [Diagram: topology layers for groupBy(timeRange, metric, tags). Kafka layer (broker partitions) -> spout layer (Kafka spouts; round-robin?) -> transform layer (transforms; shuffle()) -> persistence layer (30s and 30m writer bolts) -> KairosDB layer (KairosDB cluster; round-robin via haproxy).]
  • Pieces of the Solution: Shard the data (avoids concurrency race conditions). Pre-aggregation (solves the time-granularity issue). Stream processing with exactly-once semantics.
  • State value formats: Non-Transactional: 123. Transactional: [txid, value] = [9, 123]. Opaque Transactional: [txid, value, prev] = [9, 123, 120].
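These are Trident's state flavors: a bare value, [txid, value], and [txid, value, prev]. The opaque-transactional form lets a replayed batch be re-applied safely by recomputing from prev. A minimal Python sketch, assuming a summing counter:

```python
def opaque_update(stored, txid, delta):
    """Apply a batch delta to an opaque-transactional counter.

    stored is (last_txid, value, prev) or None. If the same txid is
    replayed, recompute from prev so the batch isn't double-counted."""
    if stored is None:
        return (txid, delta, 0)
    last_txid, value, prev = stored
    if txid == last_txid:              # replayed batch: redo from prev
        return (txid, prev + delta, prev)
    return (txid, value + delta, value)

state = opaque_update(None, 8, 120)    # (8, 120, 0)
state = opaque_update(state, 9, 3)     # (9, 123, 120) -- as on the slide
state = opaque_update(state, 9, 3)     # replay of txid 9: still (9, 123, 120)
```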
  • Transactional Tags [ { "name": "archive_file_tracked", "datapoints": [[1359788400000, 123]], "tags": { "txid": 9, "prev": 120 } } ]
  • The Catch(es): Lack of atomic operations (plus millisecond time granularity). Bad support for high-cardinality tags. Headache managing Cassandra in AWS.
  • Does It Work? the counts still match! (whew)
  • Average latency remains < 10 seconds
  • Stats 1.0 vs Stats 1.5 Performance: Replacing 9 Mongo replica sets with 2
  • Cody A. Ray, BrightTag cray@brighttag.com Open Source: github.com/brighttag Slides: bit.ly/gluecon-stats
  • The following slides weren't presented at Gluecon. You may find them interesting anyway. :)
  • [Diagram: Trident-to-Storm topology compilation. A Trident flow of each(), shuffle(), groupBy(), and persistentAggregate() into Trident State compiles down to a Storm topology of spouts and bolts.]
  • Tuning Rules: 1. Number of workers should be a multiple of the number of machines. 2. Number of partitions should be a multiple of spout parallelism. 3. Parallelism should be a multiple of the number of workers. 4. Persistence parallelism should be equal to the number of workers.
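The four rules are just divisibility constraints, so they're easy to sanity-check when sizing a topology. A small sketch (the helper name and example numbers are illustrative):

```python
def check_tuning(machines, workers, partitions, spout_par, par, persist_par):
    """Assert the four parallelism tuning rules from the slide."""
    assert workers % machines == 0, "workers must be a multiple of machines"
    assert partitions % spout_par == 0, "partitions must be a multiple of spout parallelism"
    assert par % workers == 0, "parallelism must be a multiple of workers"
    assert persist_par == workers, "persistence parallelism must equal workers"
    return True

# Example sizing: 4 machines, 2 workers each, 16 Kafka partitions.
check_tuning(machines=4, workers=8, partitions=16,
             spout_par=8, par=16, persist_par=8)
```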
  • [Diagram: persistentAggregate state updates. A batch from Kafka of (ts, metric) values is grouped by (ts, metric); each group does a multi get of current values, a reducer/combiner step, and a multi put of the updated values. http://svendvanderveken.wordpress.com/2013/07/30/scalable-real-time-state-update-with-storm/]
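The persistentAggregate pattern in that diagram reads current values for every key in a batch at once (multi get), combines them with the batch's new values, and writes them all back at once (multi put). A Python sketch with a plain dict standing in for Trident's backing map:

```python
def persist_batch(store, batch):
    """One persistentAggregate step over a batch of (ts, metric, value)
    tuples: group by (ts, metric), multi-get current values from `store`
    (a dict standing in for the backing map), combine, and multi-put."""
    # group by (ts, metric)
    grouped = {}
    for ts, metric, value in batch:
        grouped.setdefault((ts, metric), []).append(value)
    # multi get: one bulk read for all keys in the batch
    keys = list(grouped)
    current = [store.get(k, 0) for k in keys]
    # reducer/combiner + multi put: one bulk write back
    for k, old in zip(keys, current):
        store[k] = old + sum(grouped[k])

store = {}
persist_batch(store, [("ts1", "m1", 2), ("ts2", "m2", 3), ("ts2", "m2", 4)])
# store == {("ts1", "m1"): 2, ("ts2", "m2"): 7}
```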