Post on 12-Apr-2017
Tegra: Time-evolving Graph Processing on Commodity Clusters
Anand Iyer and Ion Stoica. Presented by Ankur Dave.
Graphs are everywhere…
• Social networks
• Gnutella network subgraph
• Metabolic network of a single-celled organism (tuberculosis)
Plenty of interest in processing them
• Graph DBMSs in use at 25% of all enterprises by 20171
• Many open-source and research distributed graph processing frameworks: Giraph, Pregel, GraphLab, Chaos, GraphX, …
1 Forrester Research
Real-world Graphs are Dynamic
Many interesting business and research insights possible by processing them…
…but little work on incrementally updating computation results or on window-based operations
A Motivating Example…
• Retrieve the network state when a disruption happened
• Analyze the evolution of hotspot groups in the last day by hour
• Track regions of heavy load in the last 10 minutes
• What factors explain the performance in the network?
Tegra
How do we perform efficient computations on time-evolving, dynamically changing graphs?
Goals
• Create and manage time-evolving graphs: retrieve the network state when a disruption happened
• Temporal analytics on windows: analyze the evolution of hotspot groups in the last day by hour
• Sliding window computations: track regions of heavy load in the last 10-minute interval
• Mix graph-parallel and data-parallel computing: what factors explain the performance in the network?
Tegra
• Graph Snapshot Index
• Timelapse Abstraction
• Lightweight Incremental Computations
• Pause-Shift-Resume Computations
Goals
• Create and manage time-evolving graphs using the Graph Snapshot Index: retrieve the network state when a disruption happened
• Temporal analytics on windows: analyze the evolution of hotspot groups in the last day by hour
• Sliding window computations: track regions of heavy load in the last 10-minute interval
• Mix graph-parallel and data-parallel computing: what factors explain the performance in the network?
Representing Evolving Graphs
[Figure: an evolving graph over time, represented two ways. As snapshots: G1 contains vertices A, B, C; G2 adds D; G3 adds E. As updates: deltas δg1, δg2, δg3, each a small set of vertex and edge additions and removals.]
Graph Composition
Updating graphs depends on application semantics.
[Figure: composing snapshot G2 with a delta δg that adds vertex E and its edges to C and D yields G3'.]
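The composition step above can be sketched as follows. This is a minimal illustration under assumed types, not Tegra's actual API: a snapshot is a pair of immutable vertex/edge sets, a delta is a set of additions and removals, and the resolution policy (removals win, dangling edges dropped) stands in for whatever the application's semantics dictate. The edges of G2 are illustrative.

```python
# Hypothetical snapshot/delta types; composition resolves additions and
# removals according to application semantics (here: removals win, and
# edges touching a removed vertex are dropped).
from dataclasses import dataclass

@dataclass(frozen=True)
class Snapshot:
    vertices: frozenset
    edges: frozenset  # set of (src, dst) pairs

@dataclass(frozen=True)
class Delta:
    add_v: frozenset = frozenset()
    rem_v: frozenset = frozenset()
    add_e: frozenset = frozenset()
    rem_e: frozenset = frozenset()

def compose(g, d):
    """G + delta = G'."""
    vs = (g.vertices | d.add_v) - d.rem_v
    es = {(s, t) for (s, t) in (g.edges | d.add_e) - d.rem_e
          if s in vs and t in vs}
    return Snapshot(vs, frozenset(es))

# G2 from the slides (edge set is illustrative); the delta adds vertex E
# with edges to C and D, yielding G3'.
g2 = Snapshot(frozenset("ABCD"),
              frozenset({("A", "B"), ("A", "C"), ("B", "C"), ("A", "D")}))
dg = Delta(add_v=frozenset("E"), add_e=frozenset({("C", "E"), ("D", "E")}))
g3 = compose(g2, dg)
```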
Maintaining Multiple Snapshots
• Store entire snapshots (G1, G2, G3): efficient retrieval, but storage overhead
• Store only deltas (δg1, δg2, δg3): efficient storage, but retrieval overhead
Maintaining Multiple Snapshots
Instead, use a structure-sharing persistent data structure, so consecutive snapshots (at t1 and t2) share their unchanged state. The Persistent Adaptive Radix Tree (PART) is one solution available for Spark.
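The structure-sharing idea can be illustrated with a toy path-copying binary trie (PART itself is an adaptive radix tree; the node layout and key width here are invented for the sketch). Updating one key copies only the path from the root to that key, so the two snapshots share every untouched subtree.

```python
# Toy persistent trie with path copying: an update returns a new root
# and shares all unmodified subtrees with the previous snapshot.
class Node:
    __slots__ = ("left", "right", "value")
    def __init__(self, left=None, right=None, value=None):
        self.left, self.right, self.value = left, right, value

BITS = 8  # key width for this toy example

def insert(node, key, value, bit=BITS - 1):
    """Return a new root; old snapshots remain valid and unchanged."""
    n = Node() if node is None else Node(node.left, node.right, node.value)
    if bit < 0:
        n.value = value
        return n
    if key & (1 << bit):
        n.right = insert(n.right, key, value, bit - 1)
    else:
        n.left = insert(n.left, key, value, bit - 1)
    return n

def lookup(node, key, bit=BITS - 1):
    if node is None:
        return None
    if bit < 0:
        return node.value
    child = node.right if key & (1 << bit) else node.left
    return lookup(child, key, bit - 1)

# Snapshot 2 derives from snapshot 1 by a single update.
snap1 = None
for v in range(16):
    snap1 = insert(snap1, v, 1.0)  # vertex id -> property
snap2 = insert(snap1, 5, 2.5)

# Walk both snapshots down to the node where key 5's path diverges;
# the subtree for the untouched keys is the same object in both.
n1, n2 = snap1, snap2
for _ in range(5):
    n1, n2 = n1.left, n2.left
shared = n1.left is n2.left
```

Only the eight nodes on key 5's path are copied; the remaining subtrees of snapshot 1 are reused by snapshot 2, which is why Tegra can afford to keep many snapshots resident.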
Graph Snapshot Index
[Figure: the index stores vertices and edges per partition in persistent structures shared across snapshots (t1, t2), together with snapshot ID management.]
Graph Windowing
[Figure: snapshots G1 through G4 of the evolving graph and their deltas δg1 through δg4.]
Windows can be computed from snapshots or composed from deltas:
G'1 = G3, G'2 = G4 - G1
G'1 = δg1 + δg2 + δg3, G'2 = G'1 + δg4 - δg1
Equivalent to GraphReduce() in GraphLINQ and GraphReduceByWindow() in CellIQ.
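The delta-composition identities above can be sketched directly. This is a hypothetical representation, not Tegra's API: each delta is modeled as a disjoint set of edge additions, so composing a window is set union and expiring the oldest delta is set difference. The contents of δg4 are illustrative.

```python
# Deltas as disjoint edge-addition sets (illustrative contents).
dg1 = {("A", "B"), ("A", "C"), ("B", "C")}
dg2 = {("A", "D")}
dg3 = {("C", "E"), ("D", "E")}
dg4 = {("B", "E")}

# G'1 = dg1 + dg2 + dg3: the window over snapshots 1..3.
w1 = dg1 | dg2 | dg3

# G'2 = G'1 + dg4 - dg1: slide the window by admitting dg4
# and expiring dg1, reusing the previous window's result.
w2 = (w1 | dg4) - dg1
```

The point of the second identity is incrementality: the new window is derived from the old one with two small delta operations instead of being rebuilt from all of its constituent snapshots.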
Goals
• Create and manage time-evolving graphs using the Distributed Graph Snapshot Index: retrieve the network state when a disruption happened
• Temporal analytics on windows using the Timelapse abstraction: analyze the evolution of hotspot groups in the last day by hour
• Sliding window computations: track regions of heavy load in the last 10-minute interval
• Mix graph-parallel and data-parallel computing: what factors explain the performance in the network?
Graph Parallel Computation
Many approaches: vertex-centric, edge-centric, subgraph-centric, …
Timelapse
Operations on windows of snapshots result in redundant computation.
[Figure: the same computation repeated independently on snapshots G1, G2, G3.]
Instead, expose the temporal evolution of a node.
[Figure: a timelapse packs each vertex's states across G1, G2, G3 together.]
• Significant reduction in messages exchanged between graph nodes
• Avoids redundant computations
Timelapse API
[Figure 8: Example showing the benefit of PSR computation. Each vertex is annotated with a pair (X, Y), where X is its 10-iteration PageRank and Y its 23-iteration PageRank on the first graph, e.g., (0.977, 0.968) and (0.571, 0.556). After the transition to graph 2, 11 further iterations converge both to 3-digit precision.]
Streaming master program. For each iteration of Pregel, we check for the availability of a new graph. When it is available, we stop the iterations on the current graph, and resume them on the new graph after copying over the computed results. The new computation will only have vertices in the new active set continue message passing. The new active set is a function of the old active set and the changes between the new graph and the old graph. For a large class of algorithms (e.g., incremental PageRank [19]), the new active set includes vertices from the old active set, any new vertices, and vertices with edge additions and deletions. Listing 2 shows a simple streaming PageRank implementation using this API.
def StreamingPageRank(ts: TegraStream) = {
  def vprog(v: VertexId, msgSum: Double) =
    0.15 + 0.85 * msgSum
  ts.PSRCompute(1, 100, EdgeDirection.Out, "10s")(
    vprog,
    triplet => triplet.src.pr / triplet.src.outDeg,
    (A, B) => A + B)
}

Listing 2: PageRank computation on time-evolving graphs.
Figure 8 shows a concrete example. For the first graph, it takes 23 iterations to converge to 3-digit precision. If we reuse this PageRank for the second, updated graph on the right, it takes another 11 iterations to converge to 3-digit precision on the new graph. On the other hand, if we finish only 10 iterations on the first graph and then transition to the updated graph, it takes the same 11 iterations to converge to 3-digit precision on the new graph. Essentially, we save 13 iterations.
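The pause-shift-resume pattern described above can be sketched with a plain synchronous PageRank loop. The helpers, graphs, and iteration counts here are invented for illustration; Tegra's actual implementation drives this through its Pregel-style streaming master.

```python
# Pause-shift-resume sketch: run PageRank for a few iterations on
# graph 1, pause, then resume from those partial ranks on the updated
# graph 2 instead of restarting from uniform ranks.

def pagerank_step(graph, ranks):
    """One synchronous iteration; graph maps vertex -> out-neighbors."""
    contrib = {v: 0.0 for v in graph}
    for v, outs in graph.items():
        for u in outs:
            contrib[u] += ranks[v] / len(outs)
    return {v: 0.15 + 0.85 * contrib[v] for v in graph}

def run(graph, ranks, iters):
    for _ in range(iters):
        ranks = pagerank_step(graph, ranks)
    return ranks

g1 = {"A": ["B"], "B": ["C"], "C": ["A"]}
g2 = {"A": ["B"], "B": ["C", "D"], "C": ["A"], "D": ["A"]}  # update adds D

paused = run(g1, {v: 1.0 for v in g1}, 10)       # pause after 10 iterations
resumed = run(g2, {**paused, "D": 1.0}, 11)      # shift to g2, then resume
```

Copying the paused ranks over (and seeding only the new vertex D) is the "copying over the computed results" step from the paper; the active-set restriction is omitted here for brevity.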
4.2 Timelapse API
To implement the timelapse API (§3.2), we extend the Graph API in GraphX as shown in Listing 3.
The API gives access to Tegra's DGSI using the collection view functions by extending them with an additional (optional) snapshot ID. By default, the snapshot ID is not supplied, which signals Tegra to return the latest snapshot. The timelapse computation is enabled by the extended mrTriplets7 function, which now takes in an optional array of snapshot IDs to operate on. Internally, the mrTriplets functionality is implemented using a series of map, join, and group-by operations. By simultaneously computing on multiple snapshots, Tegra is able to reduce the number of expensive joins and group-bys. In addition, DGSI provides efficient ways to perform joins, which gives Tegra further advantages.

7 This function has been replaced by the new aggregateMessages function; we simply use it for legacy reasons.

class Graph[V, E] {
  // Collection views
  def vertices(sid: Int): Collection[(Id, V)]
  def edges(sid: Int): Collection[(Id, Id, E)]
  def triplets(sid: Int): Collection[Triplet]
  // Graph-parallel computation
  def mrTriplets(f: (Triplet) => M,
                 sum: (M, M) => M,
                 sids: Array[Int]): Collection[(Int, Id, M)]
  // Convenience functions
  def mapV(f: (Id, V) => V,
           sids: Array[Int]): Graph[V, E]
  def mapE(f: (Id, Id, E) => E,
           sids: Array[Int]): Graph[V, E]
  def leftJoinV(v: Collection[(Id, V)],
                f: (Id, V, V) => V,
                sids: Array[Int]): Graph[V, E]
  def leftJoinE(e: Collection[(Id, Id, E)],
                f: (Id, Id, E, E) => E,
                sids: Array[Int]): Graph[V, E]
  def subgraph(vPred: (Id, V) => Boolean,
               ePred: (Triplet) => Boolean,
               sids: Array[Int]): Graph[V, E]
  def reverse(sids: Array[Int]): Graph[V, E]
}

Listing 3: GraphX [24] operators modified to support Tegra's timelapse abstraction.
4.3 Dynamic Computations
Tegra uses the GAS decomposition to implement graph-parallel iterative computations. The GAS model is shown in Listing 4. In this model, a vertex first accumulates updates from its neighbors, applies the update on itself, and then propagates the results to the neighbors. We can view this as a transformation of the original graph, followed by the evolution of the transformed graph. In this view, the transformed graph and its evolutions are orthogonal to the time-evolution of the graph, and each snapshot in the evolution represents one iteration of GAS. We store the transformed graph and its evolution in DGSI with a special annotation where the prefix is taken from the snapshot ID which generated the transform.
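One GAS iteration, as described above, can be sketched as follows (illustrative types and a toy graph; Tegra layers this model on top of DGSI with snapshot annotations, which are omitted here):

```python
# One Gather-Apply-Scatter iteration on an edge list.
def gas_iteration(vertices, edges, gather, apply):
    """vertices: id -> value; edges: (src, dst) pairs.
    Gather: each edge carries the source value to its destination, and
    messages for the same destination are combined with `gather`.
    Apply: each vertex folds its accumulated message into its value.
    Scatter is implicit: the next Gather re-reads the updated vertices."""
    acc = {}
    for src, dst in edges:
        msg = vertices[src]
        acc[dst] = gather(acc[dst], msg) if dst in acc else msg
    return {v: apply(x, acc[v]) if v in acc else x
            for v, x in vertices.items()}

verts = {1: 1.0, 2: 2.0, 3: 3.0}
edges = [(1, 2), (3, 2), (2, 3)]
nxt = gas_iteration(verts, edges, gather=lambda a, b: a + b,
                    apply=lambda x, m: x + m)
```

Each call produces the next "snapshot" of the transformed graph, matching the paper's view that one GAS iteration is one step in the transformed graph's evolution.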
Tegra can access this evolving graph in later snapshots. Intuitively, the transformed graph lets Tegra peek into a possible future state of the graph without having to exchange messages between the vertices. This enables the framework to restrict the message exchanges between vertices to the difference between the previous run and the graph changes, thus realizing a form of differential computation. It is to be noted that this is different from storing and replaying messages as proposed by GraphInc [19].
Temporal Operations
Evolution analysis with no state-keeping requirements.
Bulk Transformations
[Figure: a transformation applied in bulk across snapshots G1, G2, G3.]
Bulk Iterative Computations
[Figure: an iterative computation run in bulk across snapshots G1, G2, G3, converging to per-vertex labels.]
How did the hotspots change over this window?
Incremental Computation
• If results from a previous snapshot are available, how can we reuse them?
• Three approaches in the past:
  • Restart the algorithm: redundant computations
  • Memoization (GraphInc1): too much state
  • Operator-wise state (Naiad2,3): too much overhead
1 Facilitating real-time graph mining, CloudDB '12
2 Naiad: a timely dataflow system, SOSP '13
3 Differential dataflow, CIDR '13
Incremental Computation
[Figure: computation results for snapshots G1 and G2 stored as a time-evolving graph; the second computation starts from the first one's results.]
• Can keep state as an efficient time-evolving graph
• Not limited to vertex-centric computations
Incremental Computation
• Some iterative graph algorithms are robust to graph changes
• Allow them to proceed without keeping any state
[Figure: PageRank on a small graph; when an update adds vertex B, the computation continues from the previous values (e.g., 2.37 → 2.07 for the hub) instead of restarting.]
Implementation & Evaluation
• Implemented as a major enhancement to GraphX
• Evaluated on two open-source real-world graphs:
  • Twitter: 41,652,230 vertices, 1,468,365,182 edges
  • uk-2007: 105,896,555 vertices, 3,738,733,648 edges
Preliminary Evaluation
[Figure: memory use vs. number of snapshots, annotated "Linear reduction" and "Graph difference significant". Tegra can pack more snapshots in memory.]
Preliminary Evaluation
[Figure: run time vs. number of snapshots. Timelapse results in run-time reduction.]
Preliminary Evaluation
Effectiveness of incremental computation
[Figure: convergence time per snapshot: 220 s for snapshot 1, then 45, 34, 35, and 37 s for snapshots 2 through 5.]
Summary
• Processing time-evolving graphs efficiently can be useful.
• Efficient storage of multiple snapshots and reduced communication between graph nodes are key to evolving-graph analysis.
Ongoing Work
• Expand timelapse and its incremental computation model to other graph-parallel paradigms
• Other interesting graph algorithms: fraud detection/prediction, incremental pattern matching
• Add graph querying support: graph queries and analytics in a single system
• Stay tuned for code release!