Integrating Scale Out and Fault Tolerance in Stream Processing using Operator State Management
Authors: Raul Castro Fernandez, Matteo Migliavacca, et al.
Published conference: SIGMOD’13
Reporter: Ma Yuanwen
Introduction
• Stream
  – A sequence of tuples
  – Examples: sensor networks, stock trading systems
• Query plan
  – A query is specified as a directed acyclic graph
• Distributed stream processing
  – A query is deployed on a set of nodes
• State management
  – Checkpoint, back up, restore, partition
• Scale out
  – Split an operator instance when it becomes overloaded
• Fault tolerance
  – Recover from failures without affecting processing results
Outline
• Background
  – Problem Statement
  – System Model
• State Management
  – Query state
  – State Operations
• Scale out and fault tolerance
  – Fault-tolerant scale out algorithm
  – System architecture
  – Bottleneck detection and scaling policy
• Evaluation
• Conclusions
Problem Statement
• Operators
  – Stateless operators (e.g. filter or map) and stateful operators (e.g. join or aggregation)
  – Sliding window based state
  – Entire history based state
• Intra-query parallelism
  – Query graph and execution graph
• Fault tolerance
  – Passive standby strategy
  – Active standby strategy
  – Upstream backup strategy
Example query: report the word frequencies of the most recent hour, about every 10 minutes
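The example query keeps sliding window based state. A minimal sketch of such an operator follows, assuming tuples arrive in timestamp order with timestamps in seconds; the class name `SlidingWordCount` and its parameters are hypothetical and not taken from the paper.

```python
from collections import Counter, deque


class SlidingWordCount:
    """Word frequencies over the last `window` seconds, reported every `slide` seconds."""

    def __init__(self, window=3600, slide=600):
        self.window = window
        self.slide = slide
        self.events = deque()    # (timestamp, word) pairs, oldest first
        self.counts = Counter()  # sliding-window state of the operator
        self.next_report = slide

    def process(self, timestamp, word):
        """Consume one tuple and return the reports that became due."""
        reports = []
        # Emit every report that is due before the new tuple is added.
        while timestamp >= self.next_report:
            self._evict(self.next_report)
            reports.append((self.next_report, dict(self.counts)))
            self.next_report += self.slide
        self.events.append((timestamp, word))
        self.counts[word] += 1
        return reports

    def _evict(self, now):
        # Drop tuples whose timestamp is no longer inside the window.
        while self.events and self.events[0][0] <= now - self.window:
            _, old_word = self.events.popleft()
            self.counts[old_word] -= 1
            if self.counts[old_word] == 0:
                del self.counts[old_word]
```

The state grows with the number of tuples inside the window, which is why such an operator cannot be moved or recovered without handling its state.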
System model (1)
• Data model
  – A stream is an infinite sequence of tuples
  – A tuple consists of a timestamp, a key field and a payload; the key is not unique and is used to partition tuples
• Operator model
  – Tuples are processed by operators
  – The operator function is defined over the input streams, the output streams, the operator state, the timestamps of the most recent tuples already processed, and the timestamps of the oldest tuples that affected the state
Example: three records as a stream of three tuples (one tuple per row)
name      age   sex
Li Lei    16    male
Han Mei   15    female
Jim       17    male
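The data and operator model can be illustrated with a small sketch. It assumes a tuple with a timestamp, a non-unique key and a payload, and a toy stateful operator that counts tuples per key; `StreamTuple` and `CountPerKey` are hypothetical names chosen for illustration.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class StreamTuple:
    timestamp: int  # logical timestamp
    key: str        # not unique; used to partition tuples
    payload: Any


class CountPerKey:
    """Toy stateful operator: counts the tuples seen for each key."""

    def __init__(self):
        self.state = {}          # operator state: key -> count
        self.last_processed = 0  # timestamp of the most recent tuple processed

    def process(self, t):
        """Update the state with one input tuple and emit one output tuple."""
        self.state[t.key] = self.state.get(t.key, 0) + 1
        self.last_processed = t.timestamp
        return StreamTuple(t.timestamp, t.key, self.state[t.key])
```

Using the `sex` column of the example records as the key, the three rows become three tuples, and the operator ends with a count of 2 for `male` and 1 for `female`.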
System model (2)
• Query model
  – A query is specified as a directed acyclic query graph made up of a set of operators and a set of streams
  – An operator is upstream of another operator when a stream leads from it to that operator
  – Conversely, the receiving operator is downstream of the sending operator
• Query execution model
  – A query is deployed on a set of nodes
  – An operator may be parallelized into a set of partitioned operators; the size of that set is the parallelization level of the operator
  – Each partitioned operator takes a partitioned stream as input
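A query graph with upstream and downstream relations can be sketched as below. This is only an illustration under the assumption that operators are identified by name and each stream is a (source, destination) pair; the `QueryGraph` class is hypothetical.

```python
from collections import defaultdict


class QueryGraph:
    """Directed graph of operators connected by streams."""

    def __init__(self, streams):
        # streams: iterable of (source operator, destination operator) pairs
        self.down = defaultdict(set)
        self.up = defaultdict(set)
        for src, dst in streams:
            self.down[src].add(dst)
            self.up[dst].add(src)

    def upstream(self, op):
        """Operators that send a stream directly to `op`."""
        return set(self.up.get(op, ()))

    def downstream(self, op):
        """Operators that receive a stream directly from `op`."""
        return set(self.down.get(op, ()))

    def is_acyclic(self):
        """Kahn's algorithm: the graph is acyclic if every operator can be ordered."""
        ops = set(self.up) | set(self.down)
        indegree = {o: len(self.up.get(o, ())) for o in ops}
        ready = [o for o in ops if indegree[o] == 0]
        visited = 0
        while ready:
            o = ready.pop()
            visited += 1
            for d in self.down.get(o, ()):
                indegree[d] -= 1
                if indegree[d] == 0:
                    ready.append(d)
        return visited == len(ops)
```

An execution graph would replace a single operator name by the names of its partitioned operators, one per partitioned stream.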
Query state (1)
• Processing state
  – An internal summary of the history of input tuples
  – Computed from all past tuples processed by the operator
  – Represented as a set of key/value pairs
  – Obtained through a user-defined function
Query state (2)
• Buffer state
  – Stores tuples that have not yet been processed by downstream operators
  – Tuples in the buffer are reprocessed after a failure
  – Tuples are discarded once they are no longer needed for recovery
• Routing state
  – Used to decide to which partitioned operator a tuple is routed
  – Maps keys to partitioned downstream operators
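Buffer state and routing state can be sketched as two small data structures. The sketch assumes that keys are comparable, that routing uses contiguous key ranges, and that tuples are (timestamp, key, payload) triples; `Router` and `OutputBuffer` are hypothetical names.

```python
import bisect


class Router:
    """Routing state: maps key ranges to partitioned downstream operators."""

    def __init__(self, boundaries, targets):
        # targets[i] receives keys k with boundaries[i-1] <= k < boundaries[i]
        if len(targets) != len(boundaries) + 1:
            raise ValueError("need exactly one more target than boundaries")
        self.boundaries = list(boundaries)
        self.targets = list(targets)

    def route(self, key):
        return self.targets[bisect.bisect_right(self.boundaries, key)]


class OutputBuffer:
    """Buffer state: output tuples kept until they are no longer needed."""

    def __init__(self):
        self.tuples = []  # (timestamp, key, payload), in timestamp order

    def append(self, t):
        self.tuples.append(t)

    def trim(self, timestamp):
        """Discard tuples with a timestamp up to and including `timestamp`."""
        self.tuples = [t for t in self.tuples if t[0] > timestamp]

    def replay(self):
        """Tuples to send again after a failure."""
        return list(self.tuples)
```

With a single boundary at key 100, keys below 100 go to the first partitioned operator and all other keys go to the second.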
State operations
• Operator state backup and restore
  – Checkpoint the state of an operator and back up the state to an upstream operator
  – Restore state after a failure and for scale out
• Operator state partitioning
  – When a stateful operator scales out, its processing state must be split across the new partitioned operators
Operator state backup and restore
• Checkpoint state
  – A representation of the processing state and the buffer state
  – Triggered once every checkpoint interval
  – Routing state is maintained by the query manager
• Backup state
  – Back up the operator state to an upstream operator
• Restore state
  – Restore the backup state to another operator to recover a failed operator or to redistribute state across partitioned operators
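The three operations can be sketched together for a toy operator that sums values per key. This is an illustration under stated assumptions, not the paper's implementation: tuples are (timestamp, key, value) triples, the upstream operator trims its buffer when it stores a backup, and the names `Operator` and `recover` are hypothetical.

```python
import copy


class Operator:
    """Toy stateful operator that sums values per key."""

    def __init__(self, name):
        self.name = name
        self.processing_state = {}  # key -> running sum
        self.buffer = []            # output tuples kept for replay
        self.last_ts = 0            # timestamp of the most recent tuple processed
        self.backups = {}           # checkpoints stored for downstream operators

    def emit(self, t):
        """Send a tuple downstream and keep it in the buffer state."""
        self.buffer.append(t)

    def process(self, t):
        ts, key, value = t
        self.processing_state[key] = self.processing_state.get(key, 0) + value
        self.last_ts = ts

    def checkpoint(self):
        """Snapshot the processing state and the buffer state."""
        return copy.deepcopy({"processing": self.processing_state,
                              "buffer": self.buffer,
                              "ts": self.last_ts})

    def store_backup(self, downstream_name, checkpoint):
        """Keep a downstream checkpoint and trim the tuples it already covers."""
        self.backups[downstream_name] = checkpoint
        self.buffer = [t for t in self.buffer if t[0] > checkpoint["ts"]]

    def restore(self, checkpoint):
        snapshot = copy.deepcopy(checkpoint)
        self.processing_state = snapshot["processing"]
        self.buffer = snapshot["buffer"]
        self.last_ts = snapshot["ts"]


def recover(upstream, failed_name, new_name):
    """Rebuild a failed operator from its backup, then replay buffered tuples."""
    replacement = Operator(new_name)
    replacement.restore(upstream.backups[failed_name])
    for t in upstream.buffer:
        replacement.process(t)
    return replacement
```

Tuples processed after the last checkpoint are lost with the failed operator, but they are still in the upstream buffer, so replaying them brings the replacement to the same state.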
Operator state partitioning
• Partition state for scale out
  – Partition routing state
  – Partition the processing state of an operator across the new partitioned operators
  – Partition buffer state
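One way to partition all three kinds of state is to split the key space into contiguous ranges. The sketch below assumes comparable keys, a key/value processing state and (timestamp, key, payload) buffer tuples; the function names and the even split are assumptions made for illustration.

```python
import bisect


def split_boundaries(keys, n):
    """Choose n-1 boundary keys that split the sorted keys into n contiguous ranges."""
    ordered = sorted(set(keys))
    if not 1 <= n <= len(ordered):
        raise ValueError("n must be between 1 and the number of distinct keys")
    size = len(ordered) / n
    return [ordered[int(i * size)] for i in range(1, n)]


def partition_index(boundaries, key):
    """Index of the partitioned operator responsible for `key`."""
    return bisect.bisect_right(boundaries, key)


def partition_state(processing_state, buffer, n):
    """Return the new routing boundaries plus per-partition processing and buffer state."""
    boundaries = split_boundaries(processing_state.keys(), n)
    state_parts = [{} for _ in range(n)]
    for key, value in processing_state.items():
        state_parts[partition_index(boundaries, key)][key] = value
    buffer_parts = [[] for _ in range(n)]
    for t in buffer:  # t = (timestamp, key, payload)
        buffer_parts[partition_index(boundaries, t[1])].append(t)
    return boundaries, state_parts, buffer_parts
```

Because the routing boundaries, the processing state and the buffer are split with the same function, a replayed tuple always reaches the partition that holds the state for its key.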
Scale out and Fault Tolerance
• Scale out
  – The stream processing system (SPS) partitions operators on demand in response to bottleneck operators
• Fault tolerance
  – If a node hosting an operator fails, the SPS must replace it with an operator on a new node
Operator recovery becomes a special case of scale out, in which a failed operator is scaled out to a parallelization level of 1
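The idea that recovery is scale out to a parallelization level of 1 can be shown in a few lines. This sketch assumes a backed-up key/value state and hash partitioning with `zlib.crc32`; both the partitioning scheme and the function names are assumptions.

```python
import zlib


def partition_index(key, n):
    """Stable hash partitioning of a key across n partitioned operators."""
    return zlib.crc32(str(key).encode("utf-8")) % n


def scale_out(backup_state, n):
    """Split a backed-up key/value state across n new partitioned operators."""
    if n < 1:
        raise ValueError("parallelization level must be at least 1")
    partitions = [{} for _ in range(n)]
    for key, value in backup_state.items():
        partitions[partition_index(key, n)][key] = value
    return partitions


def recover(backup_state):
    """Recovery: scale out the failed operator to a parallelization level of 1."""
    return scale_out(backup_state, 1)[0]
```

Both paths start from the same backup state, so a single mechanism serves overload and failure.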
Fault-tolerant scale out algorithm
System architecture
• Query manager
  – Maps query operators to nodes and maintains the execution graph
• Deployment manager
  – Uses the execution graph to initialize nodes, deploy operators, set up stream communication and start processing
Bottleneck detection and scaling policy
• Collect monitoring information
  – At a fixed reporting interval, nodes hosting operators submit CPU utilization reports to the bottleneck detector
• Scaling policy
  – When a set number of consecutive reports from an operator are above a user-defined threshold, the bottleneck detector notifies the scale out coordinator to parallelize the operator
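The scaling policy can be sketched as a per-operator streak counter. The threshold and the number of consecutive reports used here are hypothetical defaults, and the class `BottleneckDetector` is an illustration rather than the system's actual component.

```python
from collections import defaultdict


class BottleneckDetector:
    """Flags an operator after several consecutive high CPU utilization reports."""

    def __init__(self, threshold=0.7, consecutive=2):
        self.threshold = threshold      # user-defined utilization threshold (assumed value)
        self.consecutive = consecutive  # reports in a row required (assumed value)
        self.streak = defaultdict(int)  # operator -> current run of high reports

    def report(self, operator, cpu_utilization):
        """Record one report; return True when the operator should be scaled out."""
        if cpu_utilization > self.threshold:
            self.streak[operator] += 1
        else:
            self.streak[operator] = 0
        if self.streak[operator] >= self.consecutive:
            self.streak[operator] = 0  # start counting afresh after notifying
            return True
        return False
```

Requiring several reports in a row keeps a short utilization spike from triggering a scale out.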
Goals and deployment of evaluation
• The goals of the experimental evaluation are to investigate
  – The effectiveness of the stateful operator scale out approach
  – The recovery time of the stateful recovery mechanism
  – The impact of the state management approach on tuple processing latency
• Experiment deployment
Experiment data
• Linear Road Benchmark (LRB)
  – Models a road toll network
  – Queries: (1) provide toll notifications to vehicles within 5 s; (2) detect accidents within 5 s; (3) answer account balance queries about paid toll amounts
  – The input rate for a single expressway (L=1) begins at 15 tuples/s and increases to 1700 tuples/s
• Wikipedia
  – A map/reduce-style top-k query that outputs, every 30 seconds, the ranking of the most visited Wikipedia language versions based on Wikipedia data traces
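A top-k query of this kind can be sketched as follows. The sketch assumes that each input tuple is a (timestamp in seconds, language) pair arriving in timestamp order and that the ranking is computed over non-overlapping 30-second windows; the trace format and the window semantics are assumptions, not details from the slides.

```python
from collections import Counter


def top_k_per_window(visits, k=3, window=30):
    """Rank the k most visited languages in each `window`-second interval."""
    results = []
    counts = Counter()
    window_end = window
    for ts, language in visits:
        # Close every window that ended before this tuple.
        while ts >= window_end:
            results.append((window_end, counts.most_common(k)))
            counts = Counter()
            window_end += window
        # "Map" emits (language, 1); "reduce" sums the ones per language.
        counts[language] += 1
    results.append((window_end, counts.most_common(k)))
    return results
```

The per-language counts form the processing state of the reduce stage, keyed by language, so they can be partitioned across several reduce operators.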
Dynamic scale out (1)
Dynamic scale out (2)
Failure recovery
• Word count
State management overhead
Conclusions
• Provide state management of stateful operators
  – Checkpoint, back up, restore, partition
• Present an integrated approach for scale out and failure recovery
Thank you