Processing Large-Scale Graphs with Google(TM) Pregel by Michael Hackstein at Big Data Spain 2014

Posted on 12-Jul-2015


PROCESSING LARGE-SCALE GRAPHS WITH GOOGLE(TM) PREGEL

MICHAEL HACKSTEIN, FRONT END AND GRAPH SPECIALIST, ARANGODB

Processing large-scale graphs with Google™ Pregel

Michael Hackstein

@mchacki

November 17th

www.arangodb.com

Michael Hackstein

ArangoDB Core Team

Web Frontend

Graph visualisation

Graph features

Host of cologne.js

Master’s Degree (spec. Databases and Information Systems)


Graph Algorithms

Pattern matching

Search through the entire graph

Identify similar components

⇒ Touch all vertices and their neighbourhoods

Traversals

Define a specific start point

Iteratively explore the graph

⇒ History of steps is known

Global measurements

Compute one value for the graph, based on all its vertices or edges

Compute one value for each vertex or edge

⇒ Often require a global view on the graph
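To make the contrast between traversals and whole-graph computations concrete, here is a minimal breadth-first traversal sketch in JavaScript (the language of the ArangoDB example later in the talk). The adjacency list and the function name `traverse` are hypothetical. It shows the two properties listed for traversals: it needs a specific start point, and every vertex it reaches knows the path (history of steps) that led to it. Vertices that are not reachable from the start are never touched.

```javascript
// Hypothetical adjacency list: vertex id -> ids of its outbound neighbours.
const graph = {
  a: ["b", "c"],
  b: ["d"],
  c: ["d"],
  d: [],
  e: ["a"], // not reachable from "a", so the traversal never visits it
};

// Breadth-first traversal from one start vertex. Every reached vertex
// remembers the path that led to it, i.e. the history of steps.
function traverse(adjacency, start) {
  const paths = new Map([[start, [start]]]);
  const queue = [start];
  while (queue.length > 0) {
    const current = queue.shift();
    for (const next of adjacency[current] || []) {
      if (!paths.has(next)) {
        // First time we see `next`: extend the path of its predecessor.
        paths.set(next, [...paths.get(current), next]);
        queue.push(next);
      }
    }
  }
  return paths;
}

const paths = traverse(graph, "a");
console.log(paths.get("d")); // [ 'a', 'b', 'd' ]
console.log(paths.has("e")); // false
```

A global measurement, by contrast, has no start point and has to visit every vertex, which is the kind of workload Pregel targets.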


Pregel

A framework to query distributed, directed graphs.

Known as “Map-Reduce” for graphs

Uses the same phases

Has several iterations

Aims at:

Operate all servers at full capacity

Reduce network traffic

Good at calculations touching all vertices

Bad at calculations touching a very small number of vertices


Example – Connected Components

[Figure: a graph with seven vertices, shown over several steps. Legend: active / inactive vertex, forward message / backward message. In the first frame each vertex holds its own ID (1 to 7) as its value; in the last frame vertices 1 to 4 hold the value 1 and vertices 5 to 7 hold the value 5.]
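The connected-components example appears to use min-label propagation, which can be sketched in memory as follows. This is an illustration under assumptions, not ArangoDB's implementation: the edge list is hypothetical (chosen only so that it yields the same final components, {1, 2, 3, 4} and {5, 6, 7}), `connectedComponents` is an illustrative name, and edges are treated as undirected by sending each label both forward along outbound edges and backward along inbound edges, as the legend's two message kinds suggest.

```javascript
// Hypothetical directed edge list with two weakly connected components,
// {1, 2, 3, 4} and {5, 6, 7}. These are not the exact edges of the figure.
const edges = [[1, 2], [3, 2], [2, 4], [6, 5], [5, 7]];

function connectedComponents(vertexIds, edgeList) {
  // Neighbours in both directions: forward along out-edges, backward along in-edges.
  const neighbours = new Map(vertexIds.map((id) => [id, []]));
  for (const [from, to] of edgeList) {
    neighbours.get(from).push(to);
    neighbours.get(to).push(from);
  }
  // Superstep 0: every vertex is active and labelled with its own id.
  const label = new Map(vertexIds.map((id) => [id, id]));
  let active = new Set(vertexIds);
  let supersteps = 0;
  while (active.size > 0) {
    // Each active vertex sends its label to all neighbours; messages for
    // the same target are combined with MIN as they are produced.
    const inbox = new Map();
    for (const id of active) {
      const value = label.get(id);
      for (const n of neighbours.get(id)) {
        inbox.set(n, Math.min(inbox.has(n) ? inbox.get(n) : Infinity, value));
      }
    }
    // A vertex stays active only if it received a smaller label.
    active = new Set();
    for (const [id, smallest] of inbox) {
      if (smallest < label.get(id)) {
        label.set(id, smallest);
        active.add(id);
      }
    }
    supersteps += 1;
  }
  return { label, supersteps };
}

const result = connectedComponents([1, 2, 3, 4, 5, 6, 7], edges);
console.log([...result.label.values()]); // [ 1, 1, 1, 1, 5, 5, 5 ]
```

Because a vertex only stays active while its label keeps shrinking, the loop stops on its own once every component has settled on its smallest ID.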

Pregel – Sequence


Worker =̂ Map

“Map” a user-defined algorithm over all vertices

Output: a set of messages to other vertices

Available parameters:

The current vertex and its outbound edges

All incoming messages

Global values

Allowed modifications of the vertex:

Attach a result to this vertex and its outgoing edges

Delete the vertex and its outgoing edges

Deactivate the vertex
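The map phase can be sketched in a few lines of JavaScript. The vertex shape `{ id, value, outEdges, active }`, the `send(to, data)` callback and the names `runWorkers` and `maxWorker` are hypothetical, not ArangoDB's API. The worker receives the three inputs listed above (the vertex with its outbound edges, the incoming messages, global values) and uses two of the allowed modifications: attaching a result and deactivating the vertex.

```javascript
// One "map" phase: call a user-defined worker once per participating vertex.
// The vertex shape is a hypothetical in-memory model.
function runWorkers(vertices, inbox, worker, global) {
  const outbox = []; // messages produced in this superstep
  for (const vertex of vertices.values()) {
    const messages = inbox.get(vertex.id) || [];
    // A vertex takes part if it is active or has received messages.
    if (!vertex.active && messages.length === 0) continue;
    vertex.active = true;
    const send = (to, data) => outbox.push({ to, data });
    worker(vertex, messages, global, send);
  }
  return outbox;
}

// Example worker: keep the largest value seen so far as the result and
// forward it along outgoing edges if it changed; otherwise deactivate.
function maxWorker(vertex, messages, global, send) {
  const best = Math.max(vertex.value, ...messages);
  if (global.step === 0 || best > vertex.value) {
    vertex.value = best;
    for (const target of vertex.outEdges) send(target, best);
  } else {
    vertex.active = false;
  }
}

const vertices = new Map([
  ["a", { id: "a", value: 3, outEdges: ["b"], active: true }],
  ["b", { id: "b", value: 6, outEdges: ["c"], active: true }],
  ["c", { id: "c", value: 2, outEdges: [], active: true }],
]);
const out = runWorkers(vertices, new Map(), maxWorker, { step: 0 });
console.log(out); // [ { to: 'b', data: 3 }, { to: 'c', data: 6 } ]
```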


Combine =̂ Reduce

“Reduce” all generated messages

Output: one aggregated message for each vertex

Executed on the sender as well as on the receiver

Available parameters:

One new message for a vertex

The stored aggregate for this vertex

Typical combiners are SUM, MIN or MAX

Reduces network traffic
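A sketch of the reduce step, assuming messages are plain `{ to, data }` objects. `combineMessages` is a hypothetical helper; the two-argument combiner shape `(message, oldMessage)` mirrors the ArangoDB example at the end of the talk. It folds all messages for the same target into one aggregate, which is what saves network traffic when it runs on the sender.

```javascript
// Fold a batch of messages into one aggregate per target vertex.
// `combiner(message, oldAggregate)` takes one new message and the stored aggregate.
function combineMessages(messages, combiner) {
  const aggregates = new Map();
  for (const { to, data } of messages) {
    aggregates.set(to, aggregates.has(to) ? combiner(data, aggregates.get(to)) : data);
  }
  return aggregates;
}

// Two typical combiners.
const sum = (message, oldMessage) => message + oldMessage;
const min = (message, oldMessage) => Math.min(message, oldMessage);

const outgoing = [
  { to: "x", data: 0.25 },
  { to: "x", data: 0.5 },
  { to: "y", data: 1 },
  { to: "x", data: 0.25 },
];
// Sender side: four messages shrink to two before crossing the network.
console.log(Object.fromEntries(combineMessages(outgoing, sum))); // { x: 1, y: 1 }
console.log(Object.fromEntries(combineMessages(outgoing, min))); // { x: 0.25, y: 1 }
```

Because SUM, MIN and MAX are associative and commutative, the same function can be applied again on the receiver to merge the partial aggregates arriving from different senders.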


Activity =̂ Termination

Execute several rounds of Map/Reduce

Count active vertices and messages

Start next round if one of the following is true:

At least one vertex is active

At least one message is sent

Terminate if no vertex is active and no message was sent

Store all non-deleted vertices and edges as the resulting graph
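Putting the map phase and the termination rule together, a single-process superstep loop might look like the sketch below. Everything here (`runPregel`, the vertex shape, the `maxSteps` safety limit, the example worker) is hypothetical. It implements the rule literally: another round starts while any vertex is active or any message was sent, and incoming messages reactivate a vertex that has deactivated itself.

```javascript
// Minimal single-process Pregel loop (hypothetical, for illustration only).
function runPregel(vertices, worker, maxSteps = 100) {
  let inbox = new Map(); // target id -> list of messages
  for (let step = 0; step < maxSteps; step++) {
    const outbox = new Map();
    let sent = 0;
    const send = (to, data) => {
      if (!outbox.has(to)) outbox.set(to, []);
      outbox.get(to).push(data);
      sent += 1;
    };
    for (const vertex of vertices.values()) {
      const messages = inbox.get(vertex.id) || [];
      if (messages.length > 0) vertex.active = true; // messages wake a vertex up
      if (vertex.active) worker(vertex, messages, { step }, send);
    }
    const activeCount = [...vertices.values()].filter((v) => v.active).length;
    // Terminate when no vertex is active and no message was sent.
    if (activeCount === 0 && sent === 0) return { vertices, steps: step + 1 };
    inbox = outbox;
  }
  return { vertices, steps: maxSteps };
}

// Example worker: propagate the maximum value, then deactivate every time.
function maxWorker(vertex, messages, global, send) {
  const best = Math.max(vertex.value, ...messages);
  if (global.step === 0 || best > vertex.value) {
    vertex.value = best;
    for (const target of vertex.outEdges) send(target, best);
  }
  vertex.active = false; // only a new message can reactivate this vertex
}

const graph = new Map([
  ["a", { id: "a", value: 3, outEdges: ["b"], active: true }],
  ["b", { id: "b", value: 6, outEdges: ["c"], active: true }],
  ["c", { id: "c", value: 2, outEdges: ["a"], active: true }],
]);
const { steps } = runPregel(graph, maxWorker);
console.log(steps, [...graph.values()].map((v) => v.value)); // 4 [ 6, 6, 6 ]
```

In this example every vertex deactivates after each round, so the loop only continues because messages keep arriving; the final round sends nothing and ends the computation.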


Pregel at ArangoDB

Started as a side project in free hack time

Experimental, running on the operational database

Implemented as an alternative to traversals

Make use of the flexibility of JavaScript:

No strict type system

No pre-compilation, on-the-fly queries

Native JSON documents

Really fast development


PageRank for Giraph


public class SimplePageRankComputation extends BasicComputation<LongWritable, DoubleWritable, FloatWritable, DoubleWritable> {
  public static final int MAX_SUPERSTEPS = 30;

  @Override
  public void compute(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex, Iterable<DoubleWritable> messages) throws IOException {
    if (getSuperstep() >= 1) {
      double sum = 0;
      for (DoubleWritable message : messages) {
        sum += message.get();
      }
      DoubleWritable vertexValue = new DoubleWritable((0.15f / getTotalNumVertices()) + 0.85f * sum);
      vertex.setValue(vertexValue);
    }
    if (getSuperstep() < MAX_SUPERSTEPS) {
      long edges = vertex.getNumEdges();
      sendMessageToAllEdges(vertex, new DoubleWritable(vertex.getValue().get() / edges));
    } else {
      vertex.voteToHalt();
    }
  }

  public static class SimplePageRankWorkerContext extends WorkerContext {
    @Override
    public void preApplication() throws InstantiationException, IllegalAccessException { }
    @Override
    public void postApplication() { }
    @Override
    public void preSuperstep() { }
    @Override
    public void postSuperstep() { }
  }

  public static class SimplePageRankMasterCompute extends DefaultMasterCompute {
    @Override
    public void initialize() throws InstantiationException, IllegalAccessException {
    }
  }

  public static class SimplePageRankVertexReader extends GeneratedVertexReader<LongWritable, DoubleWritable, FloatWritable> {
    @Override
    public boolean nextVertex() {
      return totalRecords > recordsRead;
    }

    @Override
    public Vertex<LongWritable, DoubleWritable, FloatWritable> getCurrentVertex() throws IOException {
      Vertex<LongWritable, DoubleWritable, FloatWritable> vertex = getConf().createVertex();
      LongWritable vertexId = new LongWritable((inputSplit.getSplitIndex() * totalRecords) + recordsRead);
      DoubleWritable vertexValue = new DoubleWritable(vertexId.get() * 10d);
      long targetVertexId = (vertexId.get() + 1) % (inputSplit.getNumSplits() * totalRecords);
      float edgeValue = vertexId.get() * 100f;
      List<Edge<LongWritable, FloatWritable>> edges = Lists.newLinkedList();
      edges.add(EdgeFactory.create(new LongWritable(targetVertexId), new FloatWritable(edgeValue)));
      vertex.initialize(vertexId, vertexValue, edges);
      ++recordsRead;
      return vertex;
    }
  }

  public static class SimplePageRankVertexInputFormat extends GeneratedVertexInputFormat<LongWritable, DoubleWritable, FloatWritable> {
    @Override
    public VertexReader<LongWritable, DoubleWritable, FloatWritable> createVertexReader(InputSplit split, TaskAttemptContext context)
        throws IOException {
      return new SimplePageRankVertexReader();
    }
  }

  public static class SimplePageRankVertexOutputFormat extends TextVertexOutputFormat<LongWritable, DoubleWritable, FloatWritable> {
    @Override
    public TextVertexWriter createVertexWriter(TaskAttemptContext context) throws IOException, InterruptedException {
      return new SimplePageRankVertexWriter();
    }

    public class SimplePageRankVertexWriter extends TextVertexWriter {
      @Override
      public void writeVertex(Vertex<LongWritable, DoubleWritable, FloatWritable> vertex) throws IOException, InterruptedException {
        getRecordWriter().write(new Text(vertex.getId().toString()), new Text(vertex.getValue().toString()));
      }
    }
  }
}

PageRank for TinkerPop3


public class PageRankVertexProgram implements VertexProgram<Double> {
  private MessageType.Local messageType = MessageType.Local.of(() -> GraphTraversal.<Vertex>of().outE());
  public static final String PAGE_RANK = Graph.Key.hide("gremlin.pageRank");
  public static final String EDGE_COUNT = Graph.Key.hide("gremlin.edgeCount");
  private static final String VERTEX_COUNT = "gremlin.pageRankVertexProgram.vertexCount";
  private static final String ALPHA = "gremlin.pageRankVertexProgram.alpha";
  private static final String TOTAL_ITERATIONS = "gremlin.pageRankVertexProgram.totalIterations";
  private static final String INCIDENT_TRAVERSAL = "gremlin.pageRankVertexProgram.incidentTraversal";
  private double vertexCountAsDouble = 1;
  private double alpha = 0.85d;
  private int totalIterations = 30;
  private static final Set<String> COMPUTE_KEYS = new HashSet<>(Arrays.asList(PAGE_RANK, EDGE_COUNT));

  private PageRankVertexProgram() {}

  @Override
  public void loadState(final Configuration configuration) {
    this.vertexCountAsDouble = configuration.getDouble(VERTEX_COUNT, 1.0d);
    this.alpha = configuration.getDouble(ALPHA, 0.85d);
    this.totalIterations = configuration.getInt(TOTAL_ITERATIONS, 30);
    try {
      if (configuration.containsKey(INCIDENT_TRAVERSAL)) {
        final SSupplier<Traversal> traversalSupplier = VertexProgramHelper.deserialize(configuration, INCIDENT_TRAVERSAL);
        VertexProgramHelper.verifyReversibility(traversalSupplier.get());
        this.messageType = MessageType.Local.of((SSupplier) traversalSupplier);
      }
    } catch (final Exception e) {
      throw new IllegalStateException(e.getMessage(), e);
    }
  }

  @Override
  public void storeState(final Configuration configuration) {
    configuration.setProperty(GraphComputer.VERTEX_PROGRAM, PageRankVertexProgram.class.getName());
    configuration.setProperty(VERTEX_COUNT, this.vertexCountAsDouble);
    configuration.setProperty(ALPHA, this.alpha);
    configuration.setProperty(TOTAL_ITERATIONS, this.totalIterations);
    try {
      VertexProgramHelper.serialize(this.messageType.getIncidentTraversal(), configuration, INCIDENT_TRAVERSAL);
    } catch (final Exception e) {
      throw new IllegalStateException(e.getMessage(), e);
    }
  }

  @Override
  public Set<String> getElementComputeKeys() {
    return COMPUTE_KEYS;
  }

  @Override
  public void setup(final Memory memory) {

  }

  @Override
  public void execute(final Vertex vertex, Messenger<Double> messenger, final Memory memory) {
    if (memory.isInitialIteration()) {
      double initialPageRank = 1.0d / this.vertexCountAsDouble;
      double edgeCount = Double.valueOf((Long) this.messageType.edges(vertex).count().next());
      vertex.singleProperty(PAGE_RANK, initialPageRank);
      vertex.singleProperty(EDGE_COUNT, edgeCount);
      messenger.sendMessage(this.messageType, initialPageRank / edgeCount);
    } else {
      double newPageRank = StreamFactory.stream(messenger.receiveMessages(this.messageType)).reduce(0.0d, (a, b) -> a + b);
      newPageRank = (this.alpha * newPageRank) + ((1.0d - this.alpha) / this.vertexCountAsDouble);
      vertex.singleProperty(PAGE_RANK, newPageRank);
      messenger.sendMessage(this.messageType, newPageRank / vertex.<Double>property(EDGE_COUNT).orElse(0.0d));
    }
  }

  @Override
  public boolean terminate(final Memory memory) {
    return memory.getIteration() >= this.totalIterations;
  }
}

PageRank for ArangoDB

// Worker: called for each active vertex in every superstep
var pageRank = function (vertex, message, global) {
  var total, rank, edgeCount, send, edge, alpha, sum;
  total = global.vertexCount;
  edgeCount = vertex._outEdges.length;
  alpha = global.alpha;
  sum = 0;
  if (global.step > 0) {
    // Sum up the rank shares sent to this vertex
    while (message.hasNext()) {
      sum += message.next().data;
    }
    rank = alpha * sum + (1 - alpha) / total;
  } else {
    // First step: every vertex starts with the same rank
    rank = 1 / total;
  }
  vertex._setResult(rank);
  if (global.step < global.MAX_STEPS) {
    // Split the rank evenly across all outgoing edges
    send = rank / edgeCount;
    while (vertex._outEdges.hasNext()) {
      edge = vertex._outEdges.next();
      message.sendTo(edge._getTarget(), send);
    }
  } else {
    vertex._deactivate();
  }
};

// Combiner: SUM of all messages addressed to the same vertex
var combiner = function (message, oldMessage) {
  return message + oldMessage;
};

var Runner = require("org/arangodb/pregelRunner").Runner;
var runner = new Runner();
runner.setWorker(pageRank);
runner.setCombiner(combiner);
runner.start("myGraph");
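To check the arithmetic of the worker above without an ArangoDB server, the same computation can be simulated in memory. The `pageRank` function, its adjacency-object input and the defaults `alpha = 0.85` and `maxSteps = 30` are assumptions (the slide reads these values from `global`); messages are summed per target, like the SUM combiner. As in the slide, a vertex without outgoing edges passes no rank on, so rank mass leaks away in graphs that contain such vertices.

```javascript
// In-memory re-creation of the PageRank worker (hypothetical stand-in for
// pregelRunner). `outEdges` maps every vertex id to its targets; every target
// must itself be a key of `outEdges`.
function pageRank(outEdges, alpha = 0.85, maxSteps = 30) {
  const ids = Object.keys(outEdges);
  const total = ids.length;
  const rank = new Map();
  let inbox = new Map(ids.map((id) => [id, 0])); // SUM-combined messages
  for (let step = 0; step <= maxSteps; step++) {
    const outbox = new Map(ids.map((id) => [id, 0]));
    for (const id of ids) {
      // Step 0: uniform start value; later steps: damped sum of incoming shares.
      const r = step > 0 ? alpha * inbox.get(id) + (1 - alpha) / total : 1 / total;
      rank.set(id, r);
      if (step < maxSteps) {
        // Split the rank evenly across outgoing edges (SUM combiner on arrival).
        const share = r / outEdges[id].length;
        for (const target of outEdges[id]) {
          outbox.set(target, outbox.get(target) + share);
        }
      }
    }
    inbox = outbox;
  }
  return rank;
}

console.log(pageRank({ hub: ["x", "y"], x: ["hub"], y: ["hub"] }));
```

On this small star-shaped graph the hub ends up with the highest rank, and because no vertex is dangling the ranks still sum to 1.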


Thank you

Further Questions?

Follow me on twitter/github: @mchacki

Write me an email: mchacki@arangodb.com

Follow @arangodb on Twitter

Join our google group:

https://groups.google.com/forum/#!forum/arangodb

Visit our blog https://www.arangodb.com/blog

Slides available at https://www.slideshare.net/arangodb


17TH ~ 18TH NOV 2014, MADRID (SPAIN)
