Composing and Executing Parallel Data Flow Graphs with Shell Pipes

Composing and Executing Parallel Data-flow Graphs with Shell Pipes Edward Walker (TACC) Weijia Xu (TACC) Vinoth Chandar (Oracle Corp)

Upload: vinoth-chandar

Post on 30-Jun-2015


TRANSCRIPT

Page 1: Composing and Executing Parallel Data Flow Graphs with Shell Pipes

Composing and Executing Parallel Data-flow Graphs with Shell Pipes

Edward Walker (TACC)

Weijia Xu (TACC)

Vinoth Chandar (Oracle Corp)

Page 2

Agenda

• Motivation

• Shell language extensions

• Implementation

• Experimental evaluation

• Conclusions

Page 3

Motivation

• Distributed memory clusters are becoming pervasive in industry and academia

• Shells are the default login environment on these systems

• Shell pipes are commonly used for composing extensible Unix commands.

• There has been no change to the syntax/semantics of shell pipes since their invention over 30 years ago.

• Growing need to compose massively parallel jobs quickly, using existing software

Page 4

Extending Shells for Parallel Computing

• Build a simple, powerful coordination layer at the Shell

• The coordination layer transparently manages the parallelism in the workflow

• User specifies parallel computation as a dataflow graph using extensions to the Shell

• Provides the ability to combine different tools and build interesting parallel programs quickly.

Page 5

Shell pipe extensions

• Pipeline fork

A | B on n procs

• Pipeline join

A on n procs | B

• Pipeline cycles

(++ n A)

• Pipeline key-value aggregation

A | B on keys

Page 6

Parallel shell tasks extensions

> function foo() { echo "hello world"; }

> foo on all procs # foo() on all CPUs

> foo on all nodes # foo() on all nodes

> foo on 10:2 procs # 10 tasks, 2 tasks on each node

> foo on 10:2:2 procs # 10 tasks, 2 tasks on alternate nodes

(Figure callouts on this example: stride, span)
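One way to read the placement specifiers above is as a mapping from task index to node index. The sketch below is an interpretation, not the tool's code: it assumes that span is the number of consecutive tasks placed on one node and that stride is the step between the nodes that are used. The helper name node_for_task is hypothetical.

```shell
#!/usr/bin/env bash
# Hypothetical helper: node index for task $1, given span $2 and stride $3.
# Assumption: span = tasks per node, stride = step between the nodes used.
node_for_task() {
    local task=$1 span=$2 stride=${3:-1}
    echo $(( (task / span) * stride ))
}

# "foo on 10:2 procs": 10 tasks, 2 per node, on consecutive nodes
for t in 0 1 2 3 4 5 6 7 8 9; do
    printf 'task %d -> node %d\n' "$t" "$(node_for_task "$t" 2)"
done
```

Under the same reading, passing a stride of 2 (as in 10:2:2) places the tasks on nodes 0, 2, 4, 6 and 8.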

Page 7

Composing data-flow graphs

• Example 1:

function B1() { :; }
function B2() { :; }

function B() {
    if (( $_ASPECT_TASKID == 0 )); then
        B1
    else
        B2
    fi
}

A | B on 2 procs | C

[Figure: data-flow graph in which A feeds B1 and B2 in parallel, and both feed C]
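The graph in Example 1 can be imitated on a single machine with standard shell features. This is a rough sketch only: it assumes that every task of B receives a full copy of A's output (which matches the "cat stdin_file | B" command files shown later in the deck), and fork_B plus the bodies of A, B1, B2 and C are made up for illustration.

```shell
#!/usr/bin/env bash
# Plain-shell imitation of "A | B on 2 procs | C" on one machine.
# Assumption: each task of B sees a full copy of A's output.
A()  { printf '%s\n' alpha beta; }
B1() { sed 's/^/B1:/'; }
B2() { sed 's/^/B2:/'; }
B()  { if [ "$_ASPECT_TASKID" -eq 0 ]; then B1; else B2; fi; }
C()  { sort; }

fork_B() {
    local n=$1 tmp t
    tmp=$(mktemp -d) || return 1
    cat > "$tmp/stdin_file"          # buffer the upstream output
    t=0
    while [ "$t" -lt "$n" ]; do
        # each task gets its own task id and its own read of the buffered input
        _ASPECT_TASKID=$t B < "$tmp/stdin_file" > "$tmp/out.$t" &
        t=$((t + 1))
    done
    wait                             # join: wait for all tasks
    cat "$tmp"/out.*                 # concatenate the task outputs
    rm -rf "$tmp"
}

A | fork_B 2 | C
```

Buffering the input in a file before starting the tasks keeps the sketch simple; it also means B cannot start until A has finished.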

Page 8

Composing data-flow graphs

• Example 2:

function map() {
    emit_tuple -k key -v value
}

function reduce() {
    consume_tuple -k key -v value
    num=${#value[@]}
    for ((i = 0; i < num; i++)); do
        # process key=$key, value=${value[$i]}
        :
    done
}

map on all procs | reduce on keys

[Figure: map tasks feed a key-value DHT, which feeds the reduce tasks]
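The grouping step between map and reduce can be approximated on one machine with sort and awk. The sketch below is a word count and is illustrative only: emit_pair, map_words and reduce_groups are hypothetical stand-ins, not the emit_tuple and consume_tuple commands from the slide, and sort takes the place of the distributed hash table.

```shell
#!/usr/bin/env bash
# Single-machine sketch of "map on all procs | reduce on keys".
# emit_pair, map_words and reduce_groups are hypothetical stand-ins.
emit_pair() { printf '%s\t%s\n' "$1" "$2"; }

# map: emit (word, 1) for every word on stdin
map_words() {
    tr -s '[:space:]' '\n' | while IFS= read -r word; do
        if [ -n "$word" ]; then emit_pair "$word" 1; fi
    done
}

# group pairs by key (sort), then reduce each group (awk sums the values)
reduce_groups() {
    LC_ALL=C sort | awk -F '\t' '
        $1 != key { if (NR > 1) print key "\t" sum; key = $1; sum = 0 }
        { sum += $2 }
        END { if (NR > 0) print key "\t" sum }'
}

printf 'to be or not to be\n' | map_words | reduce_groups
```

Sorting brings all pairs with the same key together, which is the property the reducer relies on; a hash-partitioned store gives the same guarantee without a global sort.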

Page 9

BASH Implementation

Page 10

Startup Overlay

• Script may have many instances requiring startup of parallel tasks

• Motivation for overlay:

– Fast startup of parallel shell workers

– Handles node failures gracefully

• Two level hierarchy: sectors and proxies

• Overlay node addressing:

[Figure: the compute node ID, with its ends labelled 0 and 7, divided into a proxy id field and a sector id field]
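A two-level address of this kind can be packed and unpacked with shifts and masks. The layout below is an assumption made for illustration: the slide does not state the field widths or which field occupies the high bits, so PROXY_BITS=4 and "sector id in the high bits" are guesses, and the function names are hypothetical.

```shell
#!/usr/bin/env bash
# Sketch of two-level overlay addressing.
# Assumptions (not stated on the slide): the low PROXY_BITS bits hold the
# proxy id and the remaining high bits hold the sector id.
PROXY_BITS=4

node_id()   { echo $(( ($1 << PROXY_BITS) | $2 )); }   # sector, proxy -> id
sector_of() { echo $(( $1 >> PROXY_BITS )); }
proxy_of()  { echo $(( $1 & ((1 << PROXY_BITS) - 1) )); }

id=$(node_id 1 3)
echo "node $id = sector $(sector_of "$id"), proxy $(proxy_of "$id")"
```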

Page 11

Fault-Tolerance

• Proxy nodes monitor peers within sector, and sector heads monitor peer sectors

• Node 0 maintains a list of available nodes in the overlay in a master_node file

[Figure: eight compute nodes, Node 0 to Node 7, each running a proxy exec and grouped into overlay sector 0 and overlay sector 1, with the master_node file]

Page 12

Starting shell workers with startup overlay

Page 13

1. Bash spawns agent.

2. Agent queries master_node and spawns node I/O multiplexor

[Figure: the two-sector overlay (Node 0 to Node 7, one proxy exec per node) with BASH, the agent, master_node and the Node I/O MUX; arrows are numbered (1) and (2)]

Page 14

3. Agent invokes the overlay to spawn a CPU I/O multiplexor on the node

[Figure: the two-sector overlay with BASH, the agent, the Node I/O MUX and a CPU I/O MUX; arrows are numbered (1) to (3)]

Page 15

4. CPU I/O multiplexor spawns a shell worker per CPU on node

[Figure: the two-sector overlay with BASH, the agent, the Node I/O MUX, a CPU I/O MUX and three CPU boxes; arrows are numbered (1) to (4)]

Page 16

5. CPU I/O multiplexor calls back to node I/O multiplexor

[Figure: the two-sector overlay with BASH, the agent, the Node I/O MUX, a CPU I/O MUX and three CPU boxes; arrows are numbered (1) to (5)]

Page 17

Implementation of pipeline fork

Page 18

1. Process B pipes stdin into stdin_file

[Figure: the pipeline A | B on N procs inside BASH; A's stdout is piped to the stdin of the aspect-agent for B, where a stdin reader fills stdin_file (1)]

Page 19

2. Constructs command files for each task

[Figure: the pipeline A | B on N procs with the stdin reader and stdin_file (1), plus a Cmd dispatcher that produces Cmd files (2), each of the form: cat stdin_file | B]

Page 20

3, 4 and 5. Execute command files in shell workers and marshal results back to the shell

[Figure: the Cmd dispatcher sends the Cmd files (cat stdin_file | B) over a control channel and Node MUXes to shell workers on the compute nodes, which run B; flushers, a queue and an I/O MUX carry the output back to the stdout of the aspect-agent for B. Steps (2) to (5) are marked on this path.]

Page 21

6. Replay command files on failure

[Figure: the pipeline-fork diagram with shell workers, flushers, Node MUXes and the Cmd dispatcher, plus a replayer that runs the Cmd files in shell workers on the local compute node (6)]
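The command-file approach makes replay straightforward, because each task is a small script that can simply be run again. The following is a minimal local sketch of that idea, not the implementation described in the slides: run_with_replay, the task.N.cmd naming and the retry limit are all hypothetical, and tr stands in for B.

```shell
#!/usr/bin/env bash
# Sketch: one command file per task, run each, replay the ones that fail.
# run_with_replay and the file layout are hypothetical.
run_with_replay() {
    local dir=$1 max_tries=$2 f try
    for f in "$dir"/*.cmd; do
        try=1
        until sh "$f" > "${f%.cmd}.out" 2> "${f%.cmd}.err"; do
            if [ "$try" -ge "$max_tries" ]; then
                echo "giving up on $f" >&2
                return 1
            fi
            try=$((try + 1))         # replay the same command file
        done
    done
    cat "$dir"/*.out                 # marshal the results back
}

work=$(mktemp -d)
printf 'one\ntwo\n' > "$work/stdin_file"
for t in 0 1; do
    # same shape as on the slide: cat stdin_file | B
    printf 'cat "%s" | tr a-z A-Z\n' "$work/stdin_file" > "$work/task.$t.cmd"
done
run_with_replay "$work" 3
rm -rf "$work"
```

Replaying is only safe when a task can be run twice without harm, which holds here because each command file reads the saved stdin_file rather than a live pipe.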

Page 22

Implementation of key-value aggregation

Page 23

1. Agent inspects and hashes key

[Figure: the pipeline A | B on keys inside BASH; A's output is piped to the aspect-agent for B, whose Key dispatcher inspects and hashes each key (1)]

Page 24

2. Routes each key-value pair to a compute node based on the key hash, where it is stored in a hash table

[Figure: the Key dispatcher sends key-value pairs through the Node MUX to the compute nodes (2); each node holds a hash table labelled gdbm, and together these form the Distributed Hash Table]

Page 25

3. Each node constructs command files to pipe the key-value entry from its hash table into process B

[Figure: the Distributed Hash Table diagram in which, on each compute node, emit_tuple feeds entries from the local hash table (gdbm) into B (3)]

Page 26

4. Results from executing the command files are marshaled back to the shell

[Figure: the Distributed Hash Table diagram in which output from B on each compute node returns through the I/O MUX to the stdout of the aspect-agent for B (4)]
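Steps 1 and 2 amount to hashing each key and using the hash to pick a destination node. A small sketch of that routing follows. The slides do not name the hash function, so using POSIX cksum modulo the node count is an assumption, the per-node bucket files merely stand in for the per-node hash tables, and node_for_key and route_pairs are hypothetical helpers.

```shell
#!/usr/bin/env bash
# Sketch of routing key-value pairs by key hash.
# Assumption: hash = POSIX cksum (CRC) of the key, modulo the node count.
node_for_key() {
    local key=$1 nodes=$2 crc
    crc=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
    echo $(( crc % nodes ))
}

# Read tab-separated key/value lines and append each pair to the bucket file
# of its node (a stand-in for that node's hash table).
route_pairs() {
    local dir=$1 nodes=$2 key value
    while IFS="$(printf '\t')" read -r key value; do
        printf '%s\t%s\n' "$key" "$value" >> "$dir/node.$(node_for_key "$key" "$nodes")"
    done
}

buckets=$(mktemp -d)
printf 'apple\t1\npear\t2\napple\t3\n' | route_pairs "$buckets" 4
for f in "$buckets"/node.*; do
    echo "${f##*/}: $(wc -l < "$f" | tr -d ' ') pair(s)"
done
rm -rf "$buckets"
```

Because the node is a pure function of the key, every value for a given key lands on the same node, which is what lets each node run B over complete key groups.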

Page 27

Experimental Evaluation

Page 28

Startup overlay performance (compared with the default SSH mechanism)

Page 29

Syntactic benchmark I: performance of pipeline join

Page 30

Syntactic benchmark II: performance of key-value aggregation

Page 31

TeraSort benchmark: Parallel bucket sort

• Step 1: spawn the data generator in parallel on each compute node, partitioning data across N nodes; a record goes to task T if its first 2 bytes fall in the range:

$\left[\, 2^{16}\,\frac{T}{N},\ 2^{16}\,\frac{T+1}{N} \,\right)$

• Step 2: perform sort on local data on each node

• Step 3: merge results onto global file system
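The partitioning rule in Step 1 reduces to one integer expression. The sketch assumes the range on the slide reads $[2^{16}T/N,\ 2^{16}(T+1)/N)$, so that a record whose first two bytes form the 16-bit value v belongs to task T = floor(v * N / 2^16). The helper name task_for_prefix is hypothetical.

```shell
#!/usr/bin/env bash
# Task T owns a record when the 16-bit value v formed by its first two bytes
# satisfies 2^16*T/N <= v < 2^16*(T+1)/N, that is, T = floor(v*N / 2^16).
# task_for_prefix is a hypothetical helper name.
task_for_prefix() {
    local byte0=$1 byte1=$2 n=$3
    echo $(( (((byte0 << 8) | byte1) * n) >> 16 ))
}

task_for_prefix 0 0 4        # lowest prefix            -> 0
task_for_prefix 64 0 4       # first prefix of task 1   -> 1
task_for_prefix 255 255 4    # highest prefix           -> 3
```

Since the buckets are contiguous and ordered, concatenating the locally sorted outputs in task order in Step 3 yields a globally sorted result.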

Page 32

TeraSort benchmark: Sorting rate

Page 33

Related Work

• Ptolemy – embedded system design

• Yahoo Pipes – web content filtering

• Hadoop – Java implementation of MapReduce

• Dryad – distributed DAG data flow computation

Page 34

Conclusion

• A debugger would be extremely helpful. Working on bashdb implementation.

• Run-time simulator would be helpful to predict performance based on characteristics of cluster.

• Still thinking about how to incorporate our extensions for named pipes (i.e. mkfifo).

Page 35

Questions?