ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Vincenzo Gulisano, Yiannis Nikolakopoulos, Marina Papatriantafilou, Philippas Tsigas

Chalmers University of Technology

2022-06-28

Upload: vincenzo-gulisano

Post on 12-Apr-2017


TRANSCRIPT

Page 1: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

2023-05-02 1

ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Vincenzo Gulisano, Yiannis Nikolakopoulos, Marina Papatriantafilou, Philippas Tsigas

Chalmers University of Technology

Page 2: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Agenda

• What is a stream join?
• What are the challenges of a parallel stream join?
• Why ScaleJoin?
• How well does ScaleJoin address stream joins' challenges?
• Conclusions


Page 4: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Motivation

Applications in sensor networks and cyber-physical systems:
• Large and fluctuating volumes of data generated continuously

Demand for:
• Continuous processing of data streams
• In a real-time fashion

Store-then-process is not feasible!

Page 5: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

What is a stream join?

Data stream: unbounded sequence of tuples

[Figure: two data streams R and S, each drawn as a sequence of tuples t1, t2, t3, t4; sliding windows WR (over R) and WS (over S); window size WS; join predicate P]

Page 6: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Why parallel stream joins?

• Window size WS = 600 seconds
• R receives 500 tuples/second
• S receives 500 tuples/second
• Window WR will contain 300,000 tuples
• Window WS will contain 300,000 tuples
• Each new tuple from R gets compared with all the tuples in WS
• Each new tuple from S gets compared with all the tuples in WR

… 300,000,000 comparisons/second!

[Figure: streams R and S with tuples t1 to t4 and their windows WR and WS]
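The arithmetic behind the 300,000,000 figure can be checked with a few lines of Java. This is only a back-of-the-envelope sketch: the class and method names are made up for illustration, and it assumes steady-state windows that hold exactly rate × window-size tuples.

```java
/** Back-of-the-envelope load estimate for a two-way sliding-window join (illustrative names). */
final class JoinLoadEstimate {

    /** Tuples held by a time-based window in steady state: rate * window size. */
    static long windowTuples(long ratePerSecond, long windowSeconds) {
        return ratePerSecond * windowSeconds;
    }

    /** Every new R tuple scans the whole S window, and every new S tuple the whole R window. */
    static long comparisonsPerSecond(long rateR, long rateS, long windowSeconds) {
        long tuplesInWR = windowTuples(rateR, windowSeconds);
        long tuplesInWS = windowTuples(rateS, windowSeconds);
        return rateR * tuplesInWS + rateS * tuplesInWR;
    }

    public static void main(String[] args) {
        System.out.println(windowTuples(500, 600));              // 300000
        System.out.println(comparisonsPerSecond(500, 500, 600)); // 300000000
    }
}
```

Each stream contributes 500 × 300,000 = 150,000,000 comparisons per second, which is where the total of 300,000,000 comes from.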


Page 8: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

What are the challenges of a parallel stream join?

• Scalability
• High throughput
• Low latency
• Disjoint parallelism
• Skew resilience
• Determinism


Page 10: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

The 3-step procedure (sequential stream join)

For each incoming tuple t:
1. Compare t with all tuples in the opposite window, given predicate P
2. Add t to its window
3. Remove stale tuples from t's window

[Figure: Prod R and Prod S add tuples to R and S at the PU; Cons consumes the results]

We assume each producer delivers tuples in timestamp order
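The three steps above can be sketched in Java as follows. This is a minimal illustration, not the authors' code: the `Tuple` layout, the class name, and the rule that tuples further apart than the window size never match are all assumptions made for the example. It relies on each stream arriving in timestamp order, so the oldest tuple of a window is always at its head.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiPredicate;

/** Sketch of the sequential 3-step stream join; all names here are illustrative. */
final class SequentialJoinSketch {

    /** Hypothetical tuple layout: timestamp, source stream ('R' or 'S'), one value. */
    static final class Tuple {
        final long ts;
        final char stream;
        final int value;

        Tuple(long ts, char stream, int value) {
            this.ts = ts;
            this.stream = stream;
            this.value = value;
        }
    }

    private final Deque<Tuple> windowR = new ArrayDeque<>();
    private final Deque<Tuple> windowS = new ArrayDeque<>();
    private final long windowSize;                      // WS, in timestamp units
    private final BiPredicate<Tuple, Tuple> predicate;  // P(r, s)

    SequentialJoinSketch(long windowSize, BiPredicate<Tuple, Tuple> predicate) {
        this.windowSize = windowSize;
        this.predicate = predicate;
    }

    /** Processes one incoming tuple and returns the matching {r, s} pairs. */
    List<Tuple[]> process(Tuple t) {
        boolean fromR = t.stream == 'R';
        Deque<Tuple> own = fromR ? windowR : windowS;
        Deque<Tuple> opposite = fromR ? windowS : windowR;
        List<Tuple[]> results = new ArrayList<>();

        // Step 1: compare t with all tuples in the opposite window.
        for (Tuple other : opposite) {
            // Assumption: tuples further apart than the window size never match.
            if (Math.abs(t.ts - other.ts) > windowSize) {
                continue;
            }
            Tuple r = fromR ? t : other;
            Tuple s = fromR ? other : t;
            if (predicate.test(r, s)) {
                results.add(new Tuple[] {r, s});
            }
        }

        // Step 2: add t to its own window.
        own.addLast(t);

        // Step 3: remove stale tuples from t's window (the oldest tuple is at the head).
        while (!own.isEmpty() && own.peekFirst().ts < t.ts - windowSize) {
            own.removeFirst();
        }
        return results;
    }
}
```

Nothing in this sketch fixes the order in which tuples from R and S are interleaved, so two runs over the same input can produce different results.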

Page 11: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

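The 3-step procedure, is it enough?

• Scalability
• High throughput
• Low latency
• Disjoint parallelism
• Skew resilience
• Determinism

[Figure: two panels, each showing streams R and S holding tuples t1 and t2 with windows WR and WS; tuple t3 arrives in the first panel, tuples t4 and t3 in the second]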

Page 12: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Enforcing determinism in sequential stream joins

• Next tuple to process = earliest(tS,tR)
• The earliest(tS,tR) tuple is referred to as the next ready tuple
• Processing ready tuples in timestamp order gives determinism

[Figure: a PU receiving tS and tR]
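One way to picture earliest(tS,tR) is a two-queue merge, sketched below. The class and field names are hypothetical, and two choices are assumptions of this example rather than something the slides state: a tuple is released only when both streams have a pending tuple, and ties on the timestamp go to R.

```java
import java.util.ArrayDeque;
import java.util.Queue;

/** Sketch of picking the next ready tuple from two timestamp-ordered streams. */
final class ReadyTupleMerger {

    /** Hypothetical tuple layout: timestamp and source stream ('R' or 'S'). */
    static final class Tuple {
        final long ts;
        final char stream;

        Tuple(long ts, char stream) {
            this.ts = ts;
            this.stream = stream;
        }
    }

    private final Queue<Tuple> pendingR = new ArrayDeque<>();
    private final Queue<Tuple> pendingS = new ArrayDeque<>();

    /** Buffers a tuple; each stream is assumed to deliver in timestamp order. */
    void add(Tuple t) {
        (t.stream == 'R' ? pendingR : pendingS).add(t);
    }

    /** Returns earliest(tS, tR), or null while one of the streams has nothing pending. */
    Tuple nextReady() {
        Tuple r = pendingR.peek();
        Tuple s = pendingS.peek();
        if (r == null || s == null) {
            return null; // an earlier tuple could still arrive on the empty stream
        }
        // Assumption: on equal timestamps R goes first, so the order is reproducible.
        return (r.ts <= s.ts) ? pendingR.poll() : pendingS.poll();
    }
}
```

Because the pick depends only on timestamps and the fixed tie-break, the processing order no longer depends on when tuples happen to arrive.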

Page 13: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Deterministic 3-step procedure

Pick the next ready tuple t:
1. Compare t with all tuples in the opposite window, given predicate P
2. Add t to its window
3. Remove stale tuples from t's window

[Figure: Prod R and Prod S add tuples to R and S at the PU; Cons consumes the results]

Page 14: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Shared-nothing parallel stream join (state-of-the-art)

[Figure: Prod R and Prod S each choose a PU and add the tuple to the R or S input of PUi (PU1, PU2, …, PUN); a Merge step takes the next ready output tuple; Cons consumes the results]

Pick the next ready tuple t:
1. Compare t with all tuples in the opposite window, given P
2. Add t to its window
3. Remove stale tuples from t's window

• Scalability
• High throughput
• Low latency
• Disjoint parallelism
• Skew resilience
• Determinism

Page 15: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Shared-nothing parallel stream join (state-of-the-art)

[Figure: Prod R and Prod S, PU1, PU2, …, PUN, Merge and Cons, connected through queues accessed with enqueue() and dequeue()]

Page 16: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

From coarse-grained to fine-grained synchronization

[Figure: Prod R, Prod S, PU1, PU2, …, PUN, Cons]

Page 17: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleGate

addTuple(tuple, sourceID): allows a tuple from sourceID to be merged by ScaleGate into the resulting timestamp-sorted stream of ready tuples.

getNextReadyTuple(readerID): provides readerID with the earliest ready tuple that it has not yet consumed.

https://github.com/dcs-chalmers/ScaleGate_Java
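To make the semantics of the two methods concrete, here is a deliberately simple, lock-based stand-in. It is not the code from the repository linked above. The class name `SimpleGate`, the `Tuple` type, and the readiness rule are all assumptions of this sketch; the rule used is that a tuple is ready once every source has delivered a tuple with an equal or later timestamp. Only the two method names come from the slide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Lock-based sketch of the two ScaleGate operations; not the library's implementation. */
final class SimpleGate {

    /** Hypothetical tuple layout: timestamp plus an opaque payload. */
    static final class Tuple {
        final long ts;
        final String payload;

        Tuple(long ts, String payload) {
            this.ts = ts;
            this.payload = payload;
        }
    }

    private final long[] latestTs;  // latest timestamp delivered by each source
    private final int[] readerPos;  // index of the next ready tuple for each reader
    private final PriorityQueue<Tuple> pending =
            new PriorityQueue<Tuple>(Comparator.comparingLong((Tuple t) -> t.ts));
    private final List<Tuple> ready = new ArrayList<>(); // never trimmed in this sketch

    SimpleGate(int sources, int readers) {
        latestTs = new long[sources];
        Arrays.fill(latestTs, Long.MIN_VALUE);
        readerPos = new int[readers];
    }

    /** Merges a tuple from sourceID; sources are assumed to deliver in timestamp order. */
    synchronized void addTuple(Tuple tuple, int sourceID) {
        latestTs[sourceID] = tuple.ts;
        pending.add(tuple);
        // Assumed readiness rule: ts <= the smallest "latest timestamp" over all sources.
        long mergeTs = Arrays.stream(latestTs).min().getAsLong();
        while (!pending.isEmpty() && pending.peek().ts <= mergeTs) {
            ready.add(pending.poll());
        }
    }

    /** Returns the next ready tuple this reader has not consumed yet, or null if none. */
    synchronized Tuple getNextReadyTuple(int readerID) {
        int pos = readerPos[readerID];
        if (pos >= ready.size()) {
            return null;
        }
        readerPos[readerID] = pos + 1;
        return ready.get(pos);
    }
}
```

Every reader walks the same list of ready tuples with its own position, so all readers see every ready tuple in the same order. The sketch makes no attempt at the fine-grained synchronization the talk is about, and it does not define an order among tuples with equal timestamps.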

Page 18: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleJoin

[Figure: Prod R and Prod S add tuples to SGin; PU1, PU2, …, PUN get the next ready input tuple from SGin; Cons gets the next ready output tuple from SGout]

Steps for PU, after getting the next ready input tuple t from SGin:
1. Compare t with all tuples in the opposite window, given P
2. Add t to its window in a round-robin fashion
3. Remove stale tuples from t's window
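The per-PU steps can be sketched as below, under stated assumptions. Every PU is assumed to see every ready tuple in the same order, and exactly one PU stores each tuple, chosen round-robin with one counter per stream. The class name, the `Tuple` layout, the per-stream counters, and the window-distance check are illustrative choices, not details taken from the slides.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiPredicate;

/** Sketch of the steps run by one ScaleJoin PU; names and tuple layout are illustrative. */
final class ScaleJoinPuSketch {

    /** Hypothetical tuple layout: timestamp, source stream ('R' or 'S'), one value. */
    static final class Tuple {
        final long ts;
        final char stream;
        final int value;

        Tuple(long ts, char stream, int value) {
            this.ts = ts;
            this.stream = stream;
            this.value = value;
        }
    }

    private final int id;       // this PU's index, 0 .. numPUs - 1
    private final int numPUs;
    private final long windowSize;
    private final BiPredicate<Tuple, Tuple> predicate;        // P(r, s)
    private final Deque<Tuple> windowR = new ArrayDeque<>();  // this PU's share of WR
    private final Deque<Tuple> windowS = new ArrayDeque<>();  // this PU's share of WS
    private final long[] seen = new long[2];                  // tuples seen per stream

    ScaleJoinPuSketch(int id, int numPUs, long windowSize, BiPredicate<Tuple, Tuple> predicate) {
        this.id = id;
        this.numPUs = numPUs;
        this.windowSize = windowSize;
        this.predicate = predicate;
    }

    /** Handles one ready tuple and returns the {r, s} matches found by this PU. */
    List<Tuple[]> process(Tuple t) {
        boolean fromR = t.stream == 'R';
        Deque<Tuple> own = fromR ? windowR : windowS;
        Deque<Tuple> opposite = fromR ? windowS : windowR;
        List<Tuple[]> results = new ArrayList<>();

        // Step 1: compare t with this PU's share of the opposite window.
        for (Tuple other : opposite) {
            if (Math.abs(t.ts - other.ts) > windowSize) {
                continue; // assumption: tuples further apart than WS never match
            }
            Tuple r = fromR ? t : other;
            Tuple s = fromR ? other : t;
            if (predicate.test(r, s)) {
                results.add(new Tuple[] {r, s});
            }
        }

        // Step 2: store t only when it is this PU's turn (round-robin per stream).
        int streamIndex = fromR ? 0 : 1;
        if (seen[streamIndex]++ % numPUs == id) {
            own.addLast(t);
        }

        // Step 3: remove stale tuples from this PU's share of t's window.
        while (!own.isEmpty() && own.peekFirst().ts < t.ts - windowSize) {
            own.removeFirst();
        }
        return results;
    }
}
```

Each stored tuple lives in exactly one PU, so the matches found by all PUs together are the same pairs a single PU would find. The comparison work is split evenly regardless of the values the tuples carry.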

Page 19: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleJoin (example)

[Figure: a sequential stream join, where window WR holds t1, t2 and t3 from R while t4 arrives from S, shown next to ScaleJoin with 3 PUs, where each PU's WR holds one of t1, t2 and t3 and every PU receives t4]

Page 20: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleJoin

[Figure: several producers (Prod R, Prod S, Prod S, Prod S, Prod R) add tuples to SGin; PU1, PU2, …, PUN; Cons gets the next ready output tuple from SGout]

Steps for PUi, after getting the next ready input tuple t from SGin:
1. Compare t with all tuples in the opposite window, given P
2. Add t to its window in a round-robin fashion
3. Remove stale tuples from t's window

• Scalability
• High throughput
• Low latency
• Disjoint parallelism
• Skew resilience
• Determinism


Page 22: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Evaluation setup

• Common benchmark
• Implemented in Java
• Evaluation platform
  – NUMA architecture: 2.6 GHz AMD Opteron 6230 (48 cores over 4 sockets), 64 GB of memory
  – Architecture with Hyper-Threading: 2.0 GHz Intel Xeon E5-2650 (16 cores over 2 sockets), 64 GB of memory

[Figure: streams R and S with tuples t1 to t4]

R: <timestamp,x,y,z>
S: <timestamp,a,b,c,d>

P: a−10≤x≤a+10 AND b−10≤y≤b+10
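Predicate P is a band join on two attribute pairs and translates directly into a Java method. The slide does not give the attribute types, so `double` is an assumption here, as are the class and method names.

```java
/** The benchmark predicate P as a plain method; attribute types are assumed to be double. */
final class BandJoinPredicate {

    /** True when a-10 <= x <= a+10 and b-10 <= y <= b+10, for r = <.., x, y, ..> and s = <.., a, b, ..>. */
    static boolean matches(double x, double y, double a, double b) {
        return a - 10 <= x && x <= a + 10
            && b - 10 <= y && y <= b + 10;
    }

    public static void main(String[] args) {
        System.out.println(matches(5, 5, 10, 10));  // true
        System.out.println(matches(21, 5, 10, 10)); // false
    }
}
```

The condition is the same as requiring |x − a| ≤ 10 and |y − b| ≤ 10. Attributes z, c and d play no part in the predicate.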

Page 23: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleJoin scalability – comparisons/second

[Plot: comparisons/second against the number of PUs]

Page 24: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleJoin latency – milliseconds

[Plot: latency in milliseconds against the number of PUs]

Page 25: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleJoin skew-resilience

Constant distinct rates with peaks


Page 27: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Conclusions

• ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join
• Challenges of parallel stream joins
• Fine-grained synchronization (ScaleGate)
• 4 billion comparisons/second, with latency lower than 60 milliseconds

• Scalability
• High throughput
• Low latency
• Disjoint parallelism
• Skew resilience
• Determinism

Page 28: ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

ScaleJoin: a Deterministic, Disjoint-Parallel and Skew-Resilient Stream Join

Vincenzo Gulisano, Yiannis Nikolakopoulos, Marina Papatriantafilou, Philippas Tsigas

Thank you! Questions?