composing and executing parallel data flow graphs with shell pipes
TRANSCRIPT
Composing and Executing Parallel Data-flow Graphs with
Shell Pipes
Edward Walker (TACC)
Weijia Xu (TACC)
Vinoth Chandar (Oracle Corp)
Agenda
• Motivation
• Shell language extensions
• Implementation
• Experimental evaluation
• Conclusions
Motivation
• Distributed memory clusters are becoming pervasive in industry and academia
• Shells are the default login environment on these systems
• Shell pipes are commonly used for composing extensible unix commands.
• There has been no change to the syntax/semantics of shell pipes since their invention over 30 years ago.
• Growing need to compose massively parallel jobs quickly, using existing software
Extending Shells for Parallel Computing
• Build a simple, powerful coordination layer at the Shell
• The coordination layer transparently manages the parallelism in the workflow
• User specifies parallel computation as a dataflow graph using extensions to the Shell
• Provides the ability to combine different tools and build interesting parallel programs quickly.
Shell pipe extensions
• Pipeline fork
A | B on n procs
• Pipeline join
A on n procs | B
• Pipeline cycles
(++ n A)
• Pipeline key-value aggregation
A | B on keys
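To make the four extensions concrete, a short sketch of how they might appear in a script; the syntax follows the bullets above, while the specific commands (grep, sort) and the stage names are illustrative assumptions:
# pipeline fork and join: fan grep out over 8 worker CPUs, join the results into one sort
cat input.txt | grep "ERROR" on 8 procs | sort
# pipeline cycle: run stage A for 4 iterations
(++ 4 A)
# key-value aggregation: tuples emitted by A are grouped by key and consumed by B
A on all procs | B on keys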
Parallel shell task extensions
> function foo(){ echo "hello world"; }
> foo on all procs      # foo() on all CPUs
> foo on all nodes      # foo() on all nodes
> foo on 10:2 procs     # 10 tasks, span 2: 2 tasks on each node
> foo on 10:2:2 procs   # 10 tasks, span 2, stride 2: 2 tasks on every other node
Composing data-flow graphs
• Example 1:
function B1() { :; }   # placeholder body
function B2() { :; }   # placeholder body
function B() {
  if (( $_ASPECT_TASKID == 0 )) ; then B1
  else B2
  fi
}
A | B on 2 procs | C
[Data-flow graph: A's output forks to B1 and B2, whose outputs join into C]
Composing data-flow graphs
• Example 2:
function map() {
  emit_tuple -k key -v value
}
function reduce() {
  consume_tuple -k key -v value
  num=${#value[@]}
  for ((i=0; i < $num; i++)) ; do
    : # process key=$key, value=${value[$i]}
  done
}
map on all procs | reduce on keys
[Data-flow graph: map tasks emit key-value tuples into a key-value DHT; reduce tasks consume the tuples grouped by key]
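For a concrete instance of this map/reduce composition, a hedged word-count sketch follows; the emit_tuple/consume_tuple calls mirror the slide above, while the per-task input files (chunk_*.txt) and the exact option semantics are assumptions:
function map() {
  # read this task's (hypothetical) input chunk and emit a (word, 1) tuple per word
  while read -r -a words ; do
    for w in "${words[@]}" ; do
      key=$w ; value=1
      emit_tuple -k key -v value
    done
  done < chunk_${_ASPECT_TASKID}.txt
}
function reduce() {
  # consume_tuple binds $key and the $value array for one aggregated key
  consume_tuple -k key -v value
  echo "$key ${#value[@]}"   # word and its occurrence count
}
map on all procs | reduce on keys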
BASH Implementation
Startup Overlay
• Script may have many instances requiring startup of parallel tasks
• Motivation for overlay:
  – Fast startup of parallel shell workers
  – Handles node failures gracefully
• Two level hierarchy: sectors and proxies
• Overlay node addressing:
[Address layout: a compute node ID (bits 7..0) composed of a sector ID field and a proxy ID field]
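A minimal sketch of how such a two-field address could be packed and unpacked; the 8-bit width comes from the figure, but the 4-bit/4-bit split between the sector and proxy fields is an assumption for illustration:
# pack an (assumed) 4-bit sector ID and 4-bit proxy ID into a compute node ID
pack_addr() { echo $(( ($1 << 4) | $2 )); }
sector_of() { echo $(( $1 >> 4 )); }
proxy_of()  { echo $(( $1 & 0x0F )); }
node=$(pack_addr 1 3)    # sector 1, proxy 3
echo "node=$node sector=$(sector_of $node) proxy=$(proxy_of $node)"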
Fault-Tolerance
• Proxy nodes monitor peers within sector, and sector heads monitor peer sectors
• Node 0 maintains a list of available nodes in the overlay in a master_node file
[Diagram: eight compute nodes (Node 0..7), each running a proxy exec, grouped into overlay sector 0 and overlay sector 1; Node 0 holds the master_node file]
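To illustrate the monitoring idea, a hedged sketch of what a sector-level health check might look like; the probe command, the sector_peers file, and the update_master_node helper are illustrative assumptions, not the actual overlay protocol:
# on a sector head: probe peer nodes; report failures to node 0,
# which maintains the master_node list of available nodes
while read -r peer ; do
  if ! ssh -o ConnectTimeout=2 "$peer" true 2> /dev/null ; then
    ssh node0 "update_master_node remove $peer"    # hypothetical helper
  fi
done < sector_peers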
Starting shell workers with startup overlay
1. Bash spawns agent.
2. Agent queries master_node and spawns node I/O multiplexor
3. Agent invokes overlay to spawn CPU I/O multiplexor on node
4. CPU I/O multiplexor spawns a shell worker per CPU on node
5. CPU I/O multiplexor calls back to node I/O multiplexor
Implementation of pipeline fork
1. The aspect-agent for B reads its stdin (A's output) and pipes it into stdin_file
[Diagram: A's stdout is piped (1) into aspect-agent B, whose stdin reader writes the stream to stdin_file]
2. The command dispatcher constructs a command file for each task
[Diagram: the command dispatcher (2) writes one command file per task, each of the form "cat stdin_file | B"]
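A minimal sketch of what this step produces, assuming N tasks and the "cat stdin_file | B" form shown above; the directory and file names are illustrative:
# one command file per task, each piping the captured stdin into B (as in the slide)
N=4
for ((t = 0; t < N; t++)) ; do
  echo "cat stdin_file | B" > cmd_dir/task_${t}.cmd
done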
3., 4., and 5. Execute the command files in shell workers and marshal the results back to the shell
[Diagram: command files are queued (3) to the node multiplexors, executed (4) by shell workers running B on the compute nodes, and the output is flushed (5) through the I/O MUX back to the shell's stdout]
6. Replay command files on failure
[Diagram: a replayer (6) re-executes failed command files in shell workers on the local compute node]
Implementation of key-value aggregation
1. The agent's key dispatcher inspects each tuple and hashes its key
[Diagram: A's output is piped (1) into aspect-agent B, whose key dispatcher inspects the emitted tuples]
2. The key-value pair is routed to a compute node based on the key hash and stored in that node's hash table
[Diagram: key-value pairs are routed (2) via the node MUX to per-node gdbm hash tables, which together form a distributed hash table]
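A hedged sketch of the routing decision in step 2: hash the key and map it to one of the participating compute nodes; the choice of cksum as the hash function and the node list are assumptions for illustration:
# choose a destination node for a key by hashing it modulo the node count
nodes=(node1 node2 node3 node4)
hash=$(printf '%s' "$key" | cksum | cut -d' ' -f1)
dest=${nodes[$(( hash % ${#nodes[@]} ))]}
# the key-value pair would then be sent to $dest and inserted into its gdbm hash table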
3. Each node constructs command files to pipe the key-value entry from its hash table into process B
[Diagram: on each compute node, entries from the local hash table are piped (3) via emit_tuple into an instance of B]
4. Results from the command file executions are marshaled back to the shell
[Diagram: each B instance's output is marshaled (4) through the I/O MUX back to the shell's stdout]
Experimental Evaluation
Startup overlay performance (compared to the default SSH startup mechanism)
Synthetic benchmark I: performance of pipeline join
Synthetic benchmark II: performance of key-value aggregation
TeraSort benchmark: Parallel bucket sort
• Step 1: spawn the data generator in parallel on each compute node, partitioning the data across N nodes; task T keeps a record if its first 2 bytes, read as a 16-bit integer, fall in the range [ (T−1)·2^16 / N , T·2^16 / N )
• Step 2: perform sort on local data on each node
• Step 3: merge results onto global file system
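A small sketch of the Step 1 bucket test reconstructed above: treat the first 2 bytes of a record as a 16-bit integer and check it against task T's range (variable names and the byte-order handling are illustrative):
# does this record belong to task T out of N buckets?
N=16 ; T=3
lo=$(( (T - 1) * 65536 / N ))
hi=$(( T * 65536 / N ))
key=$(head -c 2 record.dat | od -An -tu2 | tr -d ' ')   # first 2 bytes as an integer
if (( key >= lo && key < hi )) ; then
  echo "record falls in bucket $T"
fi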
TeraSort benchmark: Sorting rate
Related Work
• Ptolemy – embedded system design
• Yahoo Pipes – web content filtering
• Hadoop – Java implementation of MapReduce
• Dryad - distributed DAG data flow computation
Conclusion
• A debugger would be extremely helpful; we are working on a bashdb-based implementation.
• A run-time simulator would be helpful to predict performance based on the characteristics of a cluster.
• We are still considering how to incorporate our extensions into named pipes (i.e., mkfifo).
Questions?