The Design of the Borealis Stream Processing Engine
Brandeis University, Brown University, MIT
Magdalena Balazinska (MIT), Nesime Tatbul (Brown)
Distributed Stream Processing
[Figure: data sources push tuples into a network of processing nodes (Node 1-4) running operators, which deliver results to client applications]
- Data sources push tuples continuously
- Operators process windows of tuples
Where are we today?
- Data models, operators, query languages
- Efficient single-site processing
- Single-site resource management
  [STREAM, TelegraphCQ, NiagaraCQ, Gigascope, Aurora]
- Basic distributed systems: servers [TelegraphCQ, Medusa/Aurora] or sensor networks [TinyDB, Cougar]
- Tolerance to node failures
- Simple load management
Challenges
- Tuple revisions: revisions on input streams
- Fault-tolerance
- Dynamic & distributed query optimization
- ...
Causes for Tuple Revisions
- Data sources revise input streams: "On occasion, data feeds put out a faulty price [...] and send a correction within a few hours" [MarketBrowser]
- Temporary overloads cause tuple drops
- Temporary failures cause tuple drops
[Figure: a 1-hour average-price operator receives (1pm,$10) (2pm,$12) (3pm,$11) (4:05,$11) (4:15,$10), followed by the late correction (2:25,$9)]
Current Data Model
(time, a1, ..., an)
- header: time, the tuple timestamp
- data: attributes a1, ..., an
New Data Model for Revisions
(time, type, id, a1, ..., an)
- time: tuple timestamp
- type: tuple type (insertion, deletion, or replacement)
- id: unique identifier of the tuple on the stream
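A minimal sketch of this tuple model in Python (illustrative only: Borealis itself is a C++ system, and all names here are assumptions):

    from dataclasses import dataclass
    from enum import Enum

    class TupleType(Enum):
        INSERTION = "insertion"
        DELETION = "deletion"
        REPLACEMENT = "replacement"

    @dataclass(frozen=True)
    class StreamTuple:
        time: int         # tuple timestamp (header)
        type: TupleType   # insertion, deletion, or replacement (header)
        id: int           # unique identifier of the tuple on its stream (header)
        data: tuple       # payload attributes (a1, ..., an)

    # A replacement revises the earlier tuple carrying the same id:
    original = StreamTuple(time=1425, type=TupleType.INSERTION, id=7, data=(9.0,))
    revision = StreamTuple(time=1425, type=TupleType.REPLACEMENT, id=7, data=(9.5,))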
Revisions: Design Options
- Restart the query network from a checkpoint
- Let operators deal with revisions
  - Operators must keep their own history
  - (Some) streams can keep history
Revision Processing in Borealis
Closed model: revisions produce revisions
[Figure: the 1-hour average-price operator receives (1pm,$10) (2pm,$12) (3pm,$11) followed by the late tuple (2:25,$9); its output stream (2pm,$12) (3pm,$11) is followed by the output revision (2pm,$11)]
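A hedged sketch of the closed model for a windowed average: a revision (or late arrival) causes the affected window to be recomputed, and the corrected result goes out as an output revision. The class and its simplifications are mine, not Borealis code:

    class HourlyAverage:
        """Averages prices per 1-hour window; revisions in, revisions out."""
        def __init__(self):
            self.windows = {}     # hour -> {tuple id: price}
            self.emitted = set()  # hours whose average has been output

        def process(self, hour, tup_id, price):
            self.windows.setdefault(hour, {})[tup_id] = price
            prices = self.windows[hour].values()
            avg = sum(prices) / len(prices)
            # The first output for a window is an insertion; any change to an
            # already-emitted window goes out as a replacement (a revision).
            kind = "replacement" if hour in self.emitted else "insertion"
            self.emitted.add(hour)
            return (kind, hour, avg)

    op = HourlyAverage()
    print(op.process(hour=14, tup_id=1, price=12.0))  # ('insertion', 14, 12.0)
    print(op.process(hour=14, tup_id=2, price=10.0))  # ('replacement', 14, 11.0) -- a late tuple revises the window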
Revision Processing in Borealis
- Connection points (CPs) store history
- Operators pull the history they need
[Figure: a CP on the operator's input stream holds history from the oldest to the most recent tuple; when a revision arrives on the input queue, the operator pulls the affected history from the CP]
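A sketch of a connection point as a bounded history buffer that an operator can pull from when a revision arrives (the names, the bound, and the pull interface are assumptions):

    from collections import deque, namedtuple

    Tup = namedtuple("Tup", "time id value")

    class ConnectionPoint:
        def __init__(self, max_history=10_000):
            # Bounded history: oldest tuple ... most recent tuple.
            self.history = deque(maxlen=max_history)

        def append(self, tup):
            self.history.append(tup)

        def pull(self, t_start, t_end):
            # An operator pulls the tuples it needs to reprocess the
            # window(s) affected by a revision.
            return [t for t in self.history if t_start <= t.time <= t_end]

    cp = ConnectionPoint()
    for t in [Tup(1, 1, 10.0), Tup(2, 2, 12.0), Tup(3, 3, 11.0)]:
        cp.append(t)
    print(cp.pull(2, 2))  # the history needed to recompute the 2pm window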
Fault-Tolerance through Replication
Goal: Tolerate node and network failures
[Figure: Nodes 1-3 are replicated as Nodes 1'-3'; input streams s1 and s2 feed both copies, and a downstream node can switch from stream s3 to its replica s3' when a failure occurs]
Reconciliation
State reconciliation alternatives:
- Propagate tuples as revisions
- Restart the query network from a checkpoint
- Propagate an UNDO tuple
Output stream revision alternatives:
- Correct individual tuples
- Stream of deletions followed by insertions
- Single UNDO tuple followed by insertions
Fault-Tolerance Approach
- If an input stream fails, find another replica
- If no replica is available, produce tentative tuples
- Correct tentative results after failures
[Figure: state machine with states STABLE, UPSTREAM FAILURE, and STABILIZING. Missing or tentative inputs move a node from STABLE to UPSTREAM FAILURE; when the failure heals, the node reconciles its state and emits corrected output while STABILIZING, then returns to STABLE unless another upstream failure is in progress]
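The state machine above, sketched as code (the transitions follow the slide; handler details are illustrative assumptions):

    from enum import Enum, auto

    class State(Enum):
        STABLE = auto()
        UPSTREAM_FAILURE = auto()
        STABILIZING = auto()

    class FaultTolerantNode:
        def __init__(self):
            self.state = State.STABLE

        def missing_or_tentative_inputs(self):
            # Keep processing, but mark outputs as tentative.
            self.state = State.UPSTREAM_FAILURE

        def failure_heals(self):
            # Reconcile state (revisions, checkpoint restart, or UNDO),
            # then emit corrected output.
            self.state = State.STABILIZING

        def reconciled(self, another_upstream_failure=False):
            # Fall back if another upstream failure is still in progress.
            self.state = (State.UPSTREAM_FAILURE if another_upstream_failure
                          else State.STABLE)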
Challenges
- Tuple revisions: revisions on input streams
- Fault-tolerance
- Dynamic & distributed query optimization
- ...
Optimization in a Distributed SPE
Goal: Optimized resource allocation
Challenges:
- Wide variation in resources: high-end servers vs. tiny sensors
- Multiple resources involved: CPU, memory, I/O, bandwidth, power
- Dynamic environment: changing input load and resource availability
- Scalability: query network size, number of nodes
Quality of Service
A mechanism to drive resource allocation
- Aurora model: QoS functions at query end-points
  Problem: need to infer QoS at upstream nodes
- An alternative model:
  - A Vector of Metrics (VM) carried in tuples
  - Operators can change the VM
  - A Score Function ranks tuples based on their VM
  - Optimizers can keep and use statistics on the VM
Example Application: Warfighter Physiologic Status Monitoring (WPSM)
Physiologic models report a state estimate per area, each with a confidence (states were shown graphically on the slide):

Area             Confidence
Thermal          90%
Hydration        60%
Cognitive        100%
Life Signs       90%
Wound Detection  80%
Ranking Tuples in WPSM
Score Function: SF(VM) = VM.confidence × ADF(VM.age), where ADF is an age decay function
[Figure: sensors (HRate, RRate, Temp) feed Model1 and Model2; each tuple carries VM = [age, confidence] alongside its value; a Merge operator combines the model outputs, and models may change the confidence]
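The score function in code, with an assumed exponential age decay (the real ADF is application-defined):

    def adf(age, half_life=60.0):
        # Hypothetical age decay function: the value halves every
        # `half_life` seconds; any decreasing function would do.
        return 0.5 ** (age / half_life)

    def score(vm):
        # SF(VM) = VM.confidence * ADF(VM.age)
        return vm["confidence"] * adf(vm["age"])

    print(score({"age": 0.0, "confidence": 0.9}))    # 0.9: fresh and confident
    print(score({"age": 120.0, "confidence": 0.9}))  # 0.225: same confidence, but stale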
Borealis Optimizer Hierarchy
[Figure: each Borealis node runs a Local Monitor and a Local Optimizer; a Neighborhood Optimizer coordinates nearby nodes, and end-point monitor(s) feed a Global Optimizer]
Optimization Tactics
- Priority scheduling
- Modification of query plans: commuting operators, using alternate operator implementations
- Allocation of query fragments to nodes
- Load shedding
Correlation-based Load Distribution
Goal: Minimize end-to-end latency
Key ideas:
- Balance load across nodes to avoid overload
- Group boxes with small load correlation together
- Maximize load correlation among nodes
[Figure: chains A→B (stream S1, rate r) and C→D (stream S2, rate 2r), each operator costing c per tuple. The connected plan places A,B on one node (load 2cr) and C,D on the other (load 4cr); the cut plan splits each chain across the two nodes, balancing both at 3cr]
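The figure's arithmetic, sketched in code (the unit cost and rates are the slide's; the placement encoding is mine):

    # Each operator costs c per tuple; S1 arrives at rate r, S2 at 2r.
    c, r = 1.0, 100.0
    rate = {"S1": r, "S2": 2 * r}
    stream_of = {"A": "S1", "B": "S1", "C": "S2", "D": "S2"}

    def loads(placement):
        return [sum(c * rate[stream_of[op]] for op in node) for node in placement]

    connected = [{"A", "B"}, {"C", "D"}]
    cut = [{"A", "C"}, {"B", "D"}]
    print(loads(connected))  # [200.0, 400.0] = [2cr, 4cr]: imbalanced
    print(loads(cut))        # [300.0, 300.0] = [3cr, 3cr]: balanced

When rates fluctuate over time, the same idea generalizes: co-locating operators whose loads are weakly or negatively correlated keeps each node's total load stable.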
Load Shedding
Goal: Remove excess load at all nodes and links
- Shedding at node A relieves its descendants
Distributed load shedding:
- Neighbors exchange load statistics
- Parent nodes shed load on behalf of children
- Uniform treatment of the CPU and bandwidth problem
Load balancing or Load shedding?
Local Load Shedding
Goal: Minimize total loss
[Figure: two query paths to App1 and App2. Node A runs operators of cost 4C and C on inputs arriving at rate r1; Node B runs operators of cost C and 3C on the resulting streams at rate r2.
- Node A: CPU load = 5Cr1, capacity = 4Cr1, so A must shed 20%.
- Node B: CPU load = 4Cr2, capacity = 2Cr2, so B must shed 50%.
- After A sheds 20% locally, B's input rate drops to r2' and its load to 3.75Cr2 against a capacity of 2Cr2, so B must still shed 47%. After B sheds, no node is overloaded.]

Plan   Loss (App1)  Loss (App2)
Local  25%          58%
Distributed Load Shedding
Goal: Minimize total loss
[Figure: the same two paths (Node A: costs 4C and C, load 5Cr1, capacity 4Cr1; Node B: costs C and 3C, load 4Cr2, capacity 2Cr2). With distributed shedding, A sheds on both paths on behalf of B as well, reducing the downstream rates to r2' and r2'' so that afterwards neither A nor B is overloaded.]

Plan         Loss (App1)  Loss (App2)
Local        25%          58%
Distributed  9%           64%

The distributed plan achieves a smaller total loss (73% vs. 83%)!
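A brute-force sketch of the distributed plan's arithmetic: pick shed percentages for the two paths that minimize total loss while respecting both nodes' capacities. The exhaustive search stands in for the real optimizer, and loads are measured in percent of Cr1 (Node A) and Cr2 (Node B):

    best = None
    for i in range(101):          # i = % shed on App1's path
        for j in range(101):      # j = % shed on App2's path
            load_a = 4 * (100 - i) + (100 - j)  # Node A load, capacity 400 (= 4Cr1)
            load_b = (100 - i) + 3 * (100 - j)  # Node B load, capacity 200 (= 2Cr2)
            if load_a <= 400 and load_b <= 200 and (best is None or i + j < best[0]):
                best = (i + j, i, j)
    print(best)  # (73, 9, 64): shed 9% for App1 and 64% for App2, total 73% < 83%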
Extending Optimization to Sensor Nets
- Sensor proxy as an interface
- Moving operators in and out of the sensor net
- Adjusting sensor sampling rates
The WPSM example:
[Figure: a sensor proxy sits between the sensors and the Merge operator feeding the models; data flows downstream through the proxy, while control messages from the optimizer flow back into the sensor net]
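An illustrative sketch of the proxy's two faces: forwarding sensor data downstream while applying optimizer control messages inside the sensor net (the message format and method names are assumptions):

    class SensorProxy:
        """Interface between the Borealis optimizer and the sensor net."""
        def __init__(self, sample_period_s=1.0):
            self.sample_period_s = sample_period_s
            self.in_net_operators = []  # operators pushed into the sensor net

        def on_control(self, msg):
            # Control path: messages from the optimizer.
            if msg["kind"] == "set_sample_period":
                self.sample_period_s = msg["seconds"]    # adjust sampling rate
            elif msg["kind"] == "push_operator":
                self.in_net_operators.append(msg["op"])  # e.g. move Merge in-net

        def on_data(self, reading, downstream):
            # Data path: forward (possibly pre-processed) readings.
            for op in self.in_net_operators:
                reading = op(reading)
            downstream.append(reading)

    proxy = SensorProxy()
    proxy.on_control({"kind": "set_sample_period", "seconds": 5.0})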
Conclusions
Next-generation streaming applications require a flexible processing model:
- Distributed operation
- Dynamic result and query modification
- Dynamic and scalable optimization
- Server and sensor network integration
- Tolerance to node and network failures
Borealis has been iteratively designed, driven by real applications.
First prototype release planned for Spring '05: http://nms.lcs.mit.edu/projects/borealis