
Page 1

Communication Cost in Big Data Processing

Dan Suciu, University of Washington

Joint work with Paul Beame and Paris Koutris, and the Database Group at the UW

Page 2

Queries on Big Data

Big Data processing on distributed clusters
•  General programs: MapReduce, Spark
•  SQL: Pig Latin, BigQuery, Shark, Myria…

Traditional data processing
•  Main cost = disk I/O

Big Data processing
•  Main cost = communication

Page 3

This Talk

•  How much communication is needed to solve a “problem” on p servers?

•  “Problem”:
   – In this talk = full conjunctive query CQ (≈ SQL)
   – Future work = extend to linear algebra, ML, etc.

•  Some techniques discussed in this talk are implemented in Myria (Bill Howe’s talk)

Page 4

Models of Communication

•  Combined communication/computation cost:
   – PRAM → MRC [Karloff’2010]
   – BSP [Valiant] → LogP model [Culler]

•  Explicit communication cost:
   – [Hong&Kung’81] red/blue pebble games for I/O communication complexity → [Ballard’2012] extension to parallel algorithms
   – MapReduce model [DasSarma’2013]
   – MPC [Beame’2013, Beame’2014] (this talk)

Page 5

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


Pages 6–10

Massively Parallel Communication Model (MPC)
Extends BSP [Valiant]

[Figure: p servers (Server 1 … Server p) hold the input of size M, O(M/p) per server; computation proceeds in rounds (Round 1, Round 2, Round 3, …), each round redistributing data among the servers, with at most L bits received per server.]

•  Input data = size M bits
•  Number of servers = p
•  One round = compute & communicate
•  Algorithm = several rounds
•  Max communication load per server = L

Ideal: L = M/p
This talk: L = M/p * p^ε, where ε ∈ [0,1] is called the space exponent
Degenerate: L = M (send everything to one server)

Pages 11–15

Example: Q(x,y,z) = R(x,y) ⋈ S(y,z)

Input:
•  R, S uniformly partitioned on p servers, M/p per server
•  size(R) + size(S) = M

Round 1: each server
•  Sends record R(x,y) to server h(y) mod p
•  Sends record S(y,z) to server h(y) mod p

Output:
•  Each server computes and outputs the local join R(x,y) ⋈ S(y,z)

Assuming no skew: ∀y, degree_R(y) ≤ M/p and degree_S(y) ≤ M/p
Rounds = 1
Load L = O(M/p) = O(M/p * p^0), i.e. space exponent = 0

SQL is embarrassingly parallel!
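To make the round concrete, here is a minimal single-process simulation of the one-round hash join (Python; the relation encoding, the use of Python’s built-in hash, and the server loop are illustrative assumptions, not part of the talk):

```python
from collections import defaultdict

def hash_join_round(R, S, p, h=hash):
    """One-round MPC hash join for Q(x,y,z) = R(x,y) ⋈ S(y,z).
    All p servers are simulated in one process."""
    # Round 1: every server routes R(x,y) and S(y,z) to server h(y) mod p.
    servers_R, servers_S = defaultdict(list), defaultdict(list)
    for (x, y) in R:
        servers_R[h(y) % p].append((x, y))
    for (y, z) in S:
        servers_S[h(y) % p].append((y, z))

    # Output: each server computes its local join on y.
    answers = []
    for v in range(p):
        index = defaultdict(list)
        for (x, y) in servers_R[v]:
            index[y].append(x)
        for (y, z) in servers_S[v]:
            answers.extend((x, y, z) for x in index[y])
    return answers

R = [(1, 10), (2, 10), (3, 20)]
S = [(10, 100), (20, 200)]
print(sorted(hash_join_round(R, S, p=4)))
# [(1, 10, 100), (2, 10, 100), (3, 20, 200)]
```

Because every tuple is sent exactly once, the load per server is O(M/p) whenever no join key y is too frequent.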

Page 16

Questions

Fix a query Q and a number of servers p

•  What is the communication load L needed to compute Q in one round?
   This talk is based on [Beame’2013, Beame’2014].

•  Fix a maximum load L (space exponent ε): how many rounds are needed for Q?
   Preliminary results in [Beame’2013].

Page 17

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


Page 18

The Triangle Query

•  Input: three tables R(X,Y), S(Y,Z), T(Z,X)
   size(R) + size(S) + size(T) = M

•  Output: compute Q(x,y,z) = R(x,y) ⋈ S(y,z) ⋈ T(z,x)

Example instance (the same rows in all three tables):
  R(X,Y) = S(Y,Z) = T(Z,X) = {(Fred, Alice), (Jack, Jim), (Fred, Jim), (Carol, Alice)}

Page 19

Triangles in Two Rounds

•  Round 1: compute temp(X,Y,Z) = R(X,Y) ⋈ S(Y,Z)

•  Round 2: compute Q(X,Y,Z) = temp(X,Y,Z) ⋈ T(Z,X)

The load L depends on the size of temp, which can be much larger than M/p.
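A sketch of this two-round strategy, reusing hash_join_round (and defaultdict) from the Pages 11–15 sketch above, under the same simulation assumptions:

```python
def triangle_two_rounds(R, S, T, p, h=hash):
    """Two-round triangle join: temp = R ⋈ S, then temp ⋈ T."""
    # Round 1: temp(x,y,z) = R(x,y) ⋈ S(y,z), hashed on y.
    temp = hash_join_round(R, S, p)
    # Round 2: route temp and T on h(z); keep tuples with (z,x) in T.
    servers_temp, servers_T = defaultdict(list), defaultdict(list)
    for (x, y, z) in temp:
        servers_temp[h(z) % p].append((x, y, z))
    for (z, x) in T:
        servers_T[h(z) % p].append((z, x))
    answers = []
    for v in range(p):
        Tset = set(servers_T[v])
        answers += [t for t in servers_temp[v] if (t[2], t[0]) in Tset]
    return answers

print(triangle_two_rounds([(1, 2)], [(2, 3)], [(3, 1)], p=4))  # [(1, 2, 3)]
```

The round-2 load is proportional to |temp|, which is exactly the problem the slide points out.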

Page 20

Triangles in One Round

•  Factorize p = p⅓ × p⅓ × p⅓

•  Each server identified by (i,j,k)

[Figure: the p servers arranged in a p⅓ × p⅓ × p⅓ cube; server (i,j,k) sits at coordinates i, j, k.]

[Ganguli’92, Afrati&Ullman’10, Suri’11, Beame’13]

Page 21

Triangles in One Round

[Figure: the example tables R, S, T from Page 18 routed into the server cube; e.g. tuple R(Fred, Jim) is replicated to all servers (i, j, *) with i = h(Fred) and j = h(Jim).]

Round 1:
•  Send R(x,y) to all servers (h(x), h(y), *)
•  Send S(y,z) to all servers (*, h(y), h(z))
•  Send T(z,x) to all servers (h(x), *, h(z))

Output: compute locally R(x,y) ⋈ S(y,z) ⋈ T(z,x)
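A sketch of this one-round cube algorithm (Python; the per-variable hash functions and the single-process simulation are illustrative assumptions):

```python
from collections import defaultdict

def triangle_one_round(R, S, T, p_side, h=hash):
    """One-round cube algorithm for R(x,y) ⋈ S(y,z) ⋈ T(z,x)
    on p = p_side**3 servers (single-process simulation)."""
    hx = lambda v: h(("x", v)) % p_side   # one hash per variable
    hy = lambda v: h(("y", v)) % p_side
    hz = lambda v: h(("z", v)) % p_side

    # Round 1: each tuple is replicated along its unconstrained dimension.
    local = defaultdict(lambda: ([], [], []))   # (i,j,k) -> R, S, T fragments
    for (x, y) in R:                            # R(x,y) -> (h(x), h(y), *)
        for k in range(p_side):
            local[(hx(x), hy(y), k)][0].append((x, y))
    for (y, z) in S:                            # S(y,z) -> (*, h(y), h(z))
        for i in range(p_side):
            local[(i, hy(y), hz(z))][1].append((y, z))
    for (z, x) in T:                            # T(z,x) -> (h(x), *, h(z))
        for j in range(p_side):
            local[(hx(x), j, hz(z))][2].append((z, x))

    # Output: each server joins its three local fragments.
    answers = set()
    for Rf, Sf, Tf in local.values():
        Tset = set(Tf)
        for (x, y) in Rf:
            for (y2, z) in Sf:
                if y2 == y and (z, x) in Tset:
                    answers.add((x, y, z))
    return answers

print(triangle_one_round([(1, 2)], [(2, 3)], [(3, 1)], p_side=2))  # {(1, 2, 3)}
```

Every tuple is replicated to p⅓ servers, which is where the load O(M/p⅔) on the next page comes from.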

Page 22

Upper Bound is Lalgo = O(M/p⅔)

Theorem. Assume the data has no skew: all degrees ≤ O(M/p⅓). Then the algorithm computes Q in one round, with communication load Lalgo = O(M/p⅔).

Compare to two rounds:
•  No more large intermediate result
•  Skew-free up to degrees = O(M/p⅓) (from O(M/p))
•  BUT: space exponent increased to ε = ⅓ (from ε = 0)

M = size(R) + size(S) + size(T) (in bits)

Pages 23–25

Lower Bound is Llower = M / p⅔

M = size(R) + size(S) + size(T) (in bits)

Theorem* Any one-round deterministic algorithm A for Q requires Llower bits of communication per server.

Assume that the three input relations R, S, T are stored on disjoint servers.#

We prove a stronger claim, about any algorithm communicating < Llower bits. Let L be the algorithm’s max communication load per server. We prove: if L < Llower then, over random permutations R, S, T, the algorithm returns at most a fraction (L/Llower)^(3/2) of the answers to Q.

# Without this assumption, add a multiplicative factor 3.
* Beame, Koutris, Suciu, Communication Steps for Parallel Query Processing, PODS 2013

Page 26

Lower Bound is Llower = M / p⅔

Discussion:
•  An algorithm with space exponent ε < ⅓ has load L = M/p^(1−ε) and reports only a (L/Llower)^(3/2) = 1/p^((1−3ε)/2) fraction of all answers. Fewer, as p increases!
•  The lower bound holds only for random inputs: it cannot hold for a fixed input R, S, T, since the algorithm may use a short bit encoding to signal this input to all servers.
•  By Yao’s lemma, the result also holds for randomized algorithms on some fixed input.

Page 27

Lower Bound is Llower = M / p⅔

Comparison: [Ballard’2012] consider Strassen’s matrix multiplication algorithm and prove the following theorem for the CDAG model. If each server has M ≥ c · n²/p^(2/lg 7) memory, then the communication load per server is

  IO(n, p, M) = Ω( (n/√M)^(lg 7) · M/p )

Thus, if M = n²/p^(2/lg 7), then IO(n, p) = Ω( n²/p^(2/lg 7) ).

•  Stronger than MPC: no restriction on rounds
•  Weaker than MPC:
   – Applies only to an algorithm, not to a problem
   – CDAG model of computation vs. bit model

(M in Llower here plays the same role as n² there.)

Pages 28–32

Lower Bound is Llower = M / p⅔: Proof

Input: size = M = 3 log(n!) bits
•  R, S, T are random permutations on [n]: Pr[(i,j) ∈ R] = 1/n, and the same for S, T

E[#answers to Q] = Σ_{i,j,k} Pr[(i,j,k) ∈ R⋈S⋈T] = n³ · (1/n)³ = 1

One server v ∈ [p] receives L(v) = L_R(v) + L_S(v) + L_T(v) bits.
•  Denote a_ij = Pr[(i,j) ∈ R and v knows it]. Then:
   (1) a_ij ≤ 1/n (obvious)
   (2) Σ_{i,j} a_ij ≤ L_R(v) / log n (formal proof: entropy)
•  Denote similarly b_jk, c_ki for S and T
•  E[#answers to Q reported by server v]
   = Σ_{i,j,k} a_ij · b_jk · c_ki
   ≤ [(Σ_{i,j} a_ij²) · (Σ_{j,k} b_jk²) · (Σ_{k,i} c_ki²)]^(1/2)    [Friedgut]
   ≤ (1/n)^(3/2) · [L_R(v) · L_S(v) · L_T(v) / log³ n]^(1/2)        by (1), (2)
   ≤ (1/n)^(3/2) · [L(v) / (3 log n)]^(3/2)                          geometric/arithmetic mean
   = [L(v) / M]^(3/2)                                                by the expression for M above

E[#answers to Q reported by all servers] ≤ Σ_v [L(v)/M]^(3/2) = p · [L/M]^(3/2) = [L/Llower]^(3/2)
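The starting expectation E[#answers] = 1 can be sanity-checked empirically; a small Monte Carlo sketch (illustrative, not from the talk):

```python
import random

def avg_triangle_answers(n, trials=20000):
    """Average #answers to R ⋈ S ⋈ T for random permutations R, S, T on [n]."""
    total = 0
    for _ in range(trials):
        R = list(range(n)); random.shuffle(R)   # R = {(i, R[i])}
        S = list(range(n)); random.shuffle(S)   # S = {(j, S[j])}
        T = list(range(n)); random.shuffle(T)   # T = {(k, T[k])}
        # (i,j,k) is an answer iff j = R[i], k = S[j], and T[k] = i,
        # i.e. i is a fixed point of the composed permutation T∘S∘R.
        total += sum(1 for i in range(n) if T[S[R[i]]] == i)
    return total / trials

print(avg_triangle_answers(n=10))   # ≈ 1.0
```

The answers are exactly the fixed points of the random permutation T∘S∘R, whose expected number is 1, matching the computation above.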

Page 33

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


Page 34

General Formula

Consider an arbitrary conjunctive query (CQ):

  Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ)
  size(S1) = M1, size(S2) = M2, …, size(Sℓ) = Mℓ

We will:
•  Give a lower bound formula Llower
•  Give an algorithm with load Lalgo
•  Prove that Llower = Lalgo

Page 35

Cartesian Product

Q(x,y) = S1(x) × S2(y)
size(S1) = M1, size(S2) = M2

Page 36

Cartesian Product

Q(x,y) = S1(x) × S2(y), size(S1) = M1, size(S2) = M2

[Figure: the p servers arranged in a p1 × p2 grid, with S1(x) hashed along one dimension and S2(y) along the other.]

Algorithm: factorize p = p1 × p2
•  Send S1(x) to all servers (h(x), *); send S2(y) to all servers (*, h(y))
•  Load = M1/p1 + M2/p2, minimized when M1/p1 = M2/p2 = (M1·M2/p)^(1/2)

  Lalgo = 2 · (M1·M2 / p)^(1/2)
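For completeness, a short derivation of the optimal factorization (standard single-variable optimization, consistent with the slide):

```latex
% Minimize the per-server load over factorizations p = p_1 p_2,
% substituting p_2 = p / p_1:
%   L(p_1) = M_1/p_1 + M_2 p_1 / p.
\[
\frac{dL}{dp_1} = -\frac{M_1}{p_1^2} + \frac{M_2}{p} = 0
\quad\Longrightarrow\quad
p_1 = \sqrt{\frac{M_1\,p}{M_2}}, \qquad p_2 = \sqrt{\frac{M_2\,p}{M_1}},
\]
\[
\text{so}\quad
\frac{M_1}{p_1} = \frac{M_2}{p_2} = \Bigl(\frac{M_1 M_2}{p}\Bigr)^{1/2}
\quad\text{and}\quad
L_{\text{algo}} = 2\Bigl(\frac{M_1 M_2}{p}\Bigr)^{1/2}.
\]
```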

Page 37

Cartesian Product, lower bound

Q(x,y) = S1(x) × S2(y), size(S1) = M1, size(S2) = M2

  Llower = 2 · (M1·M2 / p)^(1/2) = Lalgo

Lower bound:
•  Let L(v) = L1(v) + L2(v) = load at server v
•  The p servers must report all answers to Q:
   M1·M2 ≤ Σ_v L1(v)·L2(v) ≤ ½ Σ_v (L(v))² ≤ ½ p · max_v (L(v))²

Page 38

A Simple Observation

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ)
size(S1) = M1, size(S2) = M2, …, size(Sℓ) = Mℓ

Definition. An edge packing for Q is a set of atoms with no common variables:
  U = {Sj1(x̄j1), Sj2(x̄j2), …, Sj|U|(x̄j|U|)},  ∀i ≠ k: x̄ji ∩ x̄jk = ∅

Claim. Any one-round algorithm for Q must also compute the cartesian product
  Q′(x̄j1, …, x̄j|U|) = Sj1(x̄j1) ⋈ Sj2(x̄j2) ⋈ … ⋈ Sj|U|(x̄j|U|)
Proof: because the algorithm doesn’t know the other Sj’s.

Hence (proof similar to the cartesian product on the previous slide):

  Llower = |U| · (Mj1 · Mj2 ⋯ Mj|U| / p)^(1/|U|)

Assume the input relations S1, S2, … are stored on disjoint servers#
# Otherwise, add another factor |U|.

Page 39

The Lower Bound for CQ

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ)
size(S1) = M1, size(S2) = M2, …, size(Sℓ) = Mℓ

Definition. A fractional edge packing is a set of real numbers u1, u2, …, uℓ s.t.:
  ∀j ∈ [ℓ]: uj ≥ 0
  ∀i ∈ [k]: Σ_{j ∈ [ℓ]: xi ∈ vars(Sj(x̄j))} uj ≤ 1

Theorem* Any one-round deterministic algorithm A for Q requires Llower bits of communication per server, up to constant factors (we ignore constants like the |U| on the previous slide):

  Llower = (M1^u1 · M2^u2 ⋯ Mℓ^uℓ / p)^(1/(u1+u2+⋯+uℓ))

* Beame, Koutris, Suciu, Skew in Parallel Data Processing, PODS 2014

Page 40

Triangle Query Revisited

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X)
size(R) = M1, size(S) = M2, size(T) = M3

Packing (u1, u2, u3)   L = (M1^u1 · M2^u2 · M3^u3 / p)^(1/(u1+u2+u3))
(½, ½, ½)              (M1·M2·M3)^(1/3) / p^(2/3)
(1, 0, 0)              M1 / p
(0, 1, 0)              M2 / p
(0, 0, 1)              M3 / p

Llower = the largest of these four values. When M1 = M2 = M3 = M, the largest is the first: M / p⅔.

[Figure: the triangle on variables x, y, z with weight ½ on each edge.]
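A small sketch that evaluates the four packing bounds and takes the largest (Python; the concrete sizes M and p are arbitrary examples):

```python
import math

def packing_bound(sizes, packing, p):
    """L = ((prod_j Mj^uj) / p) ** (1 / sum_j uj) for one edge packing."""
    num = 1.0
    for M, u in zip(sizes, packing):
        num *= M ** u
    return (num / p) ** (1.0 / sum(packing))

M = 2**30                                   # example: M1 = M2 = M3 = M bits
p = 64
packings = [(0.5, 0.5, 0.5), (1, 0, 0), (0, 1, 0), (0, 0, 1)]
L_lower = max(packing_bound((M, M, M), u, p) for u in packings)
print(math.isclose(L_lower, M / p**(2/3)))  # True: (1/2,1/2,1/2) dominates
```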

Page 41

An Algorithm for CQ [Afrati&Ullman’2010]

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ)
size(S1) = M1, size(S2) = M2, …, size(Sℓ) = Mℓ

Factorize p = p1 × p2 × … × pk; a server is (v1, …, vk) ∈ [p1] × … × [pk]
•  Send S1(x11, x12, …) to the servers whose coordinates agree with h(x11), h(x12), …
•  Send S2(x21, x22, …) to the servers whose coordinates agree with h(x21), h(x22), …
•  …
•  Compute the query locally at each server

The load is O(Lalgo), where:

  Lalgo = max_{j ∈ [ℓ]} Mj / Π_{i ∈ [k]: xi ∈ vars(Sj(x̄j))} pi

([A&U] use sum instead of max. With max we can compute the optimal p1, p2, …, pk.)
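A sketch of this routing for a general CQ (Python; the encoding of atoms as (variables, tuples) pairs and of the factorization as a dict of per-variable shares is a hypothetical interface):

```python
from collections import defaultdict
from itertools import product

def hypercube_route(atoms, shares, h=hash):
    """Route tuples for Q = S1 ⋈ … ⋈ Sl onto a grid of servers.
    atoms:  list of (vars, tuples) pairs, e.g. (("x", "y"), [(1, 2), ...])
    shares: dict mapping each variable to its share p_i (p = product of shares)
    Returns a dict: server coordinates -> list of (atom index, tuple)."""
    variables = list(shares)
    local = defaultdict(list)
    for a, (vs, tuples) in enumerate(atoms):
        free = [v for v in variables if v not in vs]   # the '*' dimensions
        for t in tuples:
            fixed = dict(zip(vs, t))
            # Hash the coordinates of the atom's own variables;
            # replicate over every combination of the free coordinates.
            for choice in product(*(range(shares[v]) for v in free)):
                coord = tuple(h((v, fixed[v])) % shares[v] if v in fixed
                              else choice[free.index(v)]
                              for v in variables)
                local[coord].append((a, t))
    return local

# Triangle query on p = 2 * 2 * 2 servers (shares x = y = z = 2):
atoms = [(("x", "y"), [(1, 2)]), (("y", "z"), [(2, 3)]), (("z", "x"), [(3, 1)])]
loads = hypercube_route(atoms, {"x": 2, "y": 2, "z": 2})
print(max(len(ts) for ts in loads.values()))   # max tuples at any one server
```

Each tuple of Sj is replicated once per combination of the shares of the variables it does not contain, which is exactly the Mj / Π pi term in Lalgo.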

Pages 42–44

Llower = Lalgo

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ)
size(S1) = M1, size(S2) = M2, …, size(Sℓ) = Mℓ

  Llower = (Πj Mj^uj / p)^(1/Σj uj)

  Lalgo = max_{j ∈ [ℓ]} Mj / Π_{i: xi ∈ vars(Sj)} pi

Apply log in base p; denote λ = logp Lalgo, μj = logp Mj, ei = logp pi. Minimizing Lalgo over the shares becomes a linear program:

  minimize λ
  subject to  Σi ei ≤ 1
              ∀j: λ + Σ_{i: xi ∈ vars(Sj)} ei ≥ μj

Its dual:

  maximize  Σj fj·μj − f0
  subject to  Σj fj ≤ 1
              ∀i: Σ_{j: xi ∈ vars(Sj)} fj ≤ f0

Primal/Dual: uj = fj / f0, u0 = 1 / f0; at optimality, u0 = Σj uj.
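The primal LP can be handed to an off-the-shelf solver; a sketch assuming SciPy is available (the encoding of atoms as variable-index sets is illustrative):

```python
import numpy as np
from scipy.optimize import linprog

def optimal_load_exponent(atom_vars, mu, k):
    """Solve: minimize lam  s.t.  sum_i e_i <= 1  and, for each atom j,
    lam + sum_{i in vars(Sj)} e_i >= mu_j.
    atom_vars: per atom, the set of variable indices it uses.
    mu: per atom, mu_j = log_p(M_j).  LP variables are [lam, e_1..e_k]."""
    c = np.zeros(1 + k)
    c[0] = 1.0                                  # minimize lam
    A_ub, b_ub = [], []
    row = np.zeros(1 + k)
    row[1:] = 1.0                               # sum_i e_i <= 1
    A_ub.append(row); b_ub.append(1.0)
    for vs, mu_j in zip(atom_vars, mu):         # -lam - sum e_i <= -mu_j
        row = np.zeros(1 + k)
        row[0] = -1.0
        for i in vs:
            row[1 + i] = -1.0
        A_ub.append(row); b_ub.append(-mu_j)
    res = linprog(c, A_ub=A_ub, b_ub=b_ub, bounds=[(0, None)] * (1 + k))
    return res.x[0], res.x[1:]

# Triangle query with M1 = M2 = M3 = p^2 (mu_j = 2):
lam, e = optimal_load_exponent([{0, 1}, {1, 2}, {2, 0}], [2, 2, 2], k=3)
print(round(lam, 3), np.round(e, 3))   # 1.333 [0.333 0.333 0.333]
```

For the triangle this returns λ = μ − 2/3 and equal shares, i.e. L = M / p^(2/3), matching Page 40.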

Page 45

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


Page 46

Discussion

Communication costs for multiple rounds
•  Lower/upper bounds in [Beame’13] connect the number of rounds to the log of the query diameter
•  Weaker communication model: the algorithm sends only tuples, not arbitrary bits

Communication cost for skewed data
•  Lower/upper bounds in [Beame’14] have a gap of poly log p

Communication costs beyond full CQ’s
•  Open!