Building Large-Scale Analytics Platform with Storm, Kafka and Cassandra - NYC Storm User Group

IN-STREAM PROCESSING WITH KAFKA, STORM, CASSANDRA - Niagara Team: Alexey Kharlamov, Evelina Stepanova, Rafael Bagmanov, Konstantin Golikov, Anatoliy Vinogradov; Integral Ad Science: Kiril Tsemekhman, Rahul Ratnakar

Upload: alexey-kharlamov

Post on 01-Dec-2014


DESCRIPTION

At Integral, we process heavy volumes of click-stream traffic: 50K QPS of ad impressions at peak and close to 200K QPS of all browser calls. We build analytics on these streams of data. Two applications require quite significant computational effort: 'sessionization' and fraud detection. Sessionization means linking a series of requests from the same browser into a single record; there can be 5 or more requests spread over 15-30 minutes that we need to link to each other. Fraud detection is a process that looks at various signals in browser requests, together with substantial historical evidence data, and classifies an ad impression as either legitimate or fraudulent. We had been doing both (as well as all other analytics) in batch mode, once an hour at best. Both processes, and fraud detection in particular, are time sensitive and much more meaningful if done in near-real-time.

This talk is about our experience migrating once-per-day offline batch processing of impression data using Hadoop to in-memory stream processing using Kafka, Storm and Cassandra. We will touch upon our choices and our reasoning for selecting the products used in this solution. Hadoop is no longer the only or always preferred option in the Big Data space. In-memory stream processing can be more effective for time-series data preparation and aggregation. The ability to scale at a significantly lower cost means more customers, better accuracy and better business practices: since only in-stream processing allows for low-latency data and insight delivery, it opens entirely new opportunities. However, transitioning non-trivial data pipelines raises a number of questions previously hidden within the offline nature of batch processing. How will you join several data feeds? How will you implement failure recovery?
In addition to handling terabytes of data per day, our streaming system has to be guided by the following considerations:
• Recovery time
• Time relativity and continuity
• Geographical distribution of data sources
• Limit on data loss
• Maintainability
The system produces complex cross-correlation analysis of several data feeds and aggregation for client analytics, with an input feed frequency of up to 100K msg/sec. This presentation will benefit anyone interested in learning an alternative approach to big data analytics, especially the process of joining multiple streams in memory using Cassandra. The presentation will also highlight certain optimization patterns that can be useful in similar situations.

TRANSCRIPT

  • 1. IN-STREAM PROCESSING WITH KAFKA, STORM, CASSANDRA - Integral Ad Science, Niagara Team: Kiril Tsemekhman, Alexey Kharlamov, Rahul Ratnakar, Evelina Stepanova, Rafael Bagmanov, Konstantin Golikov, Anatoliy Vinogradov
  • 2. Business goals
    - Real-time data availability
    - Near-real-time update of fraud models
    - Controlled data delay
    - Better hardware scaling
    - Summarizing data as close to the source as possible
    - Better network utilization and reliability
  • 3. Data flow
    [Diagram: QLogs and DT events feed a pipeline of Filter, Join, Score and Aggregate stages; outputs are Initial Events, Evidence and Reports into the Reporting Database]
    - 1.6B records/day, sustained 100K QPS, peak 200K QPS
  • 4. Data flow: Hadoop inefficiency hypothesis
    - Large batch architecture for offline processing
    - Hadoop's shuffle phase dumps data to disks, several times in some cases!!!
    - Active dataset fits into cluster memory: sessionization 10s of GB, aggregation 10s of GB
    - RAM is 1000s of times faster than HDD
  • 5. In-stream processing: benefits
    - Immediately available results
      - Results are delivered with controlled delay (15 mins - 1 hour)
      - Time-sensitive models (e.g. fraud) are updated in near real-time
      - Data can be delivered to clients immediately
    - Efficient resource utilization
      - Better scaling coefficient
      - Smoother workload and bandwidth distribution
      - Less resource overprovisioning
  • 6. Non-functional requirements
    - Horizontal scalability
    - Limit on data loss (less than 0.1%)
    - Tolerance to single node failure (it happens!!! ops guys will sleep better at night)
    - Easy recovery
    - Maintenance: no data loss on deployment
    - Monitoring & alerting
  • 7. Storm/Trident/Kafka/C*: hybrid solution
    [Diagram: events and QLogs flow into Kafka, then into Storm; Cassandra holds state; an exporter delivers reports to the Reporting DB]
  • 8. Storm/Trident/Kafka/C*: reliable processing
    - Storm & Trident transactions
      - Data are processed in micro-batches (transactions)
      - External storage is used to keep state between transactions
      - Automatic rollback to the last checkpoint
    - Kafka distributed queue manager
      - Data feed replay for retry or recovery
      - Load spikes are smoothed
      - Cross-DC replication
    - Cassandra
      - Key-value store for de-duplication
      - Resilience based on replication
  • 9. Our Storm distribution
    - Storm high availability: share Nimbus state through a distributed cache
    - Metrics streaming to Graphite
    - Bug fixes
    - Packaging into RPM/DEB
  • 10. Data sources
    [Diagram: frontend servers write log files; a tailer agent reads them, emits messages, and records checkpoint marks]
    - Hard latency requirements: 10ms response
    - Read logs produced by front-end servers
    - Periodic checkpoints
    - Older data dropped
  • 11. Message jitter
    [Diagram: messages from Server 1, 2 and 3 arrive in the data feed out of order]
    - Messages are arbitrarily reordered
    - Use a jitter buffer and drop outliers
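The jitter buffer mentioned on this slide can be sketched as below. This is a minimal illustration under assumed semantics (hold messages for a fixed window, re-sort by timestamp, drop anything arriving after its slot was already emitted); the class and its API are hypothetical, not the talk's production code.

```java
import java.util.*;

/** Minimal jitter-buffer sketch (hypothetical API): messages are held for a
 *  fixed window, re-sorted by timestamp, and anything arriving later than
 *  the window is dropped as an outlier. */
public class JitterBuffer {
    private final long windowMs;                  // how long we wait for stragglers
    private final PriorityQueue<long[]> buffer =  // [timestamp, payloadId], timestamp order
        new PriorityQueue<>(Comparator.comparingLong(m -> m[0]));
    private long watermark = Long.MIN_VALUE;      // highest timestamp already emitted

    public JitterBuffer(long windowMs) { this.windowMs = windowMs; }

    /** Offer a message; returns false if dropped as a late outlier. */
    public boolean offer(long timestamp, long payloadId) {
        if (timestamp <= watermark) return false; // arrived after its slot was emitted
        buffer.add(new long[]{timestamp, payloadId});
        return true;
    }

    /** Emit, in timestamp order, everything older than (now - window). */
    public List<long[]> drain(long now) {
        List<long[]> out = new ArrayList<>();
        while (!buffer.isEmpty() && buffer.peek()[0] <= now - windowMs) {
            long[] m = buffer.poll();
            watermark = m[0];
            out.add(m);
        }
        return out;
    }
}
```

The buffer trades latency for order: a larger window tolerates more reordering but delays every emitted message by that window.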
  • 12. Data flow
    [Diagram, repeated from slide 3: QLogs and DT events feed Filter, Join, Score and Aggregate stages; outputs are Initial Events, Evidence and Reports into the Reporting Database]
    - 1.6B records/day, sustained 100K QPS, peak 200K QPS
  • 13. Robot filtering
    - Blacklist/whitelist based
    - Simultaneous multi-pattern matching with the Aho-Corasick algorithm
    - 60x improvement over frequency-optimized String.indexOf
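The multi-pattern matching on this slide can be illustrated with a compact Aho-Corasick automaton: one pass over the user-agent string matches every blacklist pattern at once. This is an illustrative sketch, not the production filter; the class name and pattern list are assumptions.

```java
import java.util.*;

/** Compact Aho-Corasick sketch: a trie with failure links allows a single
 *  scan of the input to find all patterns simultaneously. */
public class AhoCorasick {
    private static class Node {
        Map<Character, Node> next = new HashMap<>();
        Node fail;
        List<String> hits = new ArrayList<>();  // patterns ending at this node
    }
    private final Node root = new Node();

    public AhoCorasick(List<String> patterns) {
        for (String p : patterns) {             // 1. insert patterns into the trie
            Node n = root;
            for (char c : p.toCharArray())
                n = n.next.computeIfAbsent(c, k -> new Node());
            n.hits.add(p);
        }
        Deque<Node> q = new ArrayDeque<>();     // 2. BFS to build failure links
        for (Node n : root.next.values()) { n.fail = root; q.add(n); }
        while (!q.isEmpty()) {
            Node cur = q.poll();
            for (Map.Entry<Character, Node> e : cur.next.entrySet()) {
                Node child = e.getValue();
                Node f = cur.fail;
                while (f != null && !f.next.containsKey(e.getKey())) f = f.fail;
                child.fail = (f == null) ? root : f.next.get(e.getKey());
                child.hits.addAll(child.fail.hits);  // inherit suffix matches
                q.add(child);
            }
        }
    }

    /** Return every pattern occurring anywhere in the text, in one pass. */
    public Set<String> match(String text) {
        Set<String> found = new HashSet<>();
        Node n = root;
        for (char c : text.toCharArray()) {
            while (n != root && !n.next.containsKey(c)) n = n.fail;
            n = n.next.getOrDefault(c, root);
            found.addAll(n.hits);
        }
        return found;
    }
}
```

Because the scan cost is independent of the number of patterns, this is where the claimed speedup over repeated String.indexOf calls comes from.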
  • 14. Join (sessionization) algorithm
    [Diagram: three impressions across Transaction 1 and Transaction 2. Impression 1: Init plus DT events, emitted on timeout. Impression 2: Init plus DT events, unloaded and emitted early. Impression 3: DT events with no Init, dropped on timeout. Join windows may span transactions]
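The join-window behavior sketched on this slide (Init opens a window keyed by ASID, DT events attach, timeout emits or drops) can be written out roughly as follows. The semantics are simplified and the class and method names are hypothetical.

```java
import java.util.*;

/** Sessionization sketch (simplified): an Init event opens a join window
 *  keyed by ASID, DT events attach to it, and on timeout the joined record
 *  is emitted; DT events with no matching Init by the timeout are dropped. */
public class SessionJoiner {
    public static class Session {
        String init;                      // the initial impression event
        List<String> dts = new ArrayList<>();
        long deadline;                    // when the join window closes
    }
    private final long windowMs;
    private final Map<String, Session> open = new HashMap<>();  // ASID -> window

    public SessionJoiner(long windowMs) { this.windowMs = windowMs; }

    public void onInit(String asid, String event, long now) {
        Session s = open.computeIfAbsent(asid, k -> new Session());
        s.init = event;
        s.deadline = now + windowMs;
    }

    public void onDt(String asid, String dt, long now) {
        Session s = open.computeIfAbsent(asid, k -> new Session());
        if (s.deadline == 0) s.deadline = now + windowMs;  // DT may arrive first
        s.dts.add(dt);
    }

    /** Close expired windows: emit sessions that saw an Init, drop the rest. */
    public List<Session> onTimeout(long now) {
        List<Session> emitted = new ArrayList<>();
        for (Iterator<Map.Entry<String, Session>> it = open.entrySet().iterator(); it.hasNext(); ) {
            Session s = it.next().getValue();
            if (s.deadline <= now) {
                if (s.init != null) emitted.add(s);  // joined record
                it.remove();                         // dropped if no Init arrived
            }
        }
        return emitted;
    }
}
```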
  • 15. Recovery strategy
    [Diagram: data feed with checkpoint N-1, checkpoint N and a processing/failure point; failed transaction N rolls back to the last checkpoint]
    - Read data in batches (transactions)
    - On success, write a checkpoint
    - On failure, return to the previous checkpoint
    - On catastrophic failure, rewind the data feed to a point before the problem started
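The checkpoint-and-replay loop described on this slide can be sketched as below: the read offset is committed only after a batch succeeds, so a failed batch is simply re-read from the last checkpoint. A list stands in for the replayable Kafka feed; the names are illustrative.

```java
import java.util.*;
import java.util.function.Predicate;

/** Checkpoint-and-replay sketch: the feed is consumed in batches, and the
 *  offset advances only when a batch succeeds, so failures replay the batch. */
public class CheckpointedReader {
    private final List<String> feed;   // stands in for a replayable Kafka partition
    private final int batchSize;
    private long checkpoint = 0;       // last committed offset

    public CheckpointedReader(List<String> feed, int batchSize) {
        this.feed = feed;
        this.batchSize = batchSize;
    }

    /** Run one transaction; returns the batch that was durably processed,
     *  or an empty list if the batch failed and will replay on the next call. */
    public List<String> runTransaction(Predicate<List<String>> process) {
        int from = (int) checkpoint;
        int to = Math.min(from + batchSize, feed.size());
        List<String> batch = feed.subList(from, to);
        if (process.test(batch)) {
            checkpoint = to;            // success: advance the checkpoint
            return batch;
        }
        return Collections.emptyList(); // failure: offset unchanged, batch replays
    }

    public long checkpoint() { return checkpoint; }
}
```

Catastrophic recovery is the same mechanism with a manual step: reset `checkpoint` to any earlier offset and let the feed replay from there.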
  • 16. Logical time
    [Diagram: timestamps such as 11:58-12:05 arriving out of order at the read point]
    - Wall-clock time does not work: late delivery, load spikes, recovery rewinds the data feed to a previous time
    - Logical clock: the maximum timestamp seen by a bolt
      - New messages with a smaller timestamp are late
      - No clock synchronization; all bolts are in weak synchrony
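The logical clock on this slide reduces to a small amount of code: each bolt tracks the maximum event timestamp it has seen and treats anything below that watermark as late. The class below is a sketch of that rule, not the talk's implementation.

```java
/** Logical-clock sketch: a bolt's "now" is the maximum event timestamp it
 *  has observed; no wall clock, no cross-node synchronization. */
public class LogicalClock {
    private long maxSeen = Long.MIN_VALUE;

    /** Advance the clock; returns true if the message is on time,
     *  false if it is late (timestamp below the current watermark). */
    public boolean observe(long timestamp) {
        if (timestamp >= maxSeen) {
            maxSeen = timestamp;
            return true;
        }
        return false;  // late message
    }

    public long now() { return maxSeen; }
}
```

Because the clock is driven by the data itself, rewinding the feed during recovery automatically rewinds time as well, which a wall clock cannot do.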
  • 17. Join topology
    [Diagram: a spout reads event-log batches from Kafka; parser bolts parse logs and extract the ASID; join bolts, partitioned by ASID, join events with their DTs (e.g. tx1:ASID1:Event + tx1:ASID1:DT -> ASID1: Event, DTs) and store joins for fast failover]
  • 18. Join (sessionization) topology
    - Flow
      - Micro-batches of initial and ping events are read from Kafka
      - A map ASID -> TTL, {Initial, ping_x} is stored in memory
      - STATE is mirrored to Cassandra!!!
    - Failure recovery: lost state is recovered from Cassandra
    - Finalization on each transaction: processed state is committed and messages are evicted to Kafka; evicted messages are deleted from Cassandra
  • 19. Failure
    [Diagram: counter cells keyed by group ID (APC-1-1-1, APC-1-1-2, ..., APC-2-1-4) across transactions TX:1, TX:2 and TX:3; a failed TX:2 is retried as TX:2R and overwrites its counts]
  • 20. Aggregation
    - Flow
      - Group events by reporting dimensions
      - Count events in groups
      - Write counters to C*: row key = PREFIX(GROUPID), column key = SUFFIX(GROUPID) + TXID, value = COUNT(*)
    - Failure recovery: overwrite results of the failed batch in Cassandra
    - Finalization: read data from Cassandra by a parallel extractor
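The counter key layout above is what makes retry idempotent: because the transaction ID is part of the column key, a retried batch overwrites its own counters instead of double-counting. A sketch of that scheme, with an in-memory map standing in for Cassandra and an illustrative prefix/suffix split:

```java
import java.util.*;

/** Sketch of the counter key layout: row key = PREFIX(GROUPID),
 *  column key = SUFFIX(GROUPID) + TXID, value = count. The split rule
 *  here (first '-') is an assumption for illustration. */
public class CounterStore {
    // rowKey -> (columnKey -> count), standing in for a Cassandra table
    private final Map<String, Map<String, Long>> rows = new HashMap<>();

    public void writeCounter(String groupId, long txId, long count) {
        int cut = groupId.indexOf('-');
        String rowKey = groupId.substring(0, cut);                 // PREFIX(GROUPID)
        String colKey = groupId.substring(cut + 1) + ":" + txId;   // SUFFIX(GROUPID) + TXID
        rows.computeIfAbsent(rowKey, k -> new HashMap<>()).put(colKey, count);
    }

    /** Exporter side: sum the per-transaction columns for one group. */
    public long total(String groupId) {
        int cut = groupId.indexOf('-');
        Map<String, Long> row = rows.getOrDefault(groupId.substring(0, cut), Map.of());
        String suffix = groupId.substring(cut + 1) + ":";
        return row.entrySet().stream()
                  .filter(e -> e.getKey().startsWith(suffix))
                  .mapToLong(Map.Entry::getValue).sum();
    }
}
```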
  • 21. Cassandra data export
    [Diagram: multiple read points spread around the Cassandra node ring]
    - Delayed data export to accommodate jitter
    - Parallel read from several points on the ring
    - On a major incident, recover by re-exporting data
    - Dirty hack: the aggregation topology should stream data to the analytical database directly
  • 22. APC aggregation topology (snippet)
    Stream stream = topology
        .newStream("input-topic-reading-spout",
            new OpaqueTridentKafkaSpout(config.groupingTridentKafkaConfig()))
        .shuffle()
        .each(fields("transformed-event"),
            new ExtractFields("niagara.storm.adpoc.group.ApcGroupingFieldsParser"),
            fields("grouping"));
    stream.groupBy(fields("bucket", "grouping")).name("aggregation")
        .persistentAggregate(/* cassandra store */,
            fields("transformed-event"),
            new FirewallRecordCount(false),
            fields("value"));
  • 23. Performance environment
    [Diagram: six hi1.4xlarge instances split between Storm and C* nodes, with a reader pulling from Amazon S3]
  • 24. Performance environment - VMs
    Instance type: 6 x hi1.4xlarge
    - vCPU cores: 16
    - Memory: 60 GB
    - Storage: 2 x 1,024 GB SSD
    - Network: 10 Gbit/sec
    - SSD: 16,000 IOPS, sequential write 300 MB/sec x 2
  • 25. Aggregation: Apc + S05 + L04
  • 26. Aggregation: write performance
    Report | Size, Krows | T, K/s | Storm CPU, % | Cassandra CPU, % | Net, MB/s | Disk W, MB/s
    apc    | 29    | 220 | 83 | 127 | 101.5/1.5 | 1.3
    l04    | 249   | 200 | 80 | 125 | 125/2.5   | 5
    s05    | 7400  | 120 | 85 | 80  | 4015/7    | 30
    United | 7678  | 110 | 87 | 70  | 5015/7    | 36
    - Network saturation observed
    - Throughput 10x the required value
  • 27. Aggregation: read performance
    Report | Size, Krows | Threads | Time, sec | Write, K/sec
    S05 | 7400 | 10 | 95  | 60
    S05 | 7400 | 3  | 150 | 80
    APC | 29   | 3  | 15  | 120
    L04 | 249  | 3  | 25  | 120
    - Reading 1 hour worth of data takes 2.5 minutes
    - Moderate performance degradation observed
  • 28. Aggregation: long transactions
    - Pros: better aggregation ratio, faster report export
    - Cons: batch-type (spiky) workload, longer recovery
    - Test results
      - Performance up to 200 Kmsg/sec
      - Cassandra unstable due to node failures (GC)
  • 29. Join topology: output stream
    [Bar chart, Krecord/sec, for four configurations of the C, U and S flags (C:ON,U:OFF,S:ON; C:ON,U:ON,S:ON; C:OFF,U:ON,S:ON; C:OFF,U:ON,S:OFF); measured values 12.5, 13, 20 and 28 Kmsg/sec]
  • 30. Join topology: in-memory state size
    - Stream frequency 9.7 Kmsg/sec, join window 15 minutes
    Unload | State, K/task | State size, MB/task | Total heap, GB | Total state, GB
    OFF | 500 | 500 | 60 | 24
    ON  | 230 | 240 | 36 | 11.5
  • 31. Scalability test environment
    [Diagram: Storm on m2.4xlarge instances, C* on hi1.4xlarge instances]
  • 32. Scalability test results
    [Chart: output stream (Kmsg/sec) vs vCPU count (24, 48, 72) for C:OFF and C:ON; measured values include 6, 9, 16, 27 and 43 Kmsg/sec, growing roughly linearly with vCPUs]
  • 33. Join topology - conclusions
    - Linear scaling, limited by CPU
    - 72 GB RAM for 10 Kmsg/sec with a 30-min window
    - 6 hi1.4xlarge instances can process 16 Kmsg/sec (lower bound) to 28 Kmsg/sec (upper bound)
    - Optimization: stateless join topology
  • 34. Topology performance considerations
    - Avoid text and regular expressions; POJOs are friends
    - Network bandwidth is important (saturated at 1 Gb/s)
    - Parsing is heavy (deserialization included)
    - Track cases when a tuple crosses a worker boundary
    - Keep spouts separate from parsers
    - Lazy parsing (extract fields only when you really have to)
      - Less garbage with shorter lifetime
      - Easier profiling
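The lazy-parsing point above can be sketched as a record that carries the raw log line untouched and splits out fields only on first access, so tuples that are filtered early never pay the parsing cost. The class, field format and names are illustrative assumptions.

```java
import java.util.*;

/** Lazy-parsing sketch: the raw line rides through the topology as-is and
 *  is parsed (once) only when a field is actually requested. */
public class LazyLogRecord {
    private final String raw;
    private Map<String, String> parsed;   // null until someone asks for a field

    public LazyLogRecord(String raw) { this.raw = raw; }

    public String field(String name) {
        if (parsed == null) {             // parse on first access only
            parsed = new HashMap<>();
            for (String kv : raw.split("&")) {   // assumed key=value&... format
                int eq = kv.indexOf('=');
                if (eq > 0) parsed.put(kv.substring(0, eq), kv.substring(eq + 1));
            }
        }
        return parsed.get(name);
    }

    public boolean isParsed() { return parsed != null; }
}
```

This also keeps the garbage short-lived: the parsed map is allocated only for tuples that survive filtering, which matches the slide's "less garbage with shorter lifetime" point.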
  • 35. Cassandra performance
    - The garbage collector is the main performance limit
    - Cassandra uses Concurrent Mark Sweep (CMS) by default
    - If CMS cannot free enough space, the JVM falls back to the single-threaded serial collector (stop the world)
    - With a 12 GB heap, the serial collector might take up to 60 seconds
  • 36. Cassandra: conclusions
    - Garbage collection: a spiky (batch-type) workload is bad for C*
    - The smaller the cluster, the fewer heavy-write column families you can have
    - Wide tables are better than narrow tables: 1 row with 10 columns beats 10 rows with 1 column in terms of throughput
  • 37. Questions?
    Alexey Kharlamov - @aih1013, [email protected]
    Kiril Tsemekhman - [email protected]
    Rahul Ratnakar - [email protected]