TRANSCRIPT
A Multicore Parallelization of Continuous Skyline Queries on Data Streams
University of Pisa, Italy
Euro-Par 2015 - Vienna
Tiziano De Matteis, Salvatore Di Girolamo, Gabriele Mencagli
INTRODUCTION
Skyline queries are used to retrieve interesting points from a large dataset according to multiple criteria (Pareto optimal).
Example: “Find cheap hotels near the City Center”
[Figure: hotel skyline example; axes: price vs. distance]
Traditionally used in static DBMSs, they have become commonplace in real-time applications that process input data on the fly, such as financial applications, social network analysis, sensor networks, and so on.
INTRODUCTION
Skyline queries over data streams are challenging:
○ no control on how elements arrive;
○ due to the unbounded input, the query is evaluated on windows that contain the most recent tuples;
○ performance requirements in terms of throughput and latency.
Parallelism is unavoidable
Goal: parallelization of continuous skyline queries over multicores:
○ map-reduce pattern implementation;
○ taking into account optimizations such as asynchronous reduce and load balancing.
PRELIMINARIES
Each point p is represented as a tuple of d ≥ 1 attributes {p1, p2, ..., pd}.
Given two points p and r, we say that p dominates r (p ≺ r) iff:
∀ i ∈ [1,d]: pi ≤ ri and ∃ j: pj < rj
A sliding window is used to maintain the most recent tuples. Its length is expressed by the user in Tw time units (e.g. seconds, minutes):
○ the skyline at time t is computed over all the points arrived in [t − Tw, t];
○ a point p arrived at time tp_arr expires at time tp_exp = tp_arr + Tw.
Given a set of points, its skyline is the subset of points not dominated by any other point in the set.
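As an illustration of these definitions, a minimal sketch of the dominance test and of a naive (windowless) skyline computation is given below. All names are illustrative; the actual prototype is a C++/FastFlow implementation, not this code.

```python
from typing import Sequence

def dominates(p: Sequence[float], r: Sequence[float]) -> bool:
    """p ≺ r: p is <= r on every attribute and strictly < on at least one."""
    return all(pi <= ri for pi, ri in zip(p, r)) and \
           any(pi < ri for pi, ri in zip(p, r))

def skyline(points):
    """Naive O(n^2) skyline: keep the points not dominated by any other point."""
    return [p for p in points
            if not any(dominates(q, p) for q in points if q != p)]

# (price, distance) hotels -- lower is better on both attributes
hotels = [(50, 8), (60, 2), (40, 9), (55, 3), (70, 4)]
print(skyline(hotels))  # -> [(50, 8), (60, 2), (40, 9), (55, 3)]
```

Here (70, 4) is pruned because (55, 3) dominates it; every other hotel is Pareto optimal.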
CONTINUOUS SKYLINE OPERATOR
The Skyline Operator has to maintain the skyline set of the points contained in the current window (i.e. received in the last Tw time units).
[Figure: operator OP with an input stream of points ...v, s, r... and an output stream of updates (act, p, t)]
○ in input we have a stream of points;
○ in output a stream of skyline updates, each indicating whether a point p enters (ADD) or exits (DEL) the skyline set at a given time t.
The operator has to maintain the set of live (non-obsolete) points in an internal spatial data structure (DB) on which it performs insertions, deletions and searches (e.g. vector, R-Tree, ...).
Two types of activations:
○ external: due to point arrivals;
○ internal: due to point expirations.
EAGER ALGORITHM
Due to Tao and Papadias [2006], it performs most of the work at point arrival.
Definition: the Skyline Influence Time of a point p (SIT_p) is the expiry time of the youngest point r dominating p (i.e. its critical dominator).
The algorithm maintains an event list EL with two types of events:
○ skytime(p,t): indicates the entering of p into the skyline at time t;
○ expire(p,t): indicates the expiry of p at time t.
EXTERNAL ACTIVATION
At the reception of a point p:
1. Pruning: all the points in DB dominated by p are removed and their associated events cleared from EL. DEL updates for removed skyline points are emitted;
2. Insertion: the point p is inserted into DB;
3. Search for the critical dominator r of p:
○ if it exists, add the event skytime(p, tr_exp) to EL;
○ otherwise p is a skyline point: an ADD update is emitted on the output stream and expire(p, tp_exp) is inserted in EL.
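The three steps above can be sketched as follows. This is a hedged, simplified sketch: DB is a plain list of (point, expiry) pairs and EL a list of (kind, point, time) events, which are hypothetical layouts, not those of the actual implementation.

```python
def dominates(p, r):
    return all(a <= b for a, b in zip(p, r)) and any(a < b for a, b in zip(p, r))

def on_arrival(p, t_arr, Tw, DB, EL, out):
    """External activation (sketch). out collects (action, point, time) updates."""
    # 1. Pruning: drop the live points dominated by p and clear their events;
    #    a pruned point with a pending 'expire' event was a skyline point -> DEL.
    for q, q_exp in list(DB):
        if dominates(p, q):
            DB.remove((q, q_exp))
            if ('expire', q, q_exp) in EL:
                out.append(('DEL', q, t_arr))
            EL[:] = [e for e in EL if e[1] != q]
    # 2. Insertion of p with its expiry time.
    p_exp = t_arr + Tw
    DB.append((p, p_exp))
    # 3. Critical dominator: the youngest live point dominating p.
    doms = [q_exp for q, q_exp in DB if dominates(q, p)]
    if doms:
        EL.append(('skytime', p, max(doms)))   # p will enter the skyline at SIT_p
    else:
        out.append(('ADD', p, t_arr))          # p is a skyline point right away
        EL.append(('expire', p, p_exp))
```

For instance, feeding (5,5) at time 0 and then the dominating point (3,3) at time 1 first emits ADD for (5,5), then DEL for (5,5) and ADD for (3,3).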
INTERNAL ACTIVATION
The events in EL are processed using an internal timer. When an event is triggered:
○ skytime(p,t): the point p is added to the skyline and an ADD update is emitted. A new event expire(p, tp_exp) is inserted in EL;
○ expire(p,t): p is removed from DB and a DEL update is emitted.
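The timer-driven processing of EL can be sketched with a min-heap ordered by event time. Again, the data layouts (DB as a dict from point to expiry, SKY as a set) are assumptions of this sketch, not the actual data structures.

```python
import heapq

def advance_to(t_now, DB, EL, SKY, out):
    """Internal activations (sketch): fire every event in EL due by t_now.

    DB:  dict point -> expiry time of the live points
    EL:  min-heap of (time, kind, point) events
    SKY: current skyline set; out collects (action, point, time) updates."""
    while EL and EL[0][0] <= t_now:
        t, kind, p = heapq.heappop(EL)
        if kind == 'skytime':
            SKY.add(p)                    # p's critical dominator has expired:
            out.append(('ADD', p, t))     # p enters the skyline now...
            heapq.heappush(EL, (DB[p], 'expire', p))  # ...until it expires itself
        else:  # 'expire'
            del DB[p]
            SKY.discard(p)
            out.append(('DEL', p, t))
```

A skytime event thus chains directly into the expire event for the same point, exactly as in the two bullets above.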
PARALLELIZATION
It is based on a Map pattern with a Reduce phase. DB and EL are partitioned among a set of Workers.
For each received point p the Emitter:
1. assigns the timestamp tp_arr according to the current system time;
2. assigns the ownership of p to a specific Worker;
3. multicasts p to all the Workers.
[Figure: the Emitter E multicasts (p, owner) to the Workers W, which send their results to the Collector C]
PARALLELIZATION
A generic Worker Wi will:
1. execute the events in ELi with timestamp smaller than tp_arr (the corresponding updates are sent);
2. prune the points dominated by p from DBi;
3. compute the local SITp_i on the local DBi and send it to the Collector;
4. if it is not the owner, discard the point. Otherwise, it has to wait for the result of the reduce from the Collector:
a. if p does not have a dominator it is a skyline point: expire(p, tp_exp) is inserted in ELi and an ADD update sent to the Collector;
b. otherwise, skytime(p, SITp) is added to the event list.
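The map and reduce steps on SIT_p can be sketched as follows, with each partition DB_i assumed to be a dict from point to expiry time (an assumption of this sketch):

```python
def dominates(p, r):
    return all(a <= b for a, b in zip(p, r)) and any(a < b for a, b in zip(p, r))

def local_sit(p, local_db):
    """Map step (sketch): a Worker's local SIT over its partition DB_i,
    i.e. the expiry of the youngest local dominator of p (None if no dominator)."""
    doms = [t_exp for q, t_exp in local_db.items() if dominates(q, p)]
    return max(doms) if doms else None

def reduce_sit(local_sits):
    """Reduce step at the Collector: SIT_p = max over the Workers' local values;
    None from every Worker means p has no dominator, i.e. p is a skyline point."""
    known = [s for s in local_sits if s is not None]
    return max(known) if known else None
```

The max over partitions is correct because the critical dominator of p must live in exactly one partition, and every local value is the expiry of some dominator of p.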
PARALLELIZATION
The Collector receives two types of messages from the Workers:
○ reduce messages: once it has received the local SITp_i from every Worker, it computes SITp = max_i {SITp_i} and sends the result to the owner of p;
○ skyline updates: the Collector reorders the updates and transmits them onto the output stream.
Straightforward solution… but it has two main problems:
○ synchronous reduce phase: the owner has to wait for a reply from the Collector;
○ load unbalancing due to pruning and wrong owner-selection policies.
ASYNCHRONOUS REDUCE
Reduce can also be done in an asynchronous fashion.
Each Worker waits for a message either from the Emitter (points) or from the Collector (reduce results). When the reduce result is received, ELk is properly updated.
Wk (the owner of point p) can process subsequent points while SITp is not yet available. For each subsequent point r it:
○ searches for the youngest dominator in DBk (independent of SITp);
○ prunes all the dominated points in DBk: if p is one of them, the proper updates are produced when its SIT is received.
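The owner-side bookkeeping of the asynchronous reduce can be sketched as below. This is a deliberately simplified sketch with hypothetical names (pending, event_list); the real Worker also keeps processing incoming points between reduce results.

```python
class OwnerWorker:
    """Sketch of the owner side of the asynchronous reduce: the Worker records
    p as pending and continues with subsequent points; the decision about p is
    taken only when its SIT arrives back from the Collector."""

    def __init__(self):
        self.pending = {}     # point -> expiry time, awaiting its reduce result
        self.event_list = []  # (time, kind, point) events

    def own(self, p, t_exp):
        # Non-blocking: just record p; subsequent points can be processed freely.
        self.pending[p] = t_exp

    def on_reduce_result(self, p, sit):
        t_exp = self.pending.pop(p)
        if sit is None:       # no dominator anywhere: p is a skyline point
            self.event_list.append((t_exp, 'expire', p))
            return ('ADD', p)
        self.event_list.append((sit, 'skytime', p))
        return None
```

Two points can thus be pending at once, and their reduce results are applied in whatever order they arrive.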
OWNER SELECTION POLICIES
The Emitter has to assign point ownership so as to keep the DBi evenly sized. Four heuristics, independent of the spatial coordinates of the points:
○ Round Robin (RR): ownership is interleaved among the Workers;
○ On Demand (OD): ownership is assigned to the first Worker able to accommodate the point in its input queue;
○ Least Loaded Worker (LLW): the point is assigned to the Worker with the smallest DBi;
○ Least Loaded Worker with Ownership (LLW+): for each Worker we also take into account the number of enqueued points for which it has been designated as the owner.
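The LLW+ heuristic can be sketched in a few lines (names and tie-breaking rule are assumptions of this sketch):

```python
def pick_owner_llw_plus(db_sizes, pending_owned):
    """LLW+ heuristic (sketch): pick the Worker minimizing its partition size
    plus the number of enqueued points already assigned to it as owner."""
    loads = [db + pend for db, pend in zip(db_sizes, pending_owned)]
    return loads.index(min(loads))      # ties broken by lowest Worker id

# Plain LLW would look only at db_sizes; LLW+ also counts in-flight ownership,
# so a Worker with a small DB but many pending owned points is not overloaded.
```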
EXPERIMENTS
A prototype implementation of the parallelization has been developed targeting shared-memory architectures:
○ the parallel entities have been implemented as pthreads, pinned on cores;
○ they interact through the non-blocking lock-free queues provided by the FastFlow library.
Target architecture: dual-CPU Intel Sandy Bridge Xeon E5-2650, 16 cores (32 with HT) running at 2 GHz, 32 GB of RAM.
In addition to the entities required by the parallelization, we have a Generator and a Consumer thread. Therefore we can have up to 12 Workers (if we don’t use HT).
[Figure: thread pipeline Generator G → Emitter E → Workers W → Collector C → Consumer]
EXPERIMENTAL EVALUATION
To study the effect of pruning, we considered three different point distributions:
○ in any case the number of points in DB is three orders of magnitude lower than the number of points received;
○ this saves memory at the expense of an increased processing time per point.
OWNER SELECTION POLICIES
We use a configuration with 4.5K non-obsolete points distributed among 12 Workers and measure the imbalance Δ = |DBmax| − |DBmin|. Independent distribution, point dimension = 5.

Strategy   avg Δ   variance
RR         66.02   229
OD         34.13   1791
LLW        3.15    4.28
LLW+       2.55    4.05

○ the load-aware policies obtain a smaller avg Δ with lower variance;
○ LLW+ achieves a 20% improvement w.r.t. LLW.
ASYNCHRONOUS REDUCE
We measure the benefit of the asynchronous reduce on throughput, with LLW+.
Scenario with a rate of 100K points/s, anticorrelated distribution and Tw = 20s.
The average gain is ~10% (higher with a higher parallelism degree).
THROUGHPUT AND SCALABILITY
Different execution scenarios for different point distributions:
Anticorrelated: Tw = 10s, rate = 80 Kp/s; Independent: Tw = 10s, rate = 100 Kp/s; Correlated: Tw = 60s, rate = 250 Kp/s.

        Anticorr.   Indep.    Corr.
B(12)   28 Kp/s     78 Kp/s   237 Kp/s
S(12)   11.65       10.7      8.16
|DB|    4598        4226      1192
CONCLUSIONS
We have presented a map-reduce parallelization of the skyline operator on data streams, optimized with respect to:
○ the reduce phase: asynchronous reduce;
○ the owner-selection policies.
Both of them improved the performance of the solution.
Future work:
○ investigate the correlated case;
○ enhance the implementation with autonomic features.
Thank you! Questions?
BKP-REORDERING WITH SYNC. REDUCE
If we adopt a synchronous reduce:
○ the results are produced by each Worker in order;
○ but the Collector has to reorder them to respect their chronological order. To do that, it:
○ buffers the updates and keeps them ordered by timestamp using a priority queue;
○ maintains the timestamp of the last update received from each Worker;
○ safely transmits the buffered updates with timestamp smaller than or equal to the minimum of these last timestamps.
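The reordering logic above can be sketched with a min-heap plus a per-Worker last-timestamp vector. Class and method names are hypothetical; a sequence counter breaks timestamp ties in the heap.

```python
import heapq
import itertools

class Reorderer:
    """Sketch of the Collector's reordering buffer (sync. reduce case): updates
    arrive in timestamp order per Worker; an update is safe to emit once its
    timestamp is <= the minimum of the last timestamps seen from every Worker."""

    def __init__(self, n_workers):
        self.last_ts = [float('-inf')] * n_workers
        self.buf = []                 # min-heap of (ts, seq, update)
        self.seq = itertools.count()  # tie-breaker for equal timestamps

    def push(self, worker_id, ts, update):
        self.last_ts[worker_id] = ts
        heapq.heappush(self.buf, (ts, next(self.seq), update))
        frontier = min(self.last_ts)
        ready = []
        while self.buf and self.buf[0][0] <= frontier:
            ready.append(heapq.heappop(self.buf)[2])
        return ready
```

An update buffered at timestamp 5 from Worker 0, for example, is only released once Worker 1 has also produced something with timestamp ≥ 5.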
BKP-REORDERING WITH ASYNC. REDUCE
Under this assumption, the updates produced by each Worker can be out of order: for example, the point p has to be inserted in the skyline by its owner, but while the owner waits for the reduce result other points arrive and updates with a greater timestamp are possibly produced.
The solution that we have adopted is to use punctuations:
○ when a Worker receives a reduce result, it states to the Collector that all its future results will have a greater timestamp;
○ the Collector uses this information to reorder the results (clearly a little slower than with the synchronous reduce).