Apache Flink
• An open-source, distributed data analysis framework
• True streaming at its core
• Streaming & Batch API
Integration (picture not complete)
• Event logs (Kafka, RabbitMQ, ...): low latency, windowing, aggregations, ...
• Historic data (HDFS, JDBC, ...): ETL, graphs, machine learning, relational, ...
• Other sources and sinks: POSIX, Java/Scala collections
Why Stream Processing?
• Most problems have a streaming nature
• Stream processing gives lower latency
• Data volumes are more easily tamed
[Figure: event stream]
Batch and Streaming
Streaming Dataflow Runtime

DataSet (Batch Parameters)          DataStream (Streaming Parameters)
Relational Optimizer                Window Optimization
Pipelined and blocking operators    Pipelined and windowed operators
Schedule lazily                     Schedule eagerly
Recompute whole operators           Periodic checkpoints
Fully buffered streams              Streaming data movement

Also shown: stateful operations, DAG recovery, DAG resource management
Flink APIs
DataStream API (streaming):
case class Word (word: String, frequency: Int)
val lines: DataStream[String] = env.readFromKafka(...)
lines.flatMap {line => line.split(" ").map(word => Word(word,1))}
  .keyBy("word")
  .timeWindow(Time.of(5,SECONDS))
  .sum("frequency")
  .print()

DataSet API (batch):
val lines: DataSet[String] = env.readTextFile(...)
lines.flatMap {line => line.split(" ").map(word => Word(word,1))}
  .groupBy("word")
  .sum("frequency")
  .print()
Working with Windows
Why windows? We are often interested in fresh data!
Highlight: Flink can form and trigger windows consistently under different notions of time and deal with late events!

1) Tumbling windows
myKeyStream.timeWindow(Time.of(60, TimeUnit.SECONDS));
[Figure: time axis from 0 to 120 sec with events at 15, 38, 65, 88, 110, 120; SUM #1 covers 15, 38 and SUM #2 covers 65, 88]

2) Sliding windows
myKeyStream.timeWindow(Time.of(60, TimeUnit.SECONDS), Time.of(20, TimeUnit.SECONDS));
[Figure: window buckets/panes on a time axis from 0 to 100 sec; SUM #1 covers 15, 38; SUM #2 covers 38, 65; SUM #3 covers 65, 88]
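The window assignment behind the two pictures can be sketched in plain Java. This is an illustration, not Flink's implementation: the class and method names are hypothetical, timestamps are plain seconds, and windows are assumed to be aligned to time 0.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: shows which windows an event falls into.
class WindowAssignmentSketch {

    /** Start of the single tumbling window that contains the timestamp. */
    static long tumblingStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Starts of all sliding windows that contain the timestamp, newest first. */
    static List<Long> slidingStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - Math.floorMod(timestamp, slide);
        // Walk back one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long[] events = {15, 38, 65, 88};
        for (long t : events) {
            System.out.println(t + ": tumbling window starts at " + tumblingStart(t, 60)
                    + ", sliding windows start at " + slidingStarts(t, 60, 20));
        }
    }
}
```

With a 60 second size and a 20 second slide, every event belongs to three sliding windows, which is why 38 and 65 each show up in two of the sums on the slide.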
Flink Stack
• Libraries and compatibility layers: Gelly, Table, ML, SAMOA, Hadoop M/R, Cascading, Dataflow, Dataflow (WiP), CEP
• APIs: DataSet (Java/Scala), DataStream (Java/Scala)
• Runtime: Streaming dataflow runtime
• Deployment: Local, Remote, Yarn, Embedded
Gelly the Flink Graph API
Meet Gelly
• Java & Scala Graph APIs on top of Flink
• Graph transformations and utilities
• Iterative graph processing
• Library of graph algorithms
• Can be seamlessly mixed with the DataSet Flink API to easily implement applications that use both record-based and graph-based analysis
Hello, Gelly!

Java
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Edge<Long, NullValue>> edges = getEdgesDataSet(env);
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, env);
DataSet<Vertex<Long, Long>> verticesWithMinIds = graph.run(
    new ConnectedComponents(maxIterations));

Scala
val env = ExecutionEnvironment.getExecutionEnvironment
val edges: DataSet[Edge[Long, NullValue]] = getEdgesDataSet(env)
val graph = Graph.fromDataSet(edges, env)
val components = graph.run(new ConnectedComponents(maxIterations))
Graph Methods
Graph Properties
• getVertexIds, getEdgeIds
• numberOfVertices, numberOfEdges
• getDegrees
• ...

Transformations
• map, filter, join
• subgraph, union, difference
• reverse, undirected
• getTriplets

Mutations
• add vertex/edge
• remove vertex/edge

Neighborhood Methods
graph.reduceOnNeighbors(new MinValue, EdgeDirection.OUT)
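What a neighborhood reduce over outgoing edges computes can be shown without Flink. The sketch below is a plain-Java illustration under stated assumptions: the class and method names are hypothetical, edges are (source, target) pairs, and the reducer plays the role of the MinValue function from the slide.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical stand-in for a neighborhood reduce; not the Gelly API.
class NeighborReduceSketch {

    /** For every vertex with at least one out-edge, reduces the values of its out-neighbors. */
    static Map<Long, Long> reduceOnOutNeighbors(Map<Long, Long> vertexValues,
                                                long[][] edges,
                                                BinaryOperator<Long> reducer) {
        Map<Long, Long> result = new HashMap<>();
        for (long[] edge : edges) {
            long source = edge[0];
            long neighborValue = vertexValues.get(edge[1]);
            // Fold the neighbor's value into the running result for the source vertex.
            result.merge(source, neighborValue, reducer);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<Long, Long> values = new HashMap<>();
        values.put(1L, 10L);
        values.put(2L, 7L);
        values.put(3L, 4L);
        long[][] edges = {{1, 2}, {1, 3}, {2, 3}};
        // Minimum value among the out-neighbors of each vertex.
        System.out.println(reduceOnOutNeighbors(values, edges, Long::min));
    }
}
```

Vertices without outgoing edges (vertex 3 here) simply produce no result, since there is nothing to reduce.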
Iterative Graph Processing
• Gelly offers iterative graph processing abstractions on top of Flink’s Delta iterations
• Based on the BSP, vertex-centric model
• scatter-gather
• gather-sum-apply
• vertex-centric (Pregel)*
• partition-centric*
Scatter-Gather Iterations
• MessagingFunction: generate message for other vertices
• VertexUpdateFunction: update vertex value based on received messages
Gather-Sum-Apply Iterations
• Gather: compute one value per edge
• Sum: combine the partial values of Gather to a single value
• Apply: update the vertex value, based on the Sum and the current value
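The three phases can be sketched on single-source shortest paths, one of the library algorithms that has a GSA implementation. This is a plain-Java illustration, not Gelly code: the class name, the edge and weight arrays and the loop structure are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical gather-sum-apply loop for shortest paths; not the Gelly API.
class GatherSumApplySketch {

    /** edges[k] = {source, target} with weight weights[k]; returns distances from the source vertex. */
    static Map<Long, Double> shortestPaths(long[][] edges, double[] weights,
                                           long source, int maxIterations) {
        Map<Long, Double> distance = new HashMap<>();
        for (long[] e : edges) {
            distance.putIfAbsent(e[0], Double.POSITIVE_INFINITY);
            distance.putIfAbsent(e[1], Double.POSITIVE_INFINITY);
        }
        distance.put(source, 0.0);

        for (int i = 0; i < maxIterations; i++) {
            Map<Long, Double> summed = new HashMap<>();
            for (int k = 0; k < edges.length; k++) {
                // Gather: one candidate per edge (distance of the edge source plus edge weight).
                double candidate = distance.get(edges[k][0]) + weights[k];
                // Sum: combine all candidates of a target vertex into one value (the minimum).
                summed.merge(edges[k][1], candidate, Double::min);
            }
            boolean changed = false;
            for (Map.Entry<Long, Double> s : summed.entrySet()) {
                // Apply: update the vertex only if the combined value improves the current one.
                if (s.getValue() < distance.get(s.getKey())) {
                    distance.put(s.getKey(), s.getValue());
                    changed = true;
                }
            }
            if (!changed) {
                break; // converged: no vertex value changed in this superstep
            }
        }
        return distance;
    }

    public static void main(String[] args) {
        long[][] edges = {{1, 2}, {2, 3}, {1, 3}};
        double[] weights = {1.0, 2.0, 5.0};
        System.out.println(shortestPaths(edges, weights, 1L, 10));
    }
}
```

On this small graph vertex 3 first receives distance 5 over the direct edge and is improved to 3 in the next superstep through vertex 2.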
Library of Algorithms
• PageRank*
• Single Source Shortest Paths*
• Label Propagation
• Weakly Connected Components*
• Community Detection
• Triangle Count & Enumeration
• Graph Summarization
*: both scatter-gather and GSA implementations

val ranks = inputGraph.run(new PageRank(0.85, 20))
Gelly-Stream: single-pass stream graph processing with Flink
Real Graphs are dynamic
Graphs are created from events happening in real-time
Gelly on Streams
[Figure: stack with Deployment at the bottom, Distributed Dataflow above it, then the DataSet and DataStream APIs; Gelly sits on DataSet, Gelly-Stream on DataStream]

Gelly
• Static Graphs
• Multi-Pass Algorithms
• Full Computations

Gelly-Stream
• Dynamic Graphs
• Single-Pass Algorithms
• Approximate Computations
Batch vs. Stream Graph Processing
               Batch                   Stream
Input Graph    static                  dynamic
Analysis       on a snapshot           continuous
Response       after job completion    immediately
Graph Streaming Challenges
• Maintain the graph structure
  • How to apply state updates efficiently?
• Result updates
  • Re-run the analysis for each event?
  • Design an incremental algorithm?
  • Run separate instances on multiple snapshots?
• Computation on most recent events only
Single-Pass Graph Streaming
• Each event is an edge addition
• Maintain only a graph summary
• Recent events are grouped in graph windows
Graph Summaries
• spanners for distance estimation
• sparsifiers for cut estimation
• sketches for homomorphic properties
[Figure: the algorithm run on the full graph gives R1; run on the graph summary it gives R2 ≈ R1]
Examples
Batch Connected Components
• State: the graph and a component ID per vertex (initially equal to vertex ID)
• Iterative Computation: For each vertex:
• choose the min of neighbors’ component IDs and own component ID as new ID
• if component ID changed since last iteration, notify neighbors
[Figure: example graph with vertices 1 to 8, showing the component ID of every vertex at iterations i=0 to i=3; the IDs converge to 1 for vertices 1 to 5 and to 6 for vertices 6 to 8]
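The iterative computation described above can be sketched in plain Java. This is an illustration and not the Gelly ConnectedComponents implementation: the class name is hypothetical, and the edge list is the one read off the stream example later in the deck (3-1, 5-2, 5-4, 7-6, 8-6, 4-2, 4-3, 8-7, 4-1), which is an assumption about the pictured graph.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical min-label propagation in synchronous supersteps; not the Gelly API.
class BatchConnectedComponentsSketch {

    static Map<Integer, Integer> components(int[][] edges) {
        // State: the graph (as undirected adjacency lists) and a component ID per vertex.
        Map<Integer, List<Integer>> neighbors = new TreeMap<>();
        for (int[] e : edges) {
            neighbors.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
            neighbors.computeIfAbsent(e[1], k -> new ArrayList<>()).add(e[0]);
        }
        Map<Integer, Integer> component = new TreeMap<>();
        for (Integer v : neighbors.keySet()) {
            component.put(v, v); // initially equal to the vertex ID
        }

        Set<Integer> changed = new HashSet<>(neighbors.keySet());
        while (!changed.isEmpty()) {
            // Vertices whose ID changed in the last iteration notify their neighbors.
            Map<Integer, Integer> minMessage = new HashMap<>();
            for (int v : changed) {
                for (int n : neighbors.get(v)) {
                    minMessage.merge(n, component.get(v), Integer::min);
                }
            }
            // Each notified vertex keeps the min of its own ID and the received IDs.
            Set<Integer> next = new HashSet<>();
            for (Map.Entry<Integer, Integer> m : minMessage.entrySet()) {
                if (m.getValue() < component.get(m.getKey())) {
                    component.put(m.getKey(), m.getValue());
                    next.add(m.getKey());
                }
            }
            changed = next;
        }
        return component;
    }

    public static void main(String[] args) {
        int[][] edges = {{3, 1}, {5, 2}, {5, 4}, {7, 6}, {8, 6}, {4, 2}, {4, 3}, {8, 7}, {4, 1}};
        System.out.println(components(edges));
    }
}
```

The loop stops as soon as an iteration changes no component ID, which is the multi-pass behaviour that the streaming variant avoids.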
Stream Connected Components
• State: a disjoint set data structure for the components
• Computation: For each edge:
  • if both endpoints are seen for the first time, create a component whose ID is the min of the vertex IDs
  • if the endpoints are in different components, merge them and update the component ID to the min of the component IDs
  • if only one of the endpoints belongs to a component, add the other one to the same component
Edge stream (in arrival order): 3-1, 5-2, 5-4, 7-6, 8-6, 4-2, 4-3, 8-7, 4-1

Edge processed    ComponentID: Vertices
3-1               1: 1, 3
5-2               1: 1, 3    2: 2, 5
5-4               1: 1, 3    2: 2, 4, 5
7-6               1: 1, 3    2: 2, 4, 5    6: 6, 7
8-6               1: 1, 3    2: 2, 4, 5    6: 6, 7, 8
4-2               unchanged
4-3               1: 1, 2, 3, 4, 5    6: 6, 7, 8
8-7               unchanged
4-1               unchanged
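A minimal disjoint-set sketch of the single-pass computation, in plain Java. The class and method names are hypothetical and this is not the Gelly-Stream implementation. An unseen vertex starts as its own component, so the three cases from the slide all reduce to one merge that keeps the smaller ID as the component ID.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical disjoint-set summary for streaming connected components.
class StreamingConnectedComponentsSketch {

    private final Map<Integer, Integer> parent = new HashMap<>();

    /** Returns the component ID of a vertex, registering the vertex if it is new. */
    int find(int v) {
        parent.putIfAbsent(v, v);
        int root = v;
        while (parent.get(root) != root) {
            root = parent.get(root);
        }
        // Path compression: point every vertex on the path directly at the root.
        while (parent.get(v) != root) {
            int next = parent.get(v);
            parent.put(v, root);
            v = next;
        }
        return root;
    }

    /** Processes one edge of the stream. */
    void addEdge(int u, int v) {
        int ru = find(u);
        int rv = find(v);
        if (ru == rv) {
            return; // both endpoints already in the same component
        }
        // Keep the smaller ID as root, so the component ID is the min vertex ID.
        if (ru < rv) {
            parent.put(rv, ru);
        } else {
            parent.put(ru, rv);
        }
    }

    /** Current summary: component ID mapped to its sorted vertices. */
    Map<Integer, TreeSet<Integer>> components() {
        Map<Integer, TreeSet<Integer>> result = new TreeMap<>();
        for (Integer v : new ArrayList<>(parent.keySet())) {
            result.computeIfAbsent(find(v), k -> new TreeSet<>()).add(v);
        }
        return result;
    }

    public static void main(String[] args) {
        StreamingConnectedComponentsSketch cc = new StreamingConnectedComponentsSketch();
        int[][] stream = {{3, 1}, {5, 2}, {5, 4}, {7, 6}, {8, 6}, {4, 2}, {4, 3}, {8, 7}, {4, 1}};
        for (int[] edge : stream) {
            cc.addEdge(edge[0], edge[1]);
            System.out.println(edge[0] + "-" + edge[1] + " -> " + cc.components());
        }
    }
}
```

Each edge is looked at once and then discarded; only the disjoint-set summary is kept, which is what makes the algorithm single-pass.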
Distributed Stream Connected Components
API Requirements
• Continuous aggregations on edge streams
• Global graph aggregations
• Support for windowing
Introducing Gelly-Stream
Gelly-Stream enriches the DataStream API with two additional ADTs:
• GraphStream:
  • A representation of a data stream of edges.
  • Edges can have state (e.g. weights).
  • Supports property streams, transformations and aggregations.
• GraphWindow:
  • A “time-slice” of a graph stream.
  • It enables neighborhood aggregations.
GraphStream Operations
Property Streams (GraphStream -> DataStream)
.getEdges()
.getVertices()
.numberOfVertices()
.numberOfEdges()
.getDegrees()
.inDegrees()
.outDegrees()

Transformations (GraphStream -> GraphStream)
.mapEdges();
.distinct();
.filterVertices();
.filterEdges();
.reverse();
.undirected();
.union();
Graph Stream Aggregations
[Figure: graph stream -> (window) fold -> combine -> transform -> property stream. Edges are folded into local summaries, the local summaries are reduced into a global summary, and the global summary is mapped to the result aggregate.]
global aggregates can be persistent or transient
graphStream.aggregate(new MyGraphAggregation(window, fold, combine, transform))
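The fold, combine and transform steps can be illustrated with a plain-Java sketch. It is not the Gelly-Stream aggregate API: the class, the round-robin partitioning and the chosen aggregate (the maximum vertex degree in one window) are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical window aggregation: local fold, global combine, final transform.
class GraphAggregationSketch {

    /** fold: adds one edge to a partition-local summary (vertex -> degree). */
    static Map<Integer, Integer> fold(Map<Integer, Integer> summary, int[] edge) {
        summary.merge(edge[0], 1, Integer::sum);
        summary.merge(edge[1], 1, Integer::sum);
        return summary;
    }

    /** combine: merges two summaries into one. */
    static Map<Integer, Integer> combine(Map<Integer, Integer> a, Map<Integer, Integer> b) {
        Map<Integer, Integer> merged = new HashMap<>(a);
        b.forEach((vertex, degree) -> merged.merge(vertex, degree, Integer::sum));
        return merged;
    }

    /** transform: maps the global summary to the emitted result (max degree). */
    static int transform(Map<Integer, Integer> global) {
        return global.values().stream().mapToInt(Integer::intValue).max().orElse(0);
    }

    /** Runs the three steps over the edges of one window, split across partitions. */
    static int aggregateWindow(List<int[]> windowEdges, int parallelism) {
        List<Map<Integer, Integer>> local = new ArrayList<>();
        for (int p = 0; p < parallelism; p++) {
            local.add(new HashMap<>());
        }
        for (int i = 0; i < windowEdges.size(); i++) {
            fold(local.get(i % parallelism), windowEdges.get(i)); // round-robin partitioning
        }
        Map<Integer, Integer> global = new HashMap<>();
        for (Map<Integer, Integer> summary : local) {
            global = combine(global, summary);
        }
        return transform(global);
    }

    public static void main(String[] args) {
        List<int[]> window = Arrays.asList(
                new int[]{3, 1}, new int[]{5, 2}, new int[]{5, 4}, new int[]{7, 6}, new int[]{8, 6});
        System.out.println("max degree in window: " + aggregateWindow(window, 2));
    }
}
```

The same shape fits the connected-components example in the deck: there the summary would be a disjoint set instead of a degree map, and transform would count the components.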
Connected Components
[Figure: a graph stream over vertices 1 to 8 is aggregated into #components; edges are folded into local disjoint sets, and each time the window triggers the local summaries are combined into the global summary]

Window 1
  edges: 3-1, 5-2, 5-4, 7-6, 8-6
  local summaries when the window triggers: {1,3} {2,5} {4,5} {6,7} {6,8}
  global summary after combine: {1,3} {2,4,5} {6,7,8}
  #components: 3

Window 2
  edges: 4-2, 4-3, 4-1, 8-7
  local summaries when the window triggers: {1,2,4} {3,4} {7,8}
  global summary after combining with the previous one: {1,2,3,4,5} {6,7,8}
  #components: 2
Slicing Graph Streams
graphStream.slice(Time.of(1, MINUTE));
[Figure: timeline from 11:40 to 11:43 cut into one-minute slices]
Aggregating Slices
graphStream.slice(Time.of(1, MINUTE), direction)

Aggregations
.reduceOnEdges();
.foldNeighbors();
.applyOnNeighbors();

• Slicing collocates edges by vertex information
• Neighborhood aggregations on sliced graphs
[Figure: edges collocated by source or by target vertex]
Finding Matches Nearby
graphStream.filterVertices(GraphGeeks())
  .slice(Time.of(15, MINUTE), EdgeDirection.IN)
  .applyOnNeighbors(FindPairs())

GraphStream :: graph geek check-ins
  wendy checked_in soap_bar
  steve checked_in soap_bar
  tom checked_in joe’s_grill
  sandra checked_in soap_bar
  rafa checked_in joe’s_grill

slice -> GraphWindow :: user-place
  soap_bar: wendy, steve, sandra
  joe’s_grill: tom, rafa

FindPairs -> {wendy, steve} {steve, sandra} {wendy, sandra} {tom, rafa}
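The neighborhood step of this example can be sketched in plain Java. The class and method below are hypothetical stand-ins for FindPairs, not Gelly-Stream code, and all check-ins are assumed to fall into the same 15-minute slice.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the FindPairs neighborhood function.
class FindPairsSketch {

    /** checkIns[i] = {user, place}; returns every pair of users who checked in at the same place. */
    static List<String> findPairs(String[][] checkIns) {
        // Slice with EdgeDirection.IN: collocate edges by their target vertex (the place).
        Map<String, List<String>> usersByPlace = new LinkedHashMap<>();
        for (String[] checkIn : checkIns) {
            usersByPlace.computeIfAbsent(checkIn[1], k -> new ArrayList<>()).add(checkIn[0]);
        }
        // Apply on neighbors: emit all pairs among the users of each place.
        List<String> pairs = new ArrayList<>();
        for (List<String> users : usersByPlace.values()) {
            for (int i = 0; i < users.size(); i++) {
                for (int j = i + 1; j < users.size(); j++) {
                    pairs.add("{" + users.get(i) + ", " + users.get(j) + "}");
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        String[][] checkIns = {
            {"wendy", "soap_bar"}, {"steve", "soap_bar"}, {"tom", "joe's_grill"},
            {"sandra", "soap_bar"}, {"rafa", "joe's_grill"}
        };
        System.out.println(findPairs(checkIns));
    }
}
```

Grouping by the target vertex is what makes the pairing a purely local computation: every place sees all of its visitors for the slice in one call.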
What’s next?
• Integration with Neo4j (Input / Output)
• OpenCypher on Flink/Gelly
• Pregel and Partition-Centric Iterations
• Integration with Graphalytics
Feeling Gelly?
• Gelly Guide: https://ci.apache.org/projects/flink/flink-docs-master/libs/gelly_guide.html
• Gelly-Stream Repository: https://github.com/vasia/gelly-streaming
• Gelly-Stream talk @FOSDEM16: https://fosdem.org/2016/schedule/event/graph_processing_apache_flink/
• An interesting read: http://people.cs.umass.edu/~mcgregor/papers/13-graphsurvey.pdf
• A cool thesis: http://urn.kb.se/resolve?urn=urn:nbn:se:kth:diva-170425