Grappa: enabling next-generation analytics tools via latency-tolerant distributed shared memory
TRANSCRIPT
Grappa is a modern take on software distributed shared memory, tailored to exploit parallelism inherent in data-intensive applications to overcome their poor locality and input-dependent load distribution.
Grappa differs from traditional DSMs in three ways:
• Instead of minimizing per-operation latency for performance, Grappa tolerates latency with concurrency (latency-sensitive apps need not apply!)
• Grappa moves computation to data instead of caching data at computation
• Grappa operates at byte granularity rather than page granularity
Latency tolerance has been applied successfully in hardware for nanosecond latencies (e.g., superscalar processors and GPUs). This project explores the application of this idea at distributed system scales with millisecond latencies.
Grappa: enabling next-generation analytics tools via latency-tolerant distributed shared memory Jacob Nelson, Brandon Myers, Brandon Holt, Preston Briggs, Luis Ceze, Simon Kahan, Mark Oskin {nelson, bholt, bdmyers, preston, luisceze, skahan, oskin}@cs.washington.edu University of Washington
Dynamic Concurrency Discovery for Very Large Windows of Execution
Jacob Nelson, Luis Ceze
University of Washington
http://sampa.cs.washington.edu
Limit studies have shown that sequential programs have untapped opportunities for parallelization. How can we identify them? How can we parallelize a sequential program?
• Start with sequential trace of dynamic instructions
• Tasks are groups of instructions that will execute together
• Dependences impose order on task execution
• Critical dependences dictate the minimum execution time
(Figure: a sequential trace of dynamic load/store instructions grouped into tasks 1-7 along a time axis, and the resulting dynamic dependence graph over those tasks.)
• Measured:
- overall edge error
- edge error on critical paths
• As we increase bit budget, Bloom Map becomes more effective
• Bloom Map finds dependences on critical paths to each node
(Figure: edge error for SPECINT 2006, overall and on critical dependences (harmonic mean), for bit budgets of 1Mb, 16Mb, 129Mb, and 1Gb; smaller bars are better.)
• Goal: capture critical dependences
• Idea: store dependence sources in an imprecise map
• Allowed imprecision:
- may return correct dependence source
- may return incorrect dependence source
- may find spurious dependence
• Bloom Map configuration determines likelihood of imprecision
(Figure: a small example trace (1: st D, 2: st B, 3: st C, 4: ld B) with the map recording the last writing task for each address.)
Bloom Map Concept
• Extends Bloom Filter: now store Task IDs instead of just bits. Now, instead of “no” or “yes”, queries return “no” or “yes, and the source is X”
• Writes: store writing Task ID, overwriting previous value
• Reads: choose one selected Task ID
(Figure: Bloom Map structure; an address is hashed by H1 ... Hn into slots holding Task IDs, and a query chooses among the selected results.)
Choosing Between Bloom Map Results
• Task IDs increase monotonically; writes inserted in order
• One option: choose newest Task ID. No overwritten locations: correct source. One overwritten location: later source
• Better option: choose oldest Task ID. As long as the correct Task ID is visible in one bit vector, we can find it
• No later write to A can exist: it would have updated all hashed locations
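The following is a minimal, self-contained C++ sketch of this idea (names like BloomMap, insert, and query are illustrative, not the project's actual implementation): each address hashes to several slots holding Task IDs, a write overwrites all of its hashed slots, and a query returns the oldest Task ID among them, following the reasoning above.

#include <cstddef>
#include <cstdint>
#include <functional>
#include <vector>

// Hypothetical sketch of a Bloom Map: like a Bloom filter, but each slot
// stores a Task ID instead of a single bit.
class BloomMap {
 public:
  BloomMap(size_t num_slots, size_t num_hashes)
      : slots_(num_slots, kEmpty), num_hashes_(num_hashes) {}

  // A write by `task_id` to `addr`: overwrite every hashed slot with the ID.
  void insert(uint64_t addr, uint64_t task_id) {
    for (size_t i = 0; i < num_hashes_; ++i)
      slots_[slot(addr, i)] = task_id;
  }

  // Query the last writer of `addr`. Returns kEmpty ("no dependence found")
  // or a Task ID ("yes, and the source is X"). Imprecision is possible:
  // collisions may cause an incorrect or spurious source to be returned.
  uint64_t query(uint64_t addr) const {
    uint64_t oldest = kEmpty;
    for (size_t i = 0; i < num_hashes_; ++i) {
      uint64_t id = slots_[slot(addr, i)];
      if (id == kEmpty) return kEmpty;  // an untouched slot means no prior write
      if (id < oldest) oldest = id;     // keep the oldest (smallest) Task ID
    }
    return oldest;
  }

  static constexpr uint64_t kEmpty = UINT64_MAX;

 private:
  // The i-th hash of addr, mapped to a slot index.
  size_t slot(uint64_t addr, size_t i) const {
    return std::hash<uint64_t>{}(addr * 0x9E3779B97F4A7C15ull + i) % slots_.size();
  }

  std::vector<uint64_t> slots_;
  size_t num_hashes_;
};

Choosing the oldest ID is the rule argued for above: a write to an address fills all of that address's slots with its own ID, so those slots can later hold only that ID or the IDs of newer colliding writes; taking the smallest recovers the true source whenever it survives in at least one slot. A larger table and more hash functions (the bit budget above) make complete overwrites, and hence wrong answers, less likely.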
Our vision: Dynamic thread-level speculation
• Identify dependences to find concurrency
• Select tasks for parallel execution, balancing potential speedup with overhead
• Execute tasks in parallel, enforcing dependences
More info, tech report, downloads: http://grappa.io
(Figure 9 panels: (a) application performance for PageRank, CC, SSSP, and BFS on 31 nodes, time normalized to Grappa, for Grappa, GraphLab (pds), and GraphLab (random); (b) communication metrics, total data moved (GB) and mean aggregated packet size (kB), at 31 nodes on PageRank; (c) scaling PageRank from 16 to 128 nodes: strong scaling on Twitter and Friendster, weak scaling on synthetic graphs.)
Figure 9: Performance characterization of Grappa’s GraphLab API. (a) shows time to converge (same number of iterations) normalized to Grappa, on the Twitter and Friendster datasets. (b) shows communication metrics for the PageRank data points. (c) shows scaling results for PageRank out to 128 nodes – Friendster and Twitter measure strong scaling, and weak scaling is measured on synthetic power-law graphs scaled proportionally with nodes.
At these aggregated packet sizes (30-80 kB), most of the network stack overhead (MPI or TCP) is amortized for both systems.
Figure 10 demonstrates the connection between concurrency and aggregation over time while executing PageRank. We can clearly see that each iteration, the number of concurrent tasks spikes as scatter delegates are performed on outgoing edges, which leads to a corresponding spike in bandwidth due to aggregating the many concurrent messages. At these points, Grappa achieves roughly 1.1 GB/s per node, which is 47% of peak bisection bandwidth for large packets discussed in §3.3.1, or 61% of the bandwidth for 80 kB messages, the average aggregated size. This discrepancy is due to not being able to aggregate packets as fast as the network can send them, but is still significantly better than unaggregated bandwidth.
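As a rough check implied by the stated percentages (simple division, not figures reported separately in the text):

\[
\frac{1.1\ \text{GB/s}}{0.47} \approx 2.3\ \text{GB/s per node (peak for large packets)},
\qquad
\frac{1.1\ \text{GB/s}}{0.61} \approx 1.8\ \text{GB/s per node (peak at the 80 kB aggregated size)}.
\]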
Figure 9c (left) shows strong scaling results on both datasets. As we can see, scaling is poor beyond 32 nodes for both platforms, due to the relatively small size of the graphs – there is not enough parallelism for either system to scale on this hardware. To explore how Grappa fares on larger graphs, we show results of a weak scaling experiment in Figure 9c (right). This experiment runs PageRank on synthetic graphs generated using Graph500’s Kronecker generator, scaling the graph size with the number of nodes, from 200M vertices, 4B edges, up to 2.1B vertices, 34B edges. Runtime is normalized to show distance from ideal scaling (horizontal line), showing that scaling deteriorates less than 30% at 128 nodes.
4.2 Relational queries on Grappa
We used Grappa to build a distributed backend to Raco, a relational algebra compiler and optimization framework [53].
Figure 10: Grappa PageRank execution over time on 32 nodes. The top shows the total number of concurrent tasks (including delegate operations) over the 85 iterations, peaks decreasing as fewer vertices are being updated. The bottom shows message bandwidth per node, which correlates directly with the concurrency at each time step, compared against the peak bandwidth and the bandwidth for the given message size.
Raco supports a variety of relational query language frontends, including SQL, Datalog, and an imperative language, MyriaL. It includes an extensible relational algebra optimizer and various intermediate query plan representations.
We compare performance of our system to that of Shark, a fast implementation of Hive (SQL-like) built upon Spark. We chose this comparison point because Shark is optimized for in-memory execution and performs competitively with parallel databases [65].
Our particular approach for the Grappa backend to Raco is source-to-source translation. We generate foralls for each pipeline in the physical query plan (ex-
GraphLab-like API"~60 lines of code, implementing:!- Synchronous engine with delta caching!- Random graph partition with no
replication!
Benchmarks run on 31 nodes using 1.8B edge Friendster social network graph and 1.4B edge Twitter follower dataset, compared with GraphLab using two partitioning strategies
In-memory MapReduce"~150 lines of code, implemented with forall loop over inputs followed by forall over keys!
K-Means computation with 64 nodes on SeaFlow flow cytometry dataset with two different k values, compared with Spark using MEMORY_ONLY fault tolerance
Backend for Raco relational query compiler"~700 lines of code, translating physical query plan into C++11 code using Grappa forall loops!
SP2Bench benchmark run on16 nodes, compared with Shark distributed query engine
Three prototype data analytics tools
Experiments run at Pacific Northwest National Laboratory on the PAL cluster (1.9GHz AMD Interlagos processors, Mellanox ConnectX-2 InfiniBand network).
Figure 9: Relational query performance. (a) The SP2Bench benchmark on 16 nodes; query Q4 is a large workload so it was run with 64 nodes. (b) Performance breakdown of the speedup of Grappa over Shark on Q2 (components: network, data serialization, iteration, other overheads, app compute).
Figure 10: Data parallel experiments using k-means on an 8.9GB Seaflow dataset. (a) Performance for 64 nodes, mean iteration time normalized to Grappa, for k = 10 and k = 10000. (b) Breakdown of time spent in MapReduce phases (Map, Combine, Reduce, Reallocate, Broadcast) for GRAPPA-MapReduce.
into a local hash table, using GRAPPA’s partition-awareness. The global-view model of GRAPPA allows iterations to be implemented by the application programmer with a while loop.
4.3.1. Performance We pick k-means clustering as a test workload; it exercises all-to-all communication and iteration. To provide a reference point, we compare the performance to the SparkKMeans implementation for Spark. Both versions use the same algorithm: map the points, reduce the cluster means, and broadcast local means. The Spark code caches the input points in memory and does not persist partitions. Currently, our implementation of MapReduce is not fault-tolerant. To ensure the comparison is fair, we made sure Spark did not use fault-tolerance features: we used the MEMORY_ONLY storage level for RDDs, which does not replicate an RDD or persist it to disk, and verified during the runs that no partitions were recomputed due to failures. We run k-means on a dataset from Seaflow [65], where each instance is a flow cytometry sample of seawater containing characteristics of phytoplankton cells. The dataset is 8.9GB and contains 123M instances. The clustering task is to identify species of phytoplankton so the populations may be counted.
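To make the map/reduce/broadcast structure concrete, here is a rough sketch written only with the simplified constructs from the character-count example earlier (GlobalAddress, global_alloc, forall, delegate); it is illustrative, not the actual GRAPPA-MapReduce code, and Point, Accum, load_points, initial_means, nearest_cluster, npoints, K, DIMS, and converged are hypothetical names.

struct Point { double x[DIMS]; };
struct Accum { double sum[DIMS]; long count; };        // per-cluster partial sums

GlobalAddress<Point> points = load_points();           // distributed input points
GlobalAddress<Accum> accums = global_alloc<Accum>(K);  // one accumulator per cluster
std::vector<Point> means = initial_means();            // current means, available on every core

while (!converged()) {
  // Map + Combine: each point finds its nearest mean; its contribution is
  // delegated to that cluster's accumulator and applied atomically at the
  // accumulator's home core.
  forall(points, npoints, [=](Point& p) {
    size_t c = nearest_cluster(p, means);
    delegate(&accums[c], [=](Accum& a) {
      for (int d = 0; d < DIMS; d++) a.sum[d] += p.x[d];
      a.count += 1;
    });
  });
  // Reduce: divide each accumulator's sums by its count to obtain new means.
  // Broadcast: distribute the new means to all cores, clear the accumulators,
  // and start the next iteration.
}

The outer while loop is exactly the application-level iteration described above.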
Figure 11: Scaling BFS out to 128 nodes. In addition to GRAPPA’s GraphLab engine, we also show a custom algorithm for BFS implemented natively which employs Beamer’s bottom-up optimization to achieve even better performance.
The results are shown in Figure 10a for K = 10 and K = 10000. We find GRAPPA-MapReduce to be nearly an order of magnitude faster than the comparable Spark implementation. Absolute runtime for GRAPPA-MapReduce is 0.13s per iteration for K = 10 and 17.3s per iteration for K = 10000, compared to 1s and 170s respectively for Spark.
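Those absolute times imply per-iteration speedups of roughly (simple division of the figures above):

\[
\frac{1\ \text{s}}{0.13\ \text{s}} \approx 7.7\times \ (K = 10),
\qquad
\frac{170\ \text{s}}{17.3\ \text{s}} \approx 9.8\times \ (K = 10000),
\]

consistent with the “nearly an order of magnitude” characterization.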
We examined profiles to understand this difference. We see similar results as with Shark: the bulk of the difference comes from the networking layer and from data serialization. As K grows, this problem should be compute-bound: most execution time is spent assigning points to clusters in the map step. Figure 10b shows the time breakdown for GRAPPA-MapReduce. At large K, GRAPPA-MapReduce is clearly compute-bound. However, the equivalent profile for Spark shows only 50% of the execution time in map; the rest of the time is in the reduce step in network code. GRAPPA’s efficient small message support and support for overlapping communication and computation help it perform well here.
// distributed input array
GlobalAddress<char> chars = load_input();

// distributed hash table:
using Cell = std::map<char,int>;
GlobalAddress<Cell> cells = global_alloc<Cell>(ncells);

forall(chars, nchars, [=](char& c) {
  // hash the char to determine destination
  size_t idx = hash(c) % ncells;
  delegate(&cells[idx], [=](Cell& cell) {   // runs atomically
    if (cell.count(c) == 0) cell[c] = 1;
    else cell[c] += 1;
  });
});
hash("i")
Figure 1: “Character count” with a simple hash table implemented using GRAPPA’s distributed shared memory.
Figure 2: GRAPPA design overview. Data analytics tools (MapReduce, GraphLab, a relational query engine) and irregular apps and native code are built on GRAPPA's distributed shared memory, lightweight multithreading with a global task pool, and a communication layer with message aggregation, running over an InfiniBand network with user-level access.
Tasking system The tasking system supports lightweight multithreading and global distributed work-stealing — tasks can be stolen from any node in the system, which provides automated load balancing. Concurrency is expressed through cooperatively-scheduled user-level threads. Threads that perform long-latency operations (i.e., remote memory access) automatically suspend while the operation is executing and wake up when the operation completes.
Communication layer The main goal of our communication layer is to aggregate small messages into large ones. This process is invisible to the application programmer. Its interface is based on active messages [69]. Since aggregation and deaggregation of messages needs to be very efficient, we perform the process in parallel and carefully use lock-free synchronization operations. For portability, we use MPI [49] as the underlying messaging library as well as for process setup and tear down.
3.1. Distributed Shared Memory
Below we describe how GRAPPA implements a shared global address space and the consistency model it offers.
3.1.1. Addressing Modes
Local memory addressing Applications written for GRAPPA may address memory in two ways: locally and globally. Local memory is local to a single core within a node in the system. Accesses occur through conventional pointers. Applications use local accesses for a number of things in GRAPPA: the stack associated with a task, accesses to global memory from the memory’s home core, and accesses to debugging infrastructure local to each system node. Local pointers cannot access memory on other cores, and are valid only on their home core.
Global memory addressing GRAPPA allows any local data on a core’s stacks or heap to be exported to the global address space to be made accessible to other cores across the system. This uses a traditional PGAS (partitioned global address space [29]) addressing model, where each address is a tuple of a rank in the job (or global process ID) and an address in that process.
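As an illustration of that addressing model (a hypothetical sketch, not GRAPPA's actual GlobalAddress implementation), a global address can be represented as a (rank, local offset) pair; the block-cyclic distribution policy below is also an assumption made for the example.

#include <cstdint>

// Hypothetical PGAS-style global address: a (rank, local offset) pair.
struct GlobalByteAddress {
  uint32_t rank;    // which process (global process ID) owns the data
  uint64_t offset;  // address within that process's local memory
};

// Map an index of a block-distributed global array to its owner and local
// offset, assuming fixed-size blocks dealt round-robin across `nranks`.
GlobalByteAddress locate(uint64_t index, uint64_t block_size, uint32_t nranks) {
  uint64_t block = index / block_size;
  GlobalByteAddress a;
  a.rank   = static_cast<uint32_t>(block % nranks);
  a.offset = (block / nranks) * block_size + index % block_size;
  return a;
}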
GRAPPA also supports symmetric allocations, which allocate space for a copy (or proxy) of an object on every core in the system. The behavior is identical to performing a local allocation on all cores, but the local addresses of all the allocations are guaranteed to be identical. Symmetric objects are often treated as a proxy to a global object, holding local copies of constant data, or allowing operations to be transparently buffered. A separate publication [40] explored how this was used to implement GRAPPA’s synchronized global data structures, including vector and hash map.
Putting it all together Figure 3 shows an example of how global, local and symmetric heaps can all be used together for a simple graph data structure. In this example, vertices are allocated from the global heap, automatically distributing them across nodes. Symmetric pointers are used to access local objects which hold information about the graph, such as the base pointer to the vertices, from any core without communication. Finally, each vertex holds a vector of edges allocated from their core’s local heap, which other cores can access by going through the vertex.
3.1.2. Delegate Operations Access to GRAPPA’s distributed shared memory is provided through delegate operations, which are short operations performed at the memory location’s home node. When the data access pattern has low locality, it is more efficient to modify the data on its home core rather than bringing a copy to the requesting core and returning a modified version. Delegate operations [48, 52] provide this capability.
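For instance, in the style of the character-count code in Figure 1, a global counter could be incremented through a delegate rather than by fetching the word to the requesting core (a sketch; Item, items, nitems, and matches are hypothetical):

GlobalAddress<long> counter = global_alloc<long>(1);

forall(items, nitems, [=](Item& it) {
  if (matches(it))
    // The increment is shipped to the counter's home core and runs atomically
    // there; only a small message crosses the network, never the data itself.
    delegate(&counter[0], [](long& c) { c += 1; });
});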
Grappa’s familiar single-system multithreaded C++ programming model enables easier development of analysis tools for terabyte-scale data. We provide sequential consistency for race-free programs using RPC-like atomic delegate operations, along with standard multithreaded synchronization primitives.!
Here is an example of using Grappa’s C++11 library to build a distributed parallel word-count-like application with a simple hash table:
// distributed input arrayGlobalAddress<char> chars;// distributed hash table:using Cell = map<char,int>;GlobalAddress<Cell> cells;forall(chars, nchars, [=](char c) { // hash the char to determine destination size_t idx = hash(c) % ncells; delegate(cells+idx, [=](Cell& cell) { // runs atomically if (cell.count(c) == 0) cell[c] = 1; else cell[c] += 1; });});
(System overview: MapReduce, GraphLab, a relational query engine, and irregular apps and native code run on three layers — Distributed Shared Memory, Lightweight Multithreading and Global Task Pool, and a Communication Layer with message aggregation — over an InfiniBand network with user-level access.)
Direct access to network reduces software overhead.
Messages aggregated to mitigate low network injection rates.
Thousands of threads per core to expose parallelism and mask network latencies.
Programmer sees global memory abstraction.
Key feature: Message Aggregation
Message lists are aggregated locally per core; the sending core serializes them into a buffer; the buffer is moved over the network via MPI/RDMA; the receiving core distributes messages to destination cores; messages are deserialized and their handlers run on the home cores.
(Figure: GUPS random-access benchmark, atomic increments per second on 16-128 nodes, for Grappa delegates vs. RDMA atomic increments, aggregated vs. unaggregated; and network bandwidth (GB/s) vs. message size from 16 B to 64 kB for MPI and RDMA (verbs).)
Commodity networks have a limited message injection rate, so building up larger packets from unrelated tasks is essential for small-message throughput (fine-grained random access to global memory).
(Figure: GUPS atomic increment rate (millions/sec) for GraphLab (TCP), Spark (TCP), Grappa (TCP), and Grappa (RDMA), with Grappa prefetching disabled and enabled.)
Performance breakdown (or “why is Grappa faster?”)
Kernel-bypass communication with cache-aware data placement.
High-concurrency program phases enable aggregation and thus high message rates.
PageRank on Twitter: message rate (millions/s) over time for Grappa vs. GraphLab (pds).
More efficient network layer, lower serialization cost.
SP2Bench Query 2: execution time for Grappa vs. Shark, and cumulative speedup over Shark broken down by component (network, data serialization, iteration, other overheads, app compute).
Visit http://grappa.io for more info on memory allocation, data distribution, and hotspot avoidance via combining.
Grappa’s current target is small, ~128 node clusters and does not yet implement fault tolerance. Work is ongoing.