Adaptive Processing in Data Stream Systems
DESCRIPTION
Adaptive Processing in Data Stream Systems. Shivnath Babu, Stanford University. Data streams: new applications treat data as continuous, rapid, time-varying streams -- sensor networks, RFID tags, network monitoring and traffic engineering.

TRANSCRIPT
Adaptive Processing in Data Stream Systems
Shivnath Babu
Stanford University
stanfordstreamdatamanager
Data Streams
• New applications -- data as continuous, rapid, time-varying data streams
  – Sensor networks, RFID tags
  – Network monitoring and traffic engineering
  – Financial applications
  – Telecom call records
  – Web logs and click-streams
  – Manufacturing processes
• Traditional databases -- data stored in finite, persistent data sets
Using Traditional Database
[Diagram: a user/application issues one-shot queries over stored Tables R and S; a loader populates the tables, and each query returns a result.]
New Approach for Data Streams
[Diagram: the user/application registers a continuous query with a Stream Query Processor running over the input streams; results are produced continuously.]
Example Continuous Queries
• Web
  – Amazon’s best sellers over the last hour
• Network intrusion detection
  – Track HTTP packets with destination address matching a prefix in a given table and content matching “*\.ida”
• Finance
  – Monitor NASDAQ stocks between $20 and $200 that have moved down more than 2% in the last 20 minutes
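Queries like these are standing computations over unbounded inputs rather than one-shot lookups. A minimal Python sketch (illustrative only, not part of any DSMS) models the finance example as a generator over a sliding window:

```python
from collections import deque

def monitor(stream, window_size, predicate):
    """Continuous query as a generator over an unbounded stream.

    Keeps a sliding window of the most recent tuples and emits the
    current tuple whenever the window satisfies `predicate`.
    """
    window = deque(maxlen=window_size)
    for tup in stream:
        window.append(tup)
        if predicate(window):
            yield tup

# Flag prices more than 2% below the window's maximum:
alerts = monitor([100.0, 100.0, 99.0, 97.0], window_size=4,
                 predicate=lambda w: w[-1] < 0.98 * max(w))
list(alerts)  # -> [97.0]
```

Unlike a query over a stored table, the generator never terminates on a real stream; results are pushed out as qualifying tuples arrive.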
Data Stream Management System (DSMS)
[Diagram: input streams enter the DSMS; users register continuous queries; results are either streamed out or stored; streams may be archived to stored tables.]
Primer on Database Query Processing
[Diagram: a declarative query is preprocessed into canonical form, optimized into the best query execution plan, and executed by the database system against the data to produce results.]
Traditional Query Optimization
[Diagram: the Optimizer finds the “best” query plan using estimated statistics from the Statistics Manager, which periodically collects statistics (e.g., data sizes, histograms) from the data and auxiliary structures; the Executor runs the chosen plan to completion.]
Optimizing Continuous Queries is Challenging
• Continuous queries are long-running
• Stream properties can change while a query runs
  – Data properties: value distributions
  – Arrival properties: bursts, delays
• System conditions can change
• Performance of a fixed plan can change significantly over time
Adaptive processing: use the plan that is best for current conditions
Roadmap
• StreaMon: Our adaptive query processing engine
• Adaptive ordering of commutative filters
• Adaptive caching for multiway joins
• Current and future work
  – Similar techniques apply to conventional databases
Traditional Optimization → StreaMon
[Diagram: the traditional loop (Optimizer finds the “best” plan from estimated statistics; Statistics Manager periodically collects statistics such as table sizes and histograms; Executor runs the chosen plan to completion) becomes, in StreaMon: a Profiler monitoring current stream and system characteristics, a Re-optimizer ensuring the plan stays efficient for those characteristics and issuing decisions to adapt, and an Executor executing the current plan on incoming stream tuples; the components are combined in part for efficiency.]
Pipelined Filters
• Commutative filters over a stream
• Example: Track HTTP packets with destination address matching a prefix in a given table and content matching “*\.ida”
• Simple to complex filters
  – Boolean predicates
  – Table lookups
  – Pattern matching
  – User-defined functions
[Diagram: packets flow through Filter1, Filter2, Filter3; bad packets are dropped along the pipeline.]
Pipelined Filters: Problem Definition
• Continuous query: F1 ∧ F2 ∧ … ∧ Fn
• Plan: tuples → F(1) → F(2) → … → F(n)
• Goal: Minimize the expected cost to process a tuple
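The goal can be stated concretely: if filter F(i) costs c(i) to evaluate and a tuple reaches it only when every earlier filter passed the tuple, the expected per-tuple cost of an ordering is the sum of c(i) times the probability of reaching F(i). A small Python sketch (names are illustrative; filters are assumed independent here, an assumption the talk relaxes later):

```python
def expected_cost(order, cost, pass_rate):
    """Expected per-tuple cost of evaluating filters in `order`.

    cost[i]      -- cost of evaluating filter Fi
    pass_rate[i] -- fraction of tuples Fi passes (1 - drop-rate);
                    filters assumed independent in this sketch
    """
    reach = 1.0   # probability a tuple reaches the next filter
    total = 0.0
    for i in order:
        total += reach * cost[i]
        reach *= pass_rate[i]
    return total

# With equal costs, putting the high-drop-rate filter first is cheaper:
expected_cost([0, 1], cost=[1.0, 1.0], pass_rate=[0.1, 0.9])  # -> 1.1
expected_cost([1, 0], cost=[1.0, 1.0], pass_rate=[0.1, 0.9])  # -> 1.9
```

The gap between the two orderings is exactly the informal goal on the next slide: tuples that will be dropped should be dropped as cheaply as possible.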
Pipelined Filters: Example
[Diagram: input tuples flow through filters F1–F4; most tuples are dropped early in the pipeline, and only a few reach the output.]
Informal goal: if a tuple will be dropped, then drop it as cheaply as possible
Why is Our Problem Hard?
• Filter drop-rates and costs can change over time
• Filters can be correlated
  – E.g., Protocol = HTTP and DestPort = 80
Metrics for an Adaptive Algorithm
• Speed of adaptivity
  – Detecting changes and finding a new plan
• Run-time overhead
  – Re-optimization, collecting statistics, plan switching
• Convergence properties
  – Plan properties under stable statistics
[Diagram: the StreaMon components — Profiler, Re-optimizer, Executor.]
Pipelined Filters: Stable Statistics
• Assume statistics are not changing
  – Order filters by decreasing drop-rate/cost ratio [MS79, IK84, KBZ86, H94]
  – With correlated filters, finding the optimal ordering is NP-hard
• Greedy algorithm: use conditional statistics
  – F(1) has the maximum drop-rate/cost ratio
  – F(2) has the maximum drop-rate/cost ratio for tuples not dropped by F(1)
  – And so on
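The Greedy procedure above can be sketched directly: repeatedly pick, among the unplaced filters, the one with the highest drop-rate/cost ratio conditioned on the tuples that survive the filters already chosen. A Python sketch (illustrative, estimating the conditional statistics from a sample of recent tuples rather than exact probabilities):

```python
def greedy_order(filters, costs, sample):
    """Greedy filter ordering using conditional drop-rates.

    filters -- predicates; a tuple is dropped when filters[i](t) is False
    costs   -- per-filter evaluation cost
    sample  -- recent tuples used to estimate conditional statistics
    """
    remaining = list(range(len(filters)))
    survivors = list(sample)
    order = []
    while remaining and survivors:
        # drop-rate/cost ratio, conditioned on tuples not yet dropped
        def ratio(i):
            dropped = sum(1 for t in survivors if not filters[i](t))
            return dropped / len(survivors) / costs[i]
        best = max(remaining, key=ratio)
        order.append(best)
        remaining.remove(best)
        survivors = [t for t in survivors if filters[best](t)]
    order.extend(remaining)   # sample exhausted: append leftover filters
    return order

# Correlated filters: F1 (keep multiples of 4) implies F0 (keep evens),
# so once F1 is placed, F0 drops nothing and goes last.
f0 = lambda t: t % 2 == 0
f1 = lambda t: t % 4 == 0
greedy_order([f0, f1], [1.0, 1.0], range(100))  # -> [1, 0]
```

Note how the conditioning handles exactly the correlation case that breaks the simple decreasing-ratio ordering.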
Adaptive Version of Greedy
• Greedy gives strong guarantees
  – 4-approximation; the best polynomial-time approximation possible assuming P ≠ NP [MBM+05]
  – Holds for arbitrary (correlated) characteristics
  – Usually optimal in experiments
• Challenge:
  – Online algorithm
  – Fast adaptivity to the Greedy ordering
  – Low run-time overhead
A-Greedy: Adaptive Greedy
A-Greedy
[Diagram: the Profiler maintains conditional filter drop-rates and costs over recent tuples; the Re-optimizer ensures the filter ordering is Greedy for current statistics and pushes changes in the filter ordering to the Executor, which processes tuples with the current Greedy ordering; the components are combined in part for efficiency.]
A-Greedy’s Profiler
• Responsible for maintaining current statistics
  – Filter costs
  – Conditional filter drop-rates: exponentially many of them!
• Profile window: sampled statistics from which the required conditional drop-rates can be estimated
Profile Window
[Diagram: tuples dropped by filters F1–F4 are sampled, and each sampled tuple contributes a 0/1 row to the profile window, with a 1 in the column of every filter that would drop it.]
Greedy Ordering Using Profile Window

Matrix view (one row per sampled tuple, one column per filter; 1 = filter drops the tuple):

  F1 F2 F3 F4
   1  0  1  0
   0  0  0  1
   1  0  1  0
   0  1  0  0
   0  1  0  0
   0  0  1  1

Column counts are 2, 2, 3, 2, so F3 is ordered first. Removing the rows F3 covers and recounting gives 0 for F1, 2 for F2, and 1 for F4, so F2 comes next, then F4, then F1: the Greedy ordering is F3, F2, F4, F1.
A-Greedy’s Re-optimizer
• Maintains the matrix view over the profile window
  – Easy to incorporate filter costs
  – Efficient incremental update
  – Fast detection/correction of changes in the Greedy order
Details in [BMM+04]: “Adaptive Ordering of Pipelined Stream Filters”, SIGMOD 2004
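With unit filter costs, the Greedy order on the previous slide falls straight out of the matrix view: pick the column with the most 1s, discard the rows that filter covers, and repeat. A sketch (unit costs assumed; the actual re-optimizer updates the view incrementally rather than recomputing it):

```python
def greedy_from_profile(window):
    """Greedy order from a profile window (unit filter costs assumed).

    window -- one 0/1 row per sampled dropped tuple; row[i] == 1 means
              filter i would drop that tuple
    """
    remaining = list(range(len(window[0])))
    rows = list(window)
    order = []
    while remaining:
        # column counts restricted to tuples not yet accounted for
        best = max(remaining, key=lambda i: sum(r[i] for r in rows))
        order.append(best)
        remaining.remove(best)
        rows = [r for r in rows if r[best] == 0]
    return order

# The matrix view from the slide yields the ordering F3, F2, F4, F1:
window = [[1, 0, 1, 0],
          [0, 0, 0, 1],
          [1, 0, 1, 0],
          [0, 1, 0, 0],
          [0, 1, 0, 0],
          [0, 0, 1, 1]]
greedy_from_profile(window)  # -> [2, 1, 3, 0], i.e., F3, F2, F4, F1
```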
Next
• Tradeoffs and variations of A-Greedy
• Experimental results for A-Greedy
Tradeoffs
• Suppose:
  – Changes are infrequent
  – Slower adaptivity is okay
  – We want the best plans at very low run-time overhead
• Three-way tradeoff among speed of adaptivity, run-time overhead, and convergence properties
• Spectrum of A-Greedy variants
Variants of A-Greedy

  Algorithm | Convergence Properties | Run-time Overhead         | Adaptivity
  A-Greedy  | 4-approx.              | High (relative to others) | Fast

[Diagram: A-Greedy's profile window and the matrix view derived from it.]
Variants of A-Greedy

  Algorithm   | Convergence Properties         | Run-time Overhead           | Adaptivity
  A-Greedy    | 4-approx.                      | High (relative to others)   | Fast
  Sweep       | 4-approx.                      | Less work per sampling step | Slow
  Local-Swaps | May get caught in local optima | Less work per sampling step | Slow
  Independent | Misses correlations            | Lower sampling rate         | Fast
Experimental Setup
• Implemented A-Greedy, Sweep, Local-Swaps, and Independent in StreaMon
• Studied convergence properties, run-time overhead, and adaptivity
• Synthetic testbed
  – Can control stream data and arrival properties
• DSMS server running on a 700 MHz Linux machine with 1 MB L2 cache and 2 GB memory
Converged Processing Rate
[Chart: average processing rate (tuples/sec, 20,000–55,000) vs. number of filters (3–10), comparing Optimal-Fixed, Sweep, A-Greedy, Local-Swaps, and Independent.]
Effect of Filter Drop-Rate
[Chart: average processing rate (tuples/sec, 30,000–80,000) vs. drop-rate (0.1–0.9) for each of 3 filters, comparing Optimal-Fixed, Sweep, A-Greedy, Local-Swaps, and Independent.]
Effect of Correlation
[Chart: average processing rate (tuples/sec, 20,000–55,000) vs. correlation factor (1–4), comparing Optimal-Fixed, Sweep, A-Greedy, Local-Swaps, and Independent.]
Run-time Overhead
[Chart: average time per tuple (microseconds, 0–250) for Optimal, A-Greedy, Sweep, Local-Swaps, and Independent, split into tuple processing vs. profiling + re-optimization overhead.]
Adaptivity
[Chart: number of filters evaluated per 2000 tuples (3000–5000) over time (×1000 tuples processed) for Sweep, Local-Swaps, A-Greedy, and Independent; filter selectivities are permuted mid-run.]
Roadmap
• StreaMon: Our adaptive processing engine
• Adaptive ordering of commutative filters
• Adaptive caching for multiway joins
• Current and future work
Stream Joins
[Diagram: sensors R, S, and T stream observations into the DSMS, which joins observations from the last minute and emits join results.]
MJoins [VNB04]
[Diagram: windows on R, S, and T; each arriving tuple is probed through the other two windows via a pipeline of join operators (⋈R, ⋈S, ⋈T), with one pipeline per input stream.]
Excessive Recomputation in MJoins
[Diagram: in the MJoin pipelines over windows on R, S, and T, the same join subresults are recomputed for every probing tuple.]
Materializing Join Subexpressions
[Diagram: windows on R, S, and T, with the join of two windows kept as a fully-materialized join subexpression.]
Tree Joins: Trees of Binary Joins
[Diagram: streams R, S, and T joined by a tree of binary join operators over their windows, with the intermediate join subexpression fully materialized.]
Hard State Hinders Adaptivity
[Diagram: a plan switch from a tree join materializing WR ⋈ WT to one materializing WS ⋈ WT must rebuild the materialized subexpression from scratch.]
Can we get the best of both worlds?
• MJoin drawback: recomputation
• Tree Join drawbacks: less adaptive, higher memory use
[Diagram: the MJoin pipelines and the equivalent tree of binary joins over R, S, and T.]
MJoins + Caches
[Diagram: the MJoin pipelines augmented with a WR ⋈ WT cache; an S tuple probes the cache and, on a hit, bypasses that pipeline segment.]
MJoins + Caches (contd.)
• Caches are soft state
  – Adaptive
  – Flexible with respect to memory usage
• Capture the whole spectrum from MJoins to Tree Joins, and plans in between
• Challenge: an adaptive algorithm to choose join operator orders and caches in pipelines
Adaptive Caching (A-Caching)
• Adaptive join ordering with A-Greedy or a variant
  – Join operator orders determine the candidate caches
• Adaptive selection from the candidate caches
• Adaptive memory allocation to the chosen caches
A-Caching (caching part only)
[Diagram: the Profiler estimates costs and benefits of the candidate caches; the Re-optimizer ensures the maximum-benefit subset of candidate caches is in use, adding/removing caches in the Executor, which runs MJoins with caches; the components are combined in part for efficiency.]
Performance of Stream-Join Plans (1)
Arrival rates of the streams R, S, T, U are in the ratio 1:1:1:10; other details of the input are given in [BMW+05].
[Chart: average processing rate (tuples/sec, up to ~450,000) for MJoin, TreeJoin, and A-Caching on a four-way join of R, S, T, and U.]
Performance of Stream-Join Plans (2)
Arrival rates of the streams are in the ratio 15:10:5:1; other details of the input are given in [BMW+05].
[Chart: average processing rate (tuples/sec, up to ~250,000) for MJoin, TreeJoin, and A-Caching.]
A-Caching: Results at a Glance
• Adaptively captures the whole spectrum from fully-pipelined MJoins to tree-based joins
• Approximation algorithms are scalable
• Supports different types of caches
• Up to 7x improvement over MJoin and 2x improvement over TreeJoin
• Details in [BMW+05]: “Adaptive Caching for Continuous Queries”, ICDE 2005 (to appear)
Current and Future Work
• Broadening StreaMon’s scope, e.g.,
  – Shared computation among multiple queries
  – Parallelism
• Rio: Adaptive query processing in conventional database systems
• Plan logging: A new overall approach to address certain “meta issues” in adaptive processing
Related Work
• Adaptive processing of continuous queries
  – E.g., Eddies [AH00], NiagaraCQ [CDT+00]
• Adaptive processing in conventional databases
  – Inter-query adaptivity, e.g., LEO [SLM+01], [BC03]
  – Intra-query adaptivity, e.g., Re-Opt [KD98], POP [MRS+04]
• New approaches to query optimization
  – E.g., parametric [GW89, INS+92, HS03], expected-cost based [CHS99, CHG02], error-aware [VN03]
Summary
• New trends demand adaptive query processing
  – New applications, e.g., continuous queries, data streams
  – Increasing data size and query complexity
• CS-wide push towards autonomic computing
• Our goal: an adaptive data management system
  – StreaMon: Adaptive data stream engine
  – Rio: Adaptive processing in conventional DBMSs
• Google keywords: shivnath, stanford stream
Performance of Stream-Join Plans
[Chart: average processing rate (tuples/sec, up to ~450,000) for MJoin, TreeJoin, and A-Caching at sample points D1–D8 from a spectrum of input properties.]
Adaptivity to Memory Availability
[Chart: average processing rate (tuples/sec, 10,000–28,000) vs. memory available for storing join subresults (0–70 KB), for TreeJoin, A-Caching, and MJoin.]
Plan Logging
• Log the profiling and re-optimization history
  – The query is long-running
  – Example view over the log for R ⋈ S ⋈ T:

    Rate(R) | … | σ(R,S) | Plan | Cost
    1024    | … | 0.75   | P1   | 12762
    5642    | … | 0.72   | P2   | 72332
    934     | … | 0.76   | P1   | 12003

• Plans lie in a high-dimensional space of statistics
[Diagram: regions of the (Rate(R), σ(R,S)) statistics space in which plans P1 and P2 are best.]
Uses of Plan Logging
• Reducing re-optimization overhead
  – Create a cache of plans
• Reducing profiling overhead
  – Track how changes in a statistic contribute to changes in the best plan
[Diagram: plan regions P1 and P2 in the (Rate(R), σ(R,S)) statistics space.]
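The "cache of plans" idea can be sketched by keying logged plans on coarsened statistics, so full re-optimization is skipped whenever the current statistics land in a region whose plan is already logged (illustrative code; the bin width and keying scheme are assumptions, not the talk's design):

```python
def make_plan_cache(optimize, bin_width=0.1):
    """Plan cache over a log of (statistics region -> plan) entries.

    optimize -- full optimizer, called only on a cache miss:
                stats dict -> plan
    Statistics are discretized into bins so that nearby statistic
    values reuse the same logged plan.
    """
    log = {}
    def lookup(stats):
        key = tuple((name, round(value / bin_width))
                    for name, value in sorted(stats.items()))
        if key not in log:
            log[key] = optimize(stats)   # re-optimize and log the plan
        return log[key]
    return lookup

calls = []
lookup = make_plan_cache(lambda s: calls.append(s) or f"P{len(calls)}")
lookup({"sel(R,S)": 0.76})   # miss: the optimizer runs and the plan is logged
lookup({"sel(R,S)": 0.78})   # same bin: the logged plan is reused
```

The bin width controls the tradeoff hinted at on this slide: coarser bins save more re-optimization but risk reusing a plan outside the region where it is actually best.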
Uses of Plan Logging (contd.)
• Tracking “return on investment” for adaptive processing
  – Track the cost versus the benefit of adaptivity
  – Is there a single plan that would have good overall performance?
• Avoiding thrashing
  – Which statistics have transient changes?
Adaptive Processing in Traditional DBMS
[Diagram: the traditional optimizer / statistics manager / executor loop, with estimation errors feeding into the optimizer's choice of the “best” plan.]
Proactive Re-optimization with Rio
[Diagram: Rio replaces the traditional components: a combined Statistics Manager + Profiler collects statistics at run time based on random samples; the (Re-)optimizer considers (estimate, uncertainty) pairs during optimization and produces “robust” plans; the Executor executes the current plan; the components are combined for efficiency.]