
Communication Cost in Big Data Processing

Dan Suciu, University of Washington

Joint work with Paul Beame and Paris Koutris, and the Database Group at UW

Queries on Big Data

Big Data processing on distributed clusters:
•  General programs: MapReduce, Spark
•  SQL: Pig Latin, BigQuery, Shark, Myria…

Traditional data processing: main cost = disk I/O
Big Data processing: main cost = communication

This Talk

•  How much communication is needed to solve a "problem" on p servers?

•  "Problem":
   – In this talk = Full Conjunctive Query CQ (≈ SQL)
   – Future work = extend to linear algebra, ML, etc.

•  Some techniques discussed in this talk are implemented in Myria (Bill Howe's talk)

Models of Communication

•  Combined communication/computation cost:
   – PRAM → MRC [Karloff'2010]
   – BSP [Valiant] → LogP model [Culler]

•  Explicit communication cost:
   – [Hong&Kung'81] red/blue pebble games for I/O communication complexity → [Ballard'2012] extension to parallel algorithms
   – MapReduce model [DasSarma'2013], MPC [Beame'2013, Beame'2014] – this talk

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


Massively Parallel Communication Model (MPC)

Extends BSP [Valiant].

[Figure: p servers (Server 1, …, Server p) proceed through Rounds 1, 2, 3, …; each server starts with O(M/p) of the input and receives at most L bits per round]

•  Input data = M bits, spread uniformly: O(M/p) per server
•  Number of servers = p
•  Algorithm = several rounds
•  One round = compute & communicate
•  Max communication load per server = L

Ideal: L = M/p
This talk: L = M/p * p^ε, where ε in [0,1] is called the space exponent
Degenerate: L = M (send everything to one server)
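To make the load scale concrete, here is a minimal numeric sketch of L = M/p * p^ε; the values of M, p, and ε below are illustrative assumptions, not from the talk.

```python
# Per-server load in the MPC model: L = (M / p) * p**eps.
def mpc_load(M: float, p: int, eps: float) -> float:
    return (M / p) * p**eps

M = 10**12   # input size in bits (hypothetical)
p = 1000     # number of servers (hypothetical)
for eps in (0.0, 1/3, 1.0):
    print(f"eps = {eps:.2f}: L = {mpc_load(M, p, eps):.3e} bits")
# eps = 0 gives the ideal M/p; eps = 1 degenerates to L = M.
```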

Example: Q(x,y,z) = R(x,y) ⋈ S(y,z)

Input:
•  R, S uniformly partitioned on p servers: M/p per server
•  size(R) + size(S) = M

Round 1: each server
•  Sends record R(x,y) to server h(y) mod p
•  Sends record S(y,z) to server h(y) mod p

Output: each server computes and outputs the local join R(x,y) ⋈ S(y,z)

Assuming no skew (∀y: degreeR(y), degreeS(y) ≤ M/p):
•  Rounds = 1
•  Load L = O(M/p) = O(M/p * p^0), i.e., space exponent = 0

SQL is embarrassingly parallel!
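Below is a minimal simulation of this one-round algorithm. The tiny relations and the use of Python's built-in hash are illustrative assumptions; the routing rule (send both R(x,y) and S(y,z) to server h(y) mod p) is the one above.

```python
# One-round hash join: route by join key, then join locally per server.
from collections import defaultdict

def one_round_join(R, S, p):
    servers_R = defaultdict(list)
    servers_S = defaultdict(list)
    for (x, y) in R:
        servers_R[hash(y) % p].append((x, y))
    for (y, z) in S:
        servers_S[hash(y) % p].append((y, z))
    # Each server joins only the tuples it received.
    out = []
    for v in range(p):
        for (x, y) in servers_R[v]:
            for (y2, z) in servers_S[v]:
                if y == y2:
                    out.append((x, y, z))
    load = max(len(servers_R[v]) + len(servers_S[v]) for v in range(p))
    return out, load

R = [(1, 2), (3, 2), (4, 5)]   # hypothetical R(x,y)
S = [(2, 7), (5, 8), (2, 9)]   # hypothetical S(y,z)
result, load = one_round_join(R, S, p=4)
print(sorted(result))  # [(1, 2, 7), (1, 2, 9), (3, 2, 7), (3, 2, 9), (4, 5, 8)]
print("max per-server load:", load)
```

As long as no join value y is heavy (the no-skew condition), each server receives O(M/p) tuples.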

Questions

Fix a query Q and a number of servers p.

•  What is the communication load L needed to compute Q in one round? (This talk, based on [Beame'2013, Beame'2014].)

•  Fix a maximum load L (space exponent ε): how many rounds are needed for Q? (Preliminary results in [Beame'2013].)

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


The Triangle Query

•  Input: three tables R(X,Y), S(Y,Z), T(Z,X), with size(R)+size(S)+size(T) = M

•  Output: compute Q(x,y,z) = R(x,y) ⋈ S(y,z) ⋈ T(z,x)

Example instance (each relation holds the same four pairs):

  R(X,Y):  (Fred, Alice), (Jack, Jim), (Fred, Jim), (Carol, Alice)
  S(Y,Z):  (Fred, Alice), (Jack, Jim), (Fred, Jim), (Carol, Alice)
  T(Z,X):  (Fred, Alice), (Jack, Jim), (Fred, Jim), (Carol, Alice)

Triangles in Two Rounds

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X), size(R)+size(S)+size(T) = M

•  Round 1: compute temp(X,Y,Z) = R(X,Y) ⋈ S(Y,Z)
•  Round 2: compute Q(X,Y,Z) = temp(X,Y,Z) ⋈ T(Z,X)

The load L depends on the size of temp, which can be much larger than M/p.

Triangles in One Round [Ganguli'92, Afrati&Ullman'10, Suri'11, Beame'13]

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X), size(R)+size(S)+size(T) = M

•  Factorize p = p⅓ × p⅓ × p⅓ and place the servers in a cube: each server is identified by a triple (i,j,k) ∈ [p⅓]^3

Round 1:
•  Send R(x,y) to all servers (h(x), h(y), *)
•  Send S(y,z) to all servers (*, h(y), h(z))
•  Send T(z,x) to all servers (h(x), *, h(z))

Output: compute locally R(x,y) ⋈ S(y,z) ⋈ T(z,x)

[Figure: example routing of tuples such as R(Fred, Jim), replicated along the third dimension to the servers (i, j, *) with i = h(Fred), j = h(Jim)]
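A minimal sketch of the cube algorithm follows; the cube side, the tiny relations, and the hash are assumed for illustration, while the replication pattern is the slide's.

```python
# One-round "cube" algorithm for triangles on side**3 servers.
from itertools import product

def cube_triangles(R, S, T, side):
    h = lambda v: hash(v) % side
    servers = {c: ([], [], []) for c in product(range(side), repeat=3)}
    for (x, y) in R:                       # R -> (h(x), h(y), *)
        for k in range(side):
            servers[(h(x), h(y), k)][0].append((x, y))
    for (y, z) in S:                       # S -> (*, h(y), h(z))
        for i in range(side):
            servers[(i, h(y), h(z))][1].append((y, z))
    for (z, x) in T:                       # T -> (h(x), *, h(z))
        for j in range(side):
            servers[(h(x), j, h(z))][2].append((z, x))
    out = set()
    for (Rv, Sv, Tv) in servers.values():
        # Local join: every triangle (x,y,z) lands on exactly the
        # server (h(x), h(y), h(z)), so the union is the full answer.
        for (x, y) in Rv:
            for (y2, z) in Sv:
                if y2 == y and (z, x) in Tv:
                    out.add((x, y, z))
    return out

R = [(1, 2), (2, 3), (1, 3)]   # hypothetical relations
S = [(2, 3), (3, 1)]
T = [(3, 1)]
print(cube_triangles(R, S, T, side=2))  # {(1, 2, 3)}
```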

Upper Bound is Lalgo = O(M/p⅔)

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X), M = size(R)+size(S)+size(T) (in bits)

Theorem Assume the data has no skew: all degrees ≤ O(M/p⅓). Then the algorithm computes Q in one round, with communication load Lalgo = O(M/p⅔). (Each tuple is replicated to p⅓ servers, so the total communication is O(M * p⅓), i.e., O(M/p⅔) per server.)

Compare to two rounds:
•  No more large intermediate result
•  Skew-free up to degrees O(M/p⅓) (up from O(M/p))
•  BUT: space exponent increased to ε = ⅓ (from ε = 0)

Lower Bound is Llower = M/p⅔

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X), M = size(R)+size(S)+size(T) (in bits)

Theorem* Any one-round deterministic algorithm A for Q requires Llower bits of communication per server. Assume the three input relations R, S, T are stored on disjoint servers#.

We prove a stronger claim, about any algorithm communicating fewer than Llower bits. Let L be the algorithm's max communication load per server. We prove: if L < Llower then, over random permutations R, S, T, the algorithm returns at most a fraction (L/Llower)^(3/2) of the answers to Q.

Discussion:
•  An algorithm with space exponent ε < ⅓ has load L = M/p^(1-ε) and reports only a (L/Llower)^(3/2) = 1/p^((1-3ε)/2) fraction of all answers: fewer as p increases!
•  The lower bound holds only for random inputs. It cannot hold for a fixed input R, S, T, since the algorithm could use a short bit encoding to signal this input to all servers.
•  By Yao's lemma, the result also holds for randomized algorithms on some fixed input.

* Beame, Koutris, Suciu, Communication Steps for Parallel Query Processing, PODS 2013
# Without this assumption, add a multiplicative factor of 3.

Lower Bound is Llower = M/p⅔ (Comparison with [Ballard'2012])

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X), size(R)+size(S)+size(T) = M

[Ballard'2012] considers Strassen's matrix multiplication algorithm and proves the following theorem in the CDAG model. If each server has memory M ≤ c · n^2/p^(2/lg 7), then the communication load per server is

  IO(n, p, M) = Ω( (n/√M)^(lg 7) · M/p )

Thus, if M = n^2/p^(2/lg 7), then IO(n, p) = Ω( n^2/p^(2/lg 7) ).

(The M of the MPC model corresponds to n^2 here.)

•  Stronger than MPC: no restriction on the number of rounds
•  Weaker than MPC:
   – Applies only to an algorithm, not to a problem
   – CDAG model of computation vs. the bit model

Lower Bound is Llower = M/p⅔ (Proof)

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X)

Input: R, S, T are random permutations on [n], so Pr[(i,j) ∈ R] = 1/n (same for S, T). The input size is M = 3 log(n!) bits.

E[#answers to Q] = Σi,j,k Pr[(i,j,k) ∈ R⋈S⋈T] = n^3 · (1/n)^3 = 1

Fix one server v ∈ [p]: it receives L(v) = LR(v) + LS(v) + LT(v) bits.
•  Denote aij = Pr[(i,j) ∈ R and v knows it]. Then:
   (1) aij ≤ 1/n (obvious)
   (2) Σi,j aij ≤ LR(v)/log n (formal proof: entropy)
•  Denote bjk, cki similarly for S and T.
•  E[#answers to Q reported by server v]
   = Σi,j,k aij · bjk · cki
   ≤ [(Σi,j aij^2) · (Σj,k bjk^2) · (Σk,i cki^2)]^½       [Friedgut]
   ≤ (1/n)^(3/2) · [LR(v) · LS(v) · LT(v) / log^3 n]^½     by (1), (2)
   ≤ (1/n)^(3/2) · [L(v) / (3 log n)]^(3/2)                geometric/arithmetic mean
   = [L(v)/M]^(3/2)                                        by the expression for M above

Summing over all servers:
E[#answers to Q reported by all servers] ≤ Σv [L(v)/M]^(3/2) ≤ p · [L/M]^(3/2) = [L/Llower]^(3/2)
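As a sanity check on the first step, E[#answers to Q] = 1, here is a small Monte Carlo experiment (entirely illustrative; n and the trial count are assumptions), with each permutation encoded as an array:

```python
# Estimate E[#triangles] when R, S, T are random permutations on [n],
# viewed as relations {(i, perm[i])}.
import random

def expected_triangles(n: int, trials: int) -> float:
    total = 0
    for _ in range(trials):
        R = list(range(n)); random.shuffle(R)   # R = {(i, R[i])}
        S = list(range(n)); random.shuffle(S)   # S = {(j, S[j])}
        T = list(range(n)); random.shuffle(T)   # T = {(k, T[k])}
        # Count (i, j, k) with R(i,j), S(j,k), T(k,i):
        # j, k are forced by i, so just test the closing edge.
        total += sum(1 for i in range(n) if T[S[R[i]]] == i)
    return total / trials

print(expected_triangles(n=20, trials=10000))   # ≈ 1.0
```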

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


General Formula

Consider an arbitrary full conjunctive query (CQ):

  Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ),  size(S1) = M1, size(S2) = M2, …, size(Sℓ) = Mℓ

We will:
•  Give a lower bound formula Llower
•  Give an algorithm with load Lalgo
•  Prove that Llower = Lalgo

Cartesian Product

Q(x,y) = S1(x) × S2(y), size(S1) = M1, size(S2) = M2

Algorithm: factorize p = p1 × p2; a server is a pair in [p1] × [p2]
•  Send S1(x) to all servers (h(x), *); send S2(y) to all servers (*, h(y))
•  Load = M1/p1 + M2/p2, minimized when M1/p1 = M2/p2 = (M1·M2/p)^½, giving

  Lalgo = 2 · (M1·M2/p)^½

Lower bound:
•  Let L(v) = L1(v) + L2(v) be the load at server v
•  The p servers must report all answers to Q:
   M1·M2 ≤ Σv L1(v)·L2(v) ≤ ½ Σv L(v)^2 ≤ ½ p · maxv L(v)^2, hence

  Llower = 2 · (M1·M2/p)^½
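A small calculator for the optimal shares (the sizes M1, M2 and the cluster size p are assumed example values):

```python
# Optimal split p = p1 * p2 for the cartesian product: M1/p1 = M2/p2.
import math

def cartesian_shares(M1: float, M2: float, p: int):
    p1 = math.sqrt(p * M1 / M2)      # ideal (possibly fractional) share
    p2 = p / p1
    load = M1 / p1 + M2 / p2
    return p1, p2, load

M1, M2, p = 1e9, 4e9, 100            # hypothetical sizes and cluster
p1, p2, load = cartesian_shares(M1, M2, p)
print(p1, p2)                            # 5.0 x 20.0 servers
print(load, 2 * math.sqrt(M1 * M2 / p))  # both 4e8: matches 2*(M1*M2/p)^(1/2)
```

In practice p1 and p2 must be rounded to integer divisors of p; the load changes only by a constant factor.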

A Simple Observation

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ),  size(Sj) = Mj

Definition An edge packing for Q is a set of atoms, no two of which share a variable:

  U = {Sj1(x̄j1), Sj2(x̄j2), …, Sj|U|(x̄j|U|)},  where ∀i,k: x̄ji ∩ x̄jk = ∅

Claim Any one-round algorithm for Q must also compute the cartesian product

  Q'(x̄j1, …, x̄j|U|) = Sj1(x̄j1) ⋈ Sj2(x̄j2) ⋈ … ⋈ Sj|U|(x̄j|U|)

Proof Because the algorithm doesn't know the other Sj's.

The proof for the cartesian product on the previous slide then generalizes to:

  Llower = |U| · (Mj1 · Mj2 ··· Mj|U| / p)^(1/|U|)

Assume the input relations S1, S2, … are stored on disjoint servers#.
# Otherwise, add another factor |U|.

The Lower Bound for CQ

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ),  size(Sj) = Mj

Definition A fractional edge packing is a set of real numbers u1, u2, …, uℓ such that:

  ∀j ∈ [ℓ]: uj ≥ 0
  ∀i ∈ [k]: Σ(j ∈ [ℓ]: xi ∈ vars(Sj(x̄j))) uj ≤ 1

Theorem* Any one-round deterministic algorithm A for Q requires Ω(Llower) bits of communication per server, where

  Llower = (M1^u1 · M2^u2 ··· Mℓ^uℓ / p)^(1/(u1+u2+···+uℓ))

Note that we ignore constant factors (like the |U| on the previous slide).

* Beame, Koutris, Suciu, Skew in Parallel Query Processing, PODS 2014

Triangle Query Revisited

Q(X,Y,Z) = R(X,Y) ⋈ S(Y,Z) ⋈ T(Z,X),  size(R) = M1, size(S) = M2, size(T) = M3

Each fractional edge packing (u1, u2, u3) gives a bound L = (M1^u1 · M2^u2 · M3^u3 / p)^(1/(u1+u2+u3)):

  (u1, u2, u3)    L
  (½, ½, ½)       (M1·M2·M3)^⅓ / p⅔
  (1, 0, 0)       M1/p
  (0, 1, 0)       M2/p
  (0, 0, 1)       M3/p

Llower = the largest of these four values. When M1 = M2 = M3 = M, the largest is the first: M/p⅔.

[Figure: the triangle on variables x, y, z with weight ½ on each edge]
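The table above can be reproduced mechanically; here is a tiny calculator (the sizes and p are assumed example values):

```python
# Evaluate L = (prod_j Mj**uj / p) ** (1 / sum_j uj) for each packing.
def packing_bound(Ms, us, p):
    num = 1.0
    for M, u in zip(Ms, us):
        num *= M**u
    return (num / p) ** (1.0 / sum(us))

Ms, p = (1e9, 1e9, 1e9), 64          # M1 = M2 = M3 = M (hypothetical)
packings = [(0.5, 0.5, 0.5), (1, 0, 0), (0, 1, 0), (0, 0, 1)]
bounds = {u: packing_bound(Ms, u, p) for u in packings}
print(bounds)
print(max(bounds.values()))          # M/p^(2/3) = 1e9/16 = 6.25e7
```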

An Algorithm for CQ [Afrati&Ullman'2010]

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ),  size(Sj) = Mj

Factorize p = p1 × p2 × … × pk; a server is a point (v1, …, vk) ∈ [p1] × … × [pk]
•  Send S1(x11, x12, …) to the servers whose corresponding coordinates are h(x11), h(x12), …
•  Send S2(x21, x22, …) to the servers whose corresponding coordinates are h(x21), h(x22), …
•  …
•  Compute the query locally at each server

The load is O(Lalgo), where:

  Lalgo = max(j ∈ [ℓ])  Mj / Π(i ∈ [k]: xi ∈ vars(Sj(x̄j))) pi

([A&U] use sum instead of max. With max we can compute the optimal p1, p2, …, pk.)
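A minimal load calculator for this algorithm; the triangle query instance and the share assignment below are assumed for illustration:

```python
# Lalgo = max_j Mj / (product of shares p_i over the variables of S_j).
from math import prod

def hypercube_load(atoms, sizes, shares):
    """atoms: list of variable tuples; sizes: Mj; shares: {var: p_i}."""
    return max(M / prod(shares[x] for x in vs)
               for vs, M in zip(atoms, sizes))

# Triangle query R(x,y), S(y,z), T(z,x) on p = 4*4*4 = 64 servers.
atoms = [("x", "y"), ("y", "z"), ("z", "x")]
sizes = [1e9, 1e9, 1e9]
shares = {"x": 4, "y": 4, "z": 4}
print(hypercube_load(atoms, sizes, shares))  # 6.25e7 = M/p^(2/3)
```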

Llower = Lalgo

Q(x1, …, xk) = S1(x̄1) ⋈ … ⋈ Sℓ(x̄ℓ),  size(Sj) = Mj

We must match

  Llower = (Πj Mj^uj / p)^(1/Σj uj)   with   Lalgo = max(j) Mj / Π(i: xi ∈ vars(Sj)) pi

Apply log in base p and denote µj = logp Mj, ei = logp pi. Choosing the optimal shares pi is then the following linear program, with λ = logp Lalgo:

  minimize λ
    s.t.  Σi ei ≤ 1
          ∀j: λ + Σ(i: xi ∈ vars(Sj)) ei ≥ µj

Its dual is:

  maximize (Σj fj µj − f0)
    s.t.  Σj fj ≤ 1
          ∀i: Σ(j: xi ∈ vars(Sj)) fj ≤ f0

Setting uj = fj / f0 and u0 = 1 / f0 turns a dual solution into a fractional edge packing (at optimality: u0 = Σj uj), and the dual objective equals logp Llower. Hence Llower = Lalgo by LP duality.
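To see the primal LP in action, here is a numeric sketch (assuming scipy is available; p and M are illustrative) that recovers the triangle bound M/p⅔:

```python
# Solve the primal LP above for the triangle query with equal sizes.
import math
from scipy.optimize import linprog

p, M = 64, 1e9
mu = math.log(M, p)                       # mu_j = log_p M_j (all equal)
atoms = [("x", "y"), ("y", "z"), ("z", "x")]
vars_ = ["x", "y", "z"]

# Decision variables: [lam, e_x, e_y, e_z]; minimize lam.
c = [1.0, 0.0, 0.0, 0.0]
A_ub, b_ub = [], []
A_ub.append([0.0, 1.0, 1.0, 1.0]); b_ub.append(1.0)   # sum_i e_i <= 1
for vs in atoms:                                       # lam + sum e_i >= mu_j
    row = [-1.0] + [-(1.0 if v in vs else 0.0) for v in vars_]
    A_ub.append(row); b_ub.append(-mu)

res = linprog(c, A_ub=A_ub, b_ub=b_ub,
              bounds=[(None, None)] + [(0, None)] * 3)
lam = res.x[0]                             # optimal e_i are all 1/3
print(p ** lam, M / p ** (2 / 3))          # both 6.25e7 = M/p^(2/3)
```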

Outline

•  The MPC Model

•  Communication Cost for Triangles

•  Communication Cost for CQ’s

•  Discussion


Discussion

Communication cost for multiple rounds
•  Lower/upper bounds in [Beame'13] connect the number of rounds to the log of the query diameter
•  Weaker communication model: the algorithm sends only tuples, not arbitrary bits

Communication cost for skewed data
•  Lower/upper bounds in [Beame'14] have a gap of poly log p

Communication cost beyond full CQ's
•  Open!
