Distributed event aggregation for content-based Publish/Subscribe systems
Navneet Kumar Pandey1
Stéphane Weiss1
Roman Vitenberg1
Kaiwen Zhang2
Hans-Arno Jacobsen2
1University of Oslo · 2University of Toronto
Motivation: Intelligent Transport System (ITS)
• Information providers: road sensors, crowdsourced mobile apps
• Information seekers: commuters, police, first responders, radio networks, etc.
• Aggregate subscriptions
• Count number of cars passing a street light per hour
• Average speed of cars on a road segment per day
• Non-aggregate subscriptions
• Accident reports
• Traffic violation reports
Aggregation in pub/sub
• Pub/sub is well known for efficient content filtering and dissemination for distributed event sources and sinks.
• However, pub/sub does not support aggregation, which is required in emerging applications.
• Our primary objective is to retain the traditional pub/sub focus on low communication cost, while adding support for aggregation.
Contributions: aggregation in pub/sub
• We propose a framework and baseline approaches for aggregation in content-based pub/sub systems (CBPS).
• We show how the relative performance of the baseline approaches varies with workload properties.
• We propose a per-broker distributed adaptive approach.
Advertisement-based pub/sub model

[Figure: a Subscription Delivery Tree (SDT). B denotes a broker; a publisher-edge broker issues advertisement A[val,>,4] and publication P[val,8], which travels via brokers Bp, Bq, and BI to the subscriber-edge broker BS holding subscription S[val,>,3].]
Comparison with stream processing
Aggregation in stream processing | Aggregation in pub/sub
Requires a global view of the topology | Topology is not known to individual broker nodes
Requires a priori knowledge of publication sources | Publication sources and sinks are dynamic
Needs a control layer | Brokers are loosely coupled
Usually has a static query plan | SDTs are dynamic and determined by the pub/sub implementation
Optimized for continuous data streams | Publications arrive at an irregular rate
Proposed aggregation framework
Publication filtering procedure (PFP)

Subscription: { RoadID = 101, speed > 10, op = 'avg', duration (ω) = 2 hours, shift size (δ) = 1 hour }

[Figure: a timeline from 0 to 3 with publications Pub1, Pub2, Pub3 and overlapping notification window ranges NWR1, NWR2, NWR3 for the subscription.]
A single publication can participate in several NWRs, even for the same subscription.
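A window of duration ω shifted by δ means each publication can fall into up to ⌈ω/δ⌉ windows. A minimal sketch of this window-assignment step (class and method names are illustrative, not taken from the paper's implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the PFP's window assignment. A subscription with duration
// (omega) and shift size (delta) defines notification window ranges
// NWR_k = [k*delta, k*delta + omega). Names are illustrative.
public class NwrAssignment {

    // Returns the start times of all NWRs that contain timestamp t.
    static List<Long> matchingWindowStarts(long t, long omega, long delta) {
        List<Long> starts = new ArrayList<>();
        // k*delta <= t < k*delta + omega  <=>  (t - omega)/delta < k <= t/delta
        long kMax = Math.floorDiv(t, delta);
        long kMin = Math.floorDiv(t - omega, delta) + 1;
        for (long k = Math.max(kMin, 0); k <= kMax; k++) {
            starts.add(k * delta);
        }
        return starts;
    }

    public static void main(String[] args) {
        // omega = 2 hours, delta = 1 hour: a publication at t = 1 falls
        // into the NWRs starting at 0 and at 1.
        System.out.println(matchingWindowStarts(1, 2, 1)); // [0, 1]
    }
}
```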
Proposed aggregation framework
Initial computation procedure (ICP)

[Figure: the same NWR timeline. Running the ICP at different start times yields different outgoing message sets: { avg(Pub1, Pub2, Pub3), avg(Pub2, Pub3) } when computation is deferred until the windows close, versus { avg(Pub1, Pub2), avg(Pub2), Pub3 } when it starts at an earlier time x, before Pub3 arrives.]
Processing start time presents a trade-off between communication cost and end-to-end delay.
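This trade-off can be made concrete by counting outgoing messages per window for different processing start times. A hedged sketch (method names and the message format are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the ICP start-time trade-off: publications seen before the
// processing start time x are folded into one partial aggregate, while
// later publications must be forwarded individually. Illustrative only.
public class IcpStartTime {

    // Outgoing messages for one window: one combined message for all
    // publications before startTime, plus one message per later publication.
    static List<String> outgoing(double[] pubTimes, double startTime) {
        List<String> msgs = new ArrayList<>();
        int early = 0;
        for (double t : pubTimes) {
            if (t <= startTime) early++; else msgs.add("raw@" + t);
        }
        if (early > 0) msgs.add("avg of " + early + " pubs");
        return msgs;
    }

    public static void main(String[] args) {
        double[] pubs = {1.0, 1.5, 2.5};
        // Starting at x = 2 leaves one raw publication: 2 messages, less delay.
        System.out.println(outgoing(pubs, 2.0).size()); // 2
        // Waiting until the window closes at 3 sends a single aggregate.
        System.out.println(outgoing(pubs, 3.0).size()); // 1
    }
}
```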
Proposed aggregation framework
Recurrent processing procedure (RPP)

[Figure: publisher-edge brokers Bp and Bq send partial aggregates avg_p and avg_q; the intermediate broker BI waits for a collection delay, merges them into avg_pq, and forwards the result.]
Collection delay is another parameter affecting the delay-communication trade-off.
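Merging partial aggregates at an intermediate broker requires a mergeable representation; for avg this means carrying (sum, count) rather than the average itself. An illustrative sketch:

```java
// Sketch of the RPP merge at an intermediate broker: partial aggregates
// from downstream brokers (avg_p from B_p, avg_q from B_q) are held for a
// collection delay and combined into one message. Names are illustrative.
public class RppMerge {
    final double sum;
    final long count;

    RppMerge(double sum, long count) { this.sum = sum; this.count = count; }

    // (sum, count) pairs merge associatively, so the broker can combine
    // whatever partials arrived within the collection delay.
    RppMerge merge(RppMerge other) {
        return new RppMerge(sum + other.sum, count + other.count);
    }

    double average() { return sum / count; }

    public static void main(String[] args) {
        RppMerge avgP = new RppMerge(9 + 5, 2); // partial from B_p
        RppMerge avgQ = new RppMerge(2 + 3, 2); // partial from B_q
        RppMerge avgPq = avgP.merge(avgQ);      // merged at B_I
        System.out.println(avgPq.average());    // (9+5+2+3)/4 = 4.75
    }
}
```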
Late aggregation approach
[Figure: publications P[val,9], P[val,2], P[val,5], P[val,3] enter at the publisher-edge brokers Bp and Bq; subscription S_min[val,>,2] is issued at BS. Matched publications are forwarded unchanged through BI, and the PFP, ICP, and RPP all run at the subscriber-edge broker BS, which delivers P[val_min,3]. Messages exchanged in Late aggregation: 6.]
Late approach aggregates messages at subscriber-edge brokers.
Early aggregation approach
[Figure: the same scenario with Early aggregation: the PFP, ICP, and RPP run at the publisher-edge brokers Bp and Bq, which send only their local minima (e.g. P[val_min,9] and P[val_min,3]); BI merges them and BS receives P[val_min,3]. Messages exchanged in Early aggregation: 3, vs. 6 for Late.]
Early approach aggregates messages at publisher-edge brokers.
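The message counts in the two examples can be reproduced with a small model of the two-level broker tree. Everything here (method names, the hop counting) is illustrative:

```java
// Sketch contrasting Early and Late for a min subscription on a tree
// B_p, B_q -> B_I -> B_S. Early aggregates at the publisher-edge brokers;
// Late forwards matched publications and aggregates at B_S.
public class EarlyVsLate {

    // Late: each matched publication travels two hops (edge -> B_I -> B_S).
    static int lateMessages(int[][] matchedPerEdgeBroker) {
        int hops = 0;
        for (int[] pubs : matchedPerEdgeBroker) {
            hops += pubs.length * 2;
        }
        return hops;
    }

    // Early: one partial min per edge broker to B_I, one merged min to B_S.
    static int earlyMessages(int[][] matchedPerEdgeBroker) {
        return matchedPerEdgeBroker.length + 1;
    }

    public static void main(String[] args) {
        int[][] matched = { {9, 5}, {3} }; // values > 2 at B_p and B_q
        System.out.println(lateMessages(matched));  // 6
        System.out.println(earlyMessages(matched)); // 3
    }
}
```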
Early does not always outperform Late
[Figure: the same publications with three aggregate subscriptions at BS: S_max[val,>,2], S_min[val,>,2], and S_count[val,>,2]. Late aggregation forwards each matched publication once and serves all three subscriptions from it (6 messages), whereas Early aggregation must send a separate partial result per subscription (P[val_max,...], P[val_min,...], P[val_count,...]) from each broker (9 messages).]
Comparison between Early and Late
Several parameters affect the performance of our baselines:

Increasing parameter | Favors
Publication matching rate | Early
Number of matching NWRs | Late
Overlap among aggregate subscriptions | Late
Ratio of aggregate to regular subscriptions | Early

Reducing the communication cost requires an adaptive solution.
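One plausible form of the per-broker decision is to compare, over a sampling period, the matched publications a broker would forward (Late) against the NWR notifications it would emit if it aggregated locally (Early). A hedged sketch, not the paper's exact rule:

```java
// Illustrative per-broker mode decision: aggregate early only when doing so
// sends fewer messages than forwarding every matched publication.
public class ModeDecision {
    enum Mode { EARLY, LATE }

    static Mode choose(long matchedPubs, long estimatedNwrNotifications) {
        // High matching rate with few NWRs favors Early; many NWRs or
        // overlapping subscriptions favor Late.
        return estimatedNwrNotifications < matchedPubs ? Mode.EARLY : Mode.LATE;
    }

    public static void main(String[] args) {
        System.out.println(choose(100, 10)); // high pub rate -> EARLY
        System.out.println(choose(5, 20));   // many NWRs -> LATE
    }
}
```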
Benefits of adaptive aggregation
[Figure: the same publications with a regular subscription S[val,>,6] in addition to the aggregate subscription S_min[val,>,2]. Brokers run in aggregate mode (BA) or forward mode (BF). Messages exchanged: Late 6, Early 5.]
Benefits of adaptive aggregation
[Figure: the same scenario where each broker independently picks aggregate or forward mode for its traffic. Messages exchanged: Late 6, Early 5, Adaptive 4.]

Per-broker adaptation reduces communication cost.
Adaptation process (MAPE-K)
Monitor
• Matching publications within the sampling period
• Changes in the subscription set

Analyze
• Compare the ratio between publications and NWRs
• Estimate the notification rate

Plan
• Choose the suitable mode
• Plan the transition between aggregate and forward modes

Execute
• Start/stop aggregation at the broker

Knowledge (information at a broker)
• Registered subscriptions
• Current execution mode
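A per-broker MAPE-K loop of this shape can be sketched as follows (structure, field names, and the decision rule are illustrative):

```java
// Sketch of the MAPE-K adaptation loop run independently at each broker:
// Monitor counts matching publications, Analyze compares pubs vs. NWRs,
// Plan picks aggregate or forward mode, Execute switches the broker.
public class MapeKLoop {
    enum Mode { AGGREGATE, FORWARD }

    Mode mode = Mode.FORWARD;   // Knowledge: current execution mode
    long matchedPubs, nwrCount; // Monitor: per-sampling-period counters

    void monitor(long pubs, long nwrs) { matchedPubs = pubs; nwrCount = nwrs; }

    // Analyze: aggregation pays off when it emits fewer messages.
    boolean aggregationCheaper() { return nwrCount < matchedPubs; }

    // Plan: choose the suitable mode for the next period.
    Mode plan() { return aggregationCheaper() ? Mode.AGGREGATE : Mode.FORWARD; }

    // Execute: start/stop aggregation at the broker.
    void execute() { mode = plan(); }

    public static void main(String[] args) {
        MapeKLoop broker = new MapeKLoop();
        broker.monitor(120, 12); // high matching rate in this period
        broker.execute();
        System.out.println(broker.mode); // AGGREGATE
    }
}
```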
Experimental setup
• Implemented in Java over the PADRES framework
• Topology: 16 brokers
  – Combination of publisher-edge only, subscriber-edge only, and mixed brokers
• Real-life datasets:
  • Traffic dataset from the ONE-ITS service1
  • Yahoo! Finance stock dataset
• Metrics:
  • Number of messages exchanged
  • Processing overhead
  • End-to-end delay
1http://one-its-webapp1.transport.utoronto.ca
Results (Stock dataset)
[Plots: varying publications/second; varying number of subscriptions.]

The decision becomes more accurate when sufficient information is available.
• Adaptive aggregation performs close to the better of Early and Late in all settings.
• Early performs better at high publication rates, whereas Late is better with a large number of subscriptions.
Results (Traffic dataset)
[Plots: varying publications/second; varying number of subscriptions.]

Per-broker adaptation can cause individual brokers to make incorrect decisions.
Processing overhead (Stock)
[Plots: predicate matching cost; aggregation-related overhead.]

Adaptation overhead dominates the aggregation overhead.
Conclusions
• We provide an aggregation framework for CBPS with baseline solutions.
• We demonstrate that neither baseline dominates the other; their relative performance depends on workload parameters.
• We provide a generic adaptive aggregation framework.
• We experimentally demonstrate that our distributed adaptive solution performs close to the best baseline across all settings.