Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans
Qifa Ke, Michael Isard, Yuan Yu. Microsoft Research Silicon Valley. EuroSys 2013.
![Page 1: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/1.jpg)
Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans
Qifa Ke, Michael Isard, Yuan Yu
Microsoft Research Silicon Valley
EuroSys 2013
![Page 2: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/2.jpg)
Distributed Data-Parallel Computing
• Distributed execution plan generated by query compiler (DryadLINQ)
• Automatic distributed execution (Dryad)
![Page 3: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/3.jpg)
Execution Plan Graph (EPG)
• EPG: distributed execution plan represented as a DAG:
- Representing computation and dataflow of data-parallel program
• Core data structure in distributed execution engines
- Task distribution
- Job management
- Fault tolerance
[Figure: EPG of MapReduce. Vertices: M (Map), D (Distribute), MG (Merge), G (GroupBy), R (Reduce), X]
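A minimal sketch of the EPG idea, assuming a toy in-memory representation rather than Dryad's actual data structures. The `EPG` class and the `mapreduce_epg` helper are hypothetical names introduced only for illustration; the vertex labels follow the figure legend.

```python
from collections import defaultdict, deque


class EPG:
    """A toy execution plan graph: vertices plus directed dataflow edges."""

    def __init__(self):
        self.succ = defaultdict(list)   # vertex -> downstream vertices
        self.indeg = defaultdict(int)   # vertex -> number of upstream inputs

    def add_edge(self, src, dst):
        self.succ[src].append(dst)
        self.indeg[dst] += 1
        self.indeg.setdefault(src, 0)

    def schedule(self):
        """Return vertices in an order where every input precedes its consumer."""
        indeg = dict(self.indeg)
        ready = deque(sorted(v for v, d in indeg.items() if d == 0))
        order = []
        while ready:
            v = ready.popleft()
            order.append(v)
            for w in self.succ[v]:
                indeg[w] -= 1
                if indeg[w] == 0:
                    ready.append(w)
        if len(order) != len(indeg):
            raise ValueError("graph has a cycle, so it is not a valid EPG")
        return order


def mapreduce_epg(n_map, n_reduce):
    """Build M -> D per input partition, then MG -> G -> R -> X per reducer."""
    g = EPG()
    for i in range(n_map):
        g.add_edge(f"M{i}", f"D{i}")
        for j in range(n_reduce):
            g.add_edge(f"D{i}", f"MG{j}")   # every distributor feeds every merger
    for j in range(n_reduce):
        g.add_edge(f"MG{j}", f"G{j}")
        g.add_edge(f"G{j}", f"R{j}")
        g.add_edge(f"R{j}", f"X{j}")
    return g


epg = mapreduce_epg(n_map=3, n_reduce=3)
order = epg.schedule()
print(order)
```

The same adjacency information is what a job manager would consult for task distribution and for deciding which vertices to re-execute after a failure.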
![Page 4: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/4.jpg)
Outline
• Motivational problems
• Optimus system
• Graph rewriters
• Experimental evaluation
• Summary & conclusion
![Page 5: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/5.jpg)
Problem 1: Data Partitioning
• Basic operation to achieve data parallelism
• Example: MapReduce
- Number of partitions = number of reducers
• More reducers: better load balancing but more overheads in scheduling and disk I/O
- Data skew: e.g., popular keys
• Require statistics of Mapper outputs
- Hard to estimate at compile time
- But available at runtime
[Figure: EPG of MapReduce with M, D, MG, G, R, and X vertices]
We need dynamic data partitioning.
![Page 6: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/6.jpg)
Problem 2: Matrix Computation
• Widely used in large-scale data analysis
• Data model: sparse or dense matrix?
- Compile-time: unknown density of intermediate matrices
- Sparse input matrices: intermediate result may be dense
• Alternative algorithms for a given matrix computation
- Chosen based on runtime data statistics of input matrices
How to dynamically choose data model and alternative algorithms?
![Page 7: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/7.jpg)
Problem 3: Iterative Computation
• Required by machine learning and data analysis
• Problem: stop condition unknown at compile time
- Each job performs N iterative steps
- Submit multiple jobs and check convergence at client
• How to enable iterative computation in one single job?
- Simplifies job monitoring and fault-tolerance
- Reduces job submission overhead
[Figure: iterative computation submitted as multiple jobs (Job 1, Job 2); each iteration (Iter 1, Iter 2) runs vertices A and B; labels In and Ctr]
![Page 8: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/8.jpg)
Problem 4: Fault Tolerance
• Intermediate results can be re-generated by re-executing vertices
• Important intermediate results: expensive to regenerate when lost
- Compute-intensive vertices
- Critical chain: a long chain of vertices residing on the same machine due to data locality
• How to identify and protect important intermediate results at runtime?
[Figure: vertices X, A, B, C]
![Page 9: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/9.jpg)
Problem 5: EPG Optimization
• Compile-time query optimization
- Using data statistics available at compile time
- EPG typically unchanged during execution
• Problems with compile-time optimization:
- Data statistics of intermediate stages hard to estimate
• Complicated by user-defined functions
• How to optimize EPG at runtime?
[Figure: on the client computer, the user program (LINQ query) passes through the query compiler (DryadLINQ) to produce the EPG; on the compute cluster, the distributed execution engine (Dryad) runs it]
![Page 10: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/10.jpg)
Optimus: Dynamic Graph Rewriting
• Dynamically rewrite EPG based on:
- Data statistics collected at runtime
- Compute resources available at runtime
• Goal: extensible
- Implement rewriters at language layer
• Without modifying execution engine (e.g., Dryad)
- Allows users to specify rewrite logic
![Page 11: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/11.jpg)
Example: MapReduce
Statistics collection at data plane
Rewrite message sent to graph rewriter at control plane
• Merge small partitions
• Split popular keys
[Figure: MapReduce EPG before and after rewriting; H, MG, and GH vertices produce the rewrite message for the graph rewriter]
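The two rewrites named on this slide can be sketched as a planning step over partition sizes. This is an illustrative sketch only: the function name `plan_rewrite`, the `target` size, and the greedy grouping of adjacent partitions are assumptions, not the rewriter described in the talk, and an oversized partition stands in here for a popular key.

```python
import math


def plan_rewrite(partition_sizes, target):
    """Return rewrite actions: ('merge', [ids]) or ('split', id, n_pieces).

    Hypothetical policy: adjacent partitions are merged while their combined
    size stays within `target`; a partition larger than `target` is split.
    A 'merge' of a single id simply leaves that partition as it is.
    """
    actions = []
    group, group_size = [], 0
    for pid, size in enumerate(partition_sizes):
        if size > target:
            if group:                       # flush the pending merge group
                actions.append(("merge", group))
                group, group_size = [], 0
            actions.append(("split", pid, math.ceil(size / target)))
            continue
        if group and group_size + size > target:
            actions.append(("merge", group))
            group, group_size = [], 0
        group.append(pid)
        group_size += size
    if group:
        actions.append(("merge", group))
    return actions


sizes = [10, 20, 500, 30, 90, 5]            # records per partition, made up
actions = plan_rewrite(sizes, target=100)
print(actions)
```

In this toy run the six original partitions become three merged groups plus one partition split five ways, so the number of reducers is decided from observed sizes rather than fixed at compile time.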
![Page 12: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/12.jpg)
Outline
• Motivational problems
• Optimus system
• Graph rewriters
• Experimental evaluation
• Summary & conclusion
![Page 13: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/13.jpg)
Optimus System Architecture
• Built on DryadLINQ and Dryad
• Modules
- Statistics collecting
- Rewrite messaging
• Data plane to control plane
- Graph rewriting
• Extensible
- Statistics and rewrite logic at language/user layers
- Rewriting operation at execution layer
[Figure: system architecture. Client computer: user program, user-defined statistics, and user-defined rewrite logic go through the DryadLINQ compiler with Optimus extensions, which produces the EPG, worker vertex code, statistics, and rewrite logic. Cluster: Dryad Job Manager (JM) with core execution engine and rewriter module; Dryad worker vertices with worker vertex harness, vertex code, statistics, rewrite logic, and messaging.]
![Page 14: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/14.jpg)
Estimate/Collect Data Statistics
• Low overhead: piggy-back into existing vertices
- Pipelining “H” into “M”
• Extensible
- Statistics estimator/collector defined at language layer or user-level
• All at data plane: avoid overwhelming control plane
- “H”: distributed statistics estimation/collection
- “MG” and “GH”: merge statistics into rewriting message
[Figure: MapReduce EPG with “H” pipelined into each “M”; “MG” and “GH” vertices send the rewrite message to the graph rewriter]
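A minimal sketch of piggy-backed statistics collection, assuming the statistic is a simple key histogram. The helper names `map_with_histogram` and `merge_histograms` and the word-count mapper are hypothetical; they only illustrate pipelining “H” into “M” and merging the partial results on the data plane.

```python
from collections import Counter


def map_with_histogram(records, map_fn, hist):
    """Run the mapper and update `hist` in the same pass over its output."""
    for rec in records:
        for key, value in map_fn(rec):
            hist[key] += 1          # the H step: count keys as they stream by
            yield key, value


def merge_histograms(hists):
    """Combine per-partition histograms into one (the MG/GH step)."""
    total = Counter()
    for h in hists:
        total.update(h)
    return total


def word_count_map(line):
    """Illustrative mapper: emit (word, 1) for every word in a line."""
    return [(w, 1) for w in line.split()]


partitions = [["a b a", "c a"], ["b b", "a"]]
local_hists, outputs = [], []
for part in partitions:
    h = Counter()
    outputs.append(list(map_with_histogram(part, word_count_map, h)))
    local_hists.append(h)

# Only this small merged summary would travel to the control plane.
rewrite_message = merge_histograms(local_hists)
print(rewrite_message)
```

Because the histogram is updated inside the mapper's own output loop, no extra pass over the data is needed, which is the low-overhead property the slide asks for.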
![Page 15: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/15.jpg)
Graph Rewriting Module
• A set of primitives to query and modify EPG
• Rewriting operation depends on vertex state:
- INACTIVE: all rewriting primitives applicable
- RUNNING: killed and transitioned to INACTIVE, discarding partial results
- COMPLETED: redirect vertex I/O
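The state-dependent behaviour above can be sketched as a small state machine. The three state names come from the slide; the `Vertex` class and the functions `prepare_for_rewrite` and `redirect_output` are hypothetical and do not correspond to the actual Optimus primitives.

```python
from enum import Enum


class State(Enum):
    INACTIVE = "inactive"
    RUNNING = "running"
    COMPLETED = "completed"


class Vertex:
    def __init__(self, name, state=State.INACTIVE):
        self.name = name
        self.state = state
        self.partial_output = []
        self.output_consumers = []


def prepare_for_rewrite(vertex, log):
    """Bring a vertex into a state the rewriter may act on; return that state."""
    if vertex.state is State.INACTIVE:
        log.append(f"{vertex.name}: any primitive applies")
    elif vertex.state is State.RUNNING:
        vertex.partial_output.clear()       # partial results are discarded
        vertex.state = State.INACTIVE       # killed, back to INACTIVE
        log.append(f"{vertex.name}: killed, now INACTIVE")
    else:                                   # COMPLETED: only rewire its I/O
        log.append(f"{vertex.name}: only I/O redirection allowed")
    return vertex.state


def redirect_output(vertex, new_consumers):
    """Point a COMPLETED vertex's output at a different set of consumers."""
    if vertex.state is not State.COMPLETED:
        raise ValueError("redirect_output expects a COMPLETED vertex")
    vertex.output_consumers = list(new_consumers)


a = Vertex("a")
r = Vertex("r", State.RUNNING)
r.partial_output = [1, 2]
c = Vertex("c", State.COMPLETED)
log = []
states = [prepare_for_rewrite(v, log) for v in (a, r, c)]
redirect_output(c, ["mg0", "mg1"])
print(log)
```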
![Page 16: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/16.jpg)
Outline
• Motivational problems
• Optimus system
• Graph rewriters
• Experimental evaluation
• Summary & conclusion
![Page 17: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/17.jpg)
Dynamic Data (Co-)Partitioning
• Co-partitioning:
- Use a common parameter set to partition multiple data sets
- Used by multi-source operators, e.g., Join
• Co-range partition in Optimus:
• “H”: histogram at each partition
• “GH”: merged histogram
• : composition, application specific
• “K”: estimate range keys based on
• Rewriting message: range keys
• Rewriting operation: splitting merge nodes
[Figure: co-range partitioning EPG with I, H, D, MG, GH, and K vertices; K sends the rewrite message to the graph rewriter, which splits the MG nodes]
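A sketch of how range keys might be estimated from merged histograms for co-range partitioning. Everything here is an assumption made for illustration: histograms are exact key counts, the application-specific composition is taken to be a plain sum, and `estimate_range_keys` and `partition_of` are hypothetical names.

```python
import bisect
from collections import Counter


def estimate_range_keys(hist, n_partitions):
    """Pick up to n_partitions - 1 boundary keys with roughly equal record counts."""
    total = sum(hist.values())
    boundaries = []
    running = 0
    for key in sorted(hist):
        running += hist[key]
        # Emit a boundary each time the next 1/n share of the records is reached.
        if (len(boundaries) < n_partitions - 1
                and running * n_partitions >= total * (len(boundaries) + 1)):
            boundaries.append(key)
    return boundaries


def partition_of(key, boundaries):
    """Keys up to and including boundaries[i] go to partition i."""
    return bisect.bisect_left(boundaries, key)


# Merged histograms ("GH") of two data sets that will later be joined.
hist_a = Counter({1: 10, 2: 10, 3: 10, 4: 10, 5: 10, 6: 10})
hist_b = Counter({2: 20, 5: 20})

# Composition is application specific; a plain sum is assumed here.
composed = hist_a + hist_b
range_keys = estimate_range_keys(composed, n_partitions=2)
print(range_keys)
```

Applying the same `range_keys` to both data sets is what makes them co-partitioned: matching keys from either side land in the same partition index.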
![Page 18: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/18.jpg)
Hybrid Join
• Co-partition to prepare data for partition-wise Join
• Skew detected at runtime
• Re-partition skewed partition
- Local broadcast join
[Figure: co-partitioned Join EPG with IH, GH, K, D, MG, and J vertices; the skewed partition gets an extra D vertex (D1) and additional J vertices]
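A rough sketch of the hybrid idea, under stated assumptions: inputs are already co-partitioned lists of (key, value) pairs, skew is detected by a hypothetical size threshold on the left side, and the skewed partition is split round-robin with the matching right-side partition broadcast to every chunk. The names `hash_join` and `hybrid_join` are illustrative only.

```python
def hash_join(left, right):
    """Join two lists of (key, value) pairs on key."""
    index = {}
    for k, v in right:
        index.setdefault(k, []).append(v)
    return [(k, lv, rv) for k, lv in left for rv in index.get(k, [])]


def hybrid_join(left_parts, right_parts, skew_threshold, n_chunks):
    """Join co-partitioned inputs; return results and join tasks per partition."""
    results, tasks = [], []
    for left, right in zip(left_parts, right_parts):
        if len(left) > skew_threshold:
            # Skewed partition: re-partition the large side round-robin and
            # broadcast the matching right-side partition to every chunk.
            chunks = [left[i::n_chunks] for i in range(n_chunks)]
            for chunk in chunks:
                results.extend(hash_join(chunk, right))
            tasks.append(n_chunks)
        else:
            # Ordinary partition-wise join.
            results.extend(hash_join(left, right))
            tasks.append(1)
    return results, tasks


left_parts = [[("a", 1), ("b", 2)], [("k", i) for i in range(10)]]
right_parts = [[("a", "x")], [("k", "y"), ("k", "z")]]
results, tasks = hybrid_join(left_parts, right_parts, skew_threshold=5, n_chunks=3)
print(tasks, len(results))
```

The result set is the same as a plain partition-wise join; only the number of join tasks for the skewed partition changes.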
![Page 19: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/19.jpg)
Iterative Computation
• Optimus: enables iterative computation in a single job
- “C”: check stop condition
- Construct another loop if needed
[Figure: iterations Iter 1 and Iter 2 of vertices A and B, with labels In, Ctr, and Out; the “C” vertex sends a rewrite message to the graph rewriter]
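The single-job loop can be sketched as follows. This is a simulation under assumptions, not the Optimus mechanism itself: `run_iterative_job`, the `plan` list standing in for the growing EPG, and the `max_iters` safety cap are all hypothetical, and Newton's iteration for the square root of 2 is used only as a convenient convergent example.

```python
def run_iterative_job(state, step, converged, max_iters=100):
    """Unroll the loop one iteration at a time inside a single job."""
    plan = []                       # vertices appended to the EPG so far
    iteration = 0
    while True:
        iteration += 1
        plan += [f"A{iteration}", f"B{iteration}", f"C{iteration}"]
        new_state = step(state)                 # the A and B vertices
        done = converged(state, new_state)      # the "C" vertex
        state = new_state
        if done or iteration >= max_iters:
            plan.append("Out")
            return state, plan
        # Otherwise "C" would send a rewrite message and the rewriter
        # would construct another iteration of the loop.


def newton_step(x):
    """One Newton iteration for x * x = 2."""
    return (x + 2.0 / x) / 2.0


def close_enough(old, new):
    return abs(new - old) < 1e-12


result, plan = run_iterative_job(1.0, newton_step, close_enough)
print(result, plan)
```

The number of iterations is not fixed in advance; the plan grows until the stop condition holds, and the client submits only one job.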
![Page 20: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/20.jpg)
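Matrix Multiplication
• Different ways to do the multiplication
- Choose based on matrix sizes and density
[Figure: four block decompositions of a matrix product
1. Blocks A, B, C, D and E, F, G, H: partial products AE, BG, AF, BH, CE, DG, CF, DH, summed into AE+BG, AF+BH, CE+DG, CF+DH
2. Blocks A, B, C, D and E, F, G, H: partial products AE, BF, CG, DH, summed into AE+BF+CG+DH
3. Blocks A, B, C, D and E, F, G, H: all pairwise products AE, BE, CE, DE, AF, BF, CF, DF, AG, BG, CG, DG, AH, BH, CH, DH
4. Blocks A, B, C, D and a single operand V: products AV, BV, CV, DV]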
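Two of the decompositions in the figure can be checked against each other with a small pure-Python sketch. The function names and the list-of-lists matrix layout are assumptions made for illustration; `by_inner_blocks` corresponds to summing partial products (AE+BF+CG+DH) and `by_row_blocks` to multiplying row blocks by a whole operand (AV, BV, CV, DV).

```python
def matmul(a, b):
    """Plain dense product of two list-of-lists matrices."""
    return [[sum(x * y for x, y in zip(row, col)) for col in zip(*b)] for row in a]


def add(a, b):
    """Element-wise sum of two matrices of the same shape."""
    return [[x + y for x, y in zip(ra, rb)] for ra, rb in zip(a, b)]


def by_inner_blocks(a, b, k):
    """Split A by columns and B by rows into k blocks; sum the partial products."""
    n = len(b)                                  # inner dimension
    bounds = [n * i // k for i in range(k + 1)]
    total = None
    for lo, hi in zip(bounds, bounds[1:]):
        if lo == hi:
            continue                            # skip empty blocks when k > n
        part = matmul([row[lo:hi] for row in a], b[lo:hi])
        total = part if total is None else add(total, part)
    return total


def by_row_blocks(a, v, k):
    """Split A by rows into k blocks; multiply each by all of V and stack."""
    m = len(a)
    bounds = [m * i // k for i in range(k + 1)]
    out = []
    for lo, hi in zip(bounds, bounds[1:]):
        out.extend(matmul(a[lo:hi], v))
    return out


a = [[1, 2, 3, 4], [5, 6, 7, 8]]
b = [[1, 0], [0, 1], [1, 1], [2, 0]]
reference = matmul(a, b)
print(reference, by_inner_blocks(a, b, 2), by_row_blocks(a, b, 2))
```

Both strategies give the same product; they differ in how much data is shuffled and summed, which is why the choice depends on matrix sizes and density.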
![Page 21: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/21.jpg)
Matrix Computation
• Systems dedicated to matrix computations: MadLINQ
• Optimus: extensibility allows integrating matrix computation with general-purpose DryadLINQ computations
• Runtime decisions
- Data partitioning: subdivide matrices
- Data model: sparse or dense
- Implementation: a matrix operation often has many algorithmic implementations
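The data-model decision can be illustrated with a toy example in which two sparse inputs produce a fully dense product. The dictionary-of-coordinates layout, the function names, and the 0.3 density threshold are all assumptions chosen for the sketch, not values from the talk.

```python
def density(sparse, shape):
    """Fraction of stored entries in a {(row, col): value} matrix."""
    rows, cols = shape
    return len(sparse) / (rows * cols)


def sparse_matmul(a, b):
    """Multiply matrices stored as {(row, col): value} dictionaries."""
    b_rows = {}
    for (k, j), v in b.items():
        b_rows.setdefault(k, []).append((j, v))
    out = {}
    for (i, k), av in a.items():
        for j, bv in b_rows.get(k, []):
            out[(i, j)] = out.get((i, j), 0) + av * bv
    return out


def choose_model(sparse, shape, threshold=0.3):
    """Hypothetical rule: switch to a dense layout at or above the threshold."""
    return "dense" if density(sparse, shape) >= threshold else "sparse"


n = 4
a = {(i, 0): 1 for i in range(n)}       # one full column, density 0.25
b = {(0, j): 1 for j in range(n)}       # one full row, density 0.25
c = sparse_matmul(a, b)                 # every entry of the product is set

print(choose_model(a, (n, n)), choose_model(b, (n, n)), choose_model(c, (n, n)))
```

The density of `c` is only known after it has been computed, which is why the choice between sparse and dense has to be deferred to runtime.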
![Page 22: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/22.jpg)
Reliability Enhancer for Fault Tolerance
• Replication graph to protect important data generated by “A”:
• “C” vertex:
• copy output of “A” to another computer
• “O” vertex:
• allow “B” to choose one of two inputs to “O”
[Figure: graph A → B before rewriting, and the replication graph with added “C” and “O” vertices]
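A sketch of the replication rewrite as an edge-set transformation, assuming the graph is a set of (source, destination) pairs. The function names and the `C(A)` and `O(A)` labels are hypothetical; the wiring follows the slide's description, with “C” copying the output of “A” and “O” offering “B” either of its two inputs.

```python
def add_replication(edges, producer, consumer):
    """Rewrite producer -> consumer so the producer's output has a second copy."""
    if (producer, consumer) not in edges:
        raise ValueError("no such edge")
    c, o = f"C({producer})", f"O({producer})"
    rewritten = set(edges)
    rewritten.remove((producer, consumer))
    # A feeds O directly and through the copy C; B now reads from O.
    rewritten |= {(producer, o), (producer, c), (c, o), (o, consumer)}
    return rewritten


def read_input(o_inputs, lost):
    """'O' vertex behaviour: return the first input that is still available."""
    for src in o_inputs:
        if src not in lost:
            return src
    raise RuntimeError("all replicas lost; the producer must be re-executed")


edges = {("A", "B")}
protected = add_replication(edges, "A", "B")
o_inputs = sorted(src for src, dst in protected if dst == "O(A)")
chosen = read_input(o_inputs, lost={"A"})
print(sorted(protected), chosen)
```

If the machine holding the output of “A” fails, “B” can still proceed from the copy, so the expensive vertex does not have to be re-executed.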
![Page 23: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/23.jpg)
Outline
• Motivational problems
• Optimus system
• Graph rewriters
• Experimental evaluation
• Summary & conclusion
![Page 24: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/24.jpg)
Evaluation: Product-Offer Matching by Join
• Input: 5M products + 4M offers
- Matching function: compute intensive
• Algorithms:
- Partition-wise GroupJoin
- Broadcast-Join
- CoGroup: specialized solution
- Optimus
Aggregated CPU utilization:
Baseline  CoGroup  Broadcast  Optimus
0.82      0.72     0.55       0.81
[Figures: job completion time; cluster (machine) utilization]
![Page 25: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/25.jpg)
Evaluation: Matrix Multiplication
• Movie recommendation by collaborative filtering:
- Dataset: Netflix challenge.
• Matrix R: , sparsity
• Comparisons:
- Mahout
- MadLINQ
- Optimus with sparse representation (S-S-S)
- Optimus with data model adaption (S-D-D)
[Figure: job completion time in seconds; the only surviving value label is 46800]
![Page 26: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/26.jpg)
Related Work
• Dryad: system-level rewriting without semantics of code and data
• Database: dynamic graph rewriting in a single server environment
- Eddies: fine-grain (record-level) optimization
- Eddies + Optimus: combine record-level and vertex-level optimization
• CIEL: programming/execution model different from DryadLINQ/Dryad
- Dynamically expands EPG by scripts running at each worker
- Hard to achieve some dynamic optimizations:
• Replacing a running task with a subgraph
• Reliability enhancer.
- CIEL can incorporate Optimus-like components to support dynamic optimizations.
• RoPE: uses statistics of previously-executed queries to optimize new jobs using same queries
![Page 27: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/27.jpg)
Summary & Conclusion
• A flexible/extensible framework to modify EPG at runtime
• Enable runtime optimizations and specializations hard to achieve in other systems
• A rich set of graph rewriters
- Substantial performance benefit compared to statically generated plan
• A versatile addition to a data-parallel execution framework
![Page 28: Optimus: A Dynamic Rewriting Framework for Data-Parallel Execution Plans](https://reader035.vdocuments.us/reader035/viewer/2022062520/568164da550346895dd72652/html5/thumbnails/28.jpg)
Thanks!