Large Graph Processing
DESCRIPTION
A lecture for a cloud computing course about large-scale graph processing, Pregel and Mizan.
TRANSCRIPT
King Abdullah University of Science and Technology
CS348: Cloud Computing
Large-Scale Graph Processing
Zuhair Khayyat
10/March/2013
The Importance of Graphs
● A graph is a mathematical structure that represents pairwise
relations between entities or objects. Such as:
– Physical communication networks
– Web pages links
– Social interaction graphs
– Protein-to-protein interactions
● Graphs are used to abstract application-specific features into a
generic problem, which makes Graph Algorithms applicable to
a wide variety of applications*.
*http://11011110.livejournal.com/164613.html
Graph algorithm characteristics*
● Data-Driven Computations: Computations in graph
algorithms depend on the structure of the graph, which makes
the algorithm's behavior hard to predict.
● Unstructured Problems: Different graph distributions
require distinct load-balancing techniques.
● Poor Data Locality.
● High Data Access to Computation Ratio: Runtime can be
dominated by waiting for memory fetches.
*Lumsdaine et al., Challenges in Parallel Graph Processing
Challenges in Graph processing
● Graphs grow fast; a single computer either cannot fit a large
graph into memory at all, or can only do so at huge cost.
● A custom implementation of a single graph algorithm requires
time and effort and cannot be reused for other algorithms.
● Scientific parallel applications (e.g., parallel PDE solvers)
cannot fully adapt to the computational requirements of graph
algorithms*.
● Fault tolerance is required to support large-scale processing.
*Lumsdaine et al., Challenges in Parallel Graph Processing
Why Cloud in Graph Processing
● Easy to scale up and down; provision machines
depending on your graph size.
● Cheaper than buying a large physical cluster.
● Graph processing can be offered in the cloud as “Software
as a Service” to support online social networks.
Large Scale Graph Processing
● Systems that try to solve the problem of processing large graphs in parallel:
– MapReduce – automatic task scheduling, distributed disk-based computation:
● Pegasus
● X-Rime
– Pregel – Bulk Synchronous Parallel graph processing:
● Giraph
● GPS
● Mizan
– GraphLab – asynchronous parallel graph processing.
Pregel* Graph Processing
● Consists of a series of synchronized iterations (supersteps), based on the Bulk Synchronous Parallel computing model. Each superstep consists of:
– Concurrent computations
– Communication
– Synchronization barrier
● Vertex-centric computation: the user's compute() function is applied individually to each vertex, which is able to:
– Send messages to vertices for the next superstep
– Receive messages from the previous superstep
*Malewicz et al., Pregel: A System for Large-Scale Graph Processing
Pregel messaging Example 1
[Figure: a four-vertex graph (A, B, C, D) traced over supersteps 0–3. In superstep 1 the vertices send the values 22, 15, 47 and 9; in superstep 2 each vertex receives its neighbors' messages from the previous superstep and sends -2, 55, 14 and 7; in superstep 3 it sends 5, 98, 9 and 5.]
Vertex's State
● All vertices are active at superstep 0.
● Every active vertex runs the user-defined compute() function
in each superstep.
● A vertex deactivates itself by voting to halt, but becomes
active again if it receives a message.
● Pregel terminates when all vertices are inactive.
Pregel Example 2 – Data Distribution (Hash-based partitioning)
[Figure: vertices are hash-partitioned across Worker 1, Worker 2 and Worker 3. Each superstep runs Computation, then Communication, then the Synchronization Barrier, followed by a "Done?" check: No loops back into the next superstep, Yes terminates.]
Pregel Example 3 – Max
[Figure: four vertices propagate the maximum value:
Superstep 0: 3 6 2 1
Superstep 1: 6 6 2 6
Superstep 2: 6 6 6 6
Superstep 3: 6 6 6 6 (no values change, so all vertices vote to halt)]
Pregel Example 4 – Max code

class MaxFindVertex : public Vertex<double, void, double> {
  // Template arguments: vertex value class, edge value class, message class
 public:
  virtual void Compute(MessageIterator* msgs) {
    int currMax = GetValue();
    // Send the current max to all neighbors
    SendMessageToAllNeighbors(currMax);
    // Check incoming messages and keep the max
    for (; !msgs->Done(); msgs->Next()) {
      if (msgs->Value() > currMax)
        currMax = msgs->Value();
    }
    if (currMax > GetValue())
      *MutableValue() = currMax;  // Store the new max
    else
      VoteToHalt();
  }
};
Pregel Message Optimizations
● Message Combiners:
– A special function that combines the incoming
messages for a vertex before compute() runs.
– Can run on the sending or the receiving worker.
● Global Aggregators:
– A shared object accessible to all vertices that is
synchronized at the end of each superstep, e.g., max
and min aggregators.
Pregel Guarantees
● Scalability: processes vertices in parallel, overlapping
computation and communication.
● Messages are delivered without duplication, in arbitrary
order.
● Fault tolerance through checkpoints.
Pregel's Limitations
● Pregel's superstep waits for all workers to finish at the
synchronization barrier; that is, it waits for the slowest
worker to finish.
● Smart partitioning can solve the load-balancing problem
for static algorithms. However, not all algorithms are
static; an algorithm's execution behavior can vary,
which leads to unbalanced supersteps.
Mizan* Graph Processing
● Mizan is an open-source graph processing system, similar
to Pregel, developed locally at KAUST.
● Mizan employs dynamic graph repartitioning, without
affecting the correctness of graph processing, to
rebalance the execution of the supersteps for all types of
workloads.
*Khayyat et al., Mizan: A System for Dynamic Load Balancing in Large-scale Graph Processing
Source of Imbalance in BSP
Types of Graph Algorithms
● Stationary Graph Algorithms:
– Algorithms with a fixed message distribution across supersteps.
– All vertices are active or inactive at the same time.
– e.g., PageRank, diameter estimation and weakly connected
components.
● Non-stationary Graph Algorithms:
– Algorithms with a variable message distribution across supersteps.
– Vertices can become active or inactive independently of each other.
– e.g., distributed minimal spanning tree.
Mizan architecture
● Each Mizan worker contains three distinct main
components: the BSP processor, the communicator and the
storage manager.
● A distributed hash table (DHT) is used to maintain the
location of each vertex.
● The migration planner interacts with other components
during the BSP barrier.
Mizan's Barriers
Dynamic migration: Statistics
● Mizan monitors the following for every vertex:
– Response time
– Remote outgoing messages
– Incoming messages
Dynamic migration: planning
● Mizan's migration planner runs after the BSP barrier and creates a
new barrier. Planning includes the following steps:
– Identifying unbalanced workers.
– Identifying the migration objective:
● Response time
● Incoming messages
● Outgoing messages
– Pairing over-utilized workers with underutilized ones.
– Selecting the vertices to migrate.
Mizan's Migration Work-flow
Mizan PageRank Compute() Example
void compute(messageIterator<mDouble> * messages,
             userVertexObject<mLong, mDouble, mDouble, mLong> * data,
             messageManager<mLong, mDouble, mDouble, mLong> * comm) {
  double currVal = data->getVertexValue().getValue();
  double newVal = 0;
  double c = 0.85;
  // Processing messages
  while (messages->hasNext()) {
    double tmp = messages->getNext().getValue();
    newVal = newVal + tmp;
  }
  newVal = newVal * c + (1.0 - c) / ((double) vertexTotal);
  mDouble outVal(newVal / ((double) data->getOutEdgeCount()));
  // Termination condition
  if (data->getCurrentSS() <= maxSuperStep) {
    // Sending to neighbors
    for (int i = 0; i < data->getOutEdgeCount(); i++) {
      comm->sendMessage(data->getOutEdgeID(i), outVal);
    }
  } else {
    data->voteToHalt();
  }
  data->setVertexValue(mDouble(newVal));
}
Mizan PageRank Combiner Example
void combineMessages(mLong dst, messageIterator<mDouble> * messages,
                     messageManager<mLong, mDouble, mDouble, mLong> * mManager) {
  double newVal = 0;
  // Sum all messages destined for vertex dst into a single message
  while (messages->hasNext()) {
    double tmp = messages->getNext().getValue();
    newVal = newVal + tmp;
  }
  mDouble messageOut(newVal);
  mManager->sendMessage(dst, messageOut);
}
Mizan Max Aggregator Example
class maxAggregator : public IAggregator<mLong> {
 public:
  mLong aggValue;
  maxAggregator() { aggValue.setValue(0); }
  // Keep the maximum value seen so far
  void aggregate(mLong value) {
    if (value > aggValue) { aggValue = value; }
  }
  mLong getValue() { return aggValue; }
  void setValue(mLong value) { this->aggValue = value; }
  virtual ~maxAggregator() {}
};
Class Assignment
● Your assignment is to configure, install and run Mizan on a single Linux machine by following this tutorial:
https://thegraphsblog.wordpress.com/mizan-on-ubuntu/
● By the end of the tutorial, you should be able to execute the following command on your machine:
mpirun -np 2 ./Mizan-0.1b -u ubuntu -g webGoogle.txt -w 2
● Deliverables: store the output of the above command and submit it by Wednesday's class.
● For any questions regarding the tutorial, or to get an account on an Ubuntu machine, contact me at: [email protected]