Stephan Ewen
@stephanewen
Streaming Analytics with Apache Flink
Apache Flink Stack
DataStream API: Stream Processing
DataSet API: Batch Processing
Runtime: Distributed Streaming Data Flow
Libraries
Streaming and batch as first-class citizens.
Today
Streaming is the next programming paradigm for data applications, and you need to start thinking in terms of streams.
Streaming technology is enabling the obvious: continuous processing on data that is continuously produced.
A brief History of Flink
January '10: Project Stratosphere (Flink precursor)
April '14: Flink project enters incubation; v0.5, v0.6, v0.7
December '14: Top-level project; v0.8, v0.9, v0.10
March '16: Release 1.0
The academia gap: reading/writing papers, teaching, worrying about the thesis.
Realizing this might be interesting to people beyond academia (even more so, actually).
A Stream Processing Pipeline
collect → log → analyze → serve
Programs and Dataflows
val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer09(…))  // Source
val events: DataStream[Event] = lines.map((line) => parse(line))           // Transformation
val stats: DataStream[Statistic] = events                                   // Transformation
  .keyBy("sensor")
  .timeWindow(Time.seconds(5))
  .apply(new MyAggregationFunction())
stats.addSink(new RollingSink(path))                                        // Sink
Streaming Dataflow
Source[1] → map()[1] → keyBy()/window()/apply()[1] → Sink[1]
Source[2] → map()[2] → keyBy()/window()/apply()[2]
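In the parallel dataflow, keyBy() sends every record with the same key to the same parallel instance of the window operator. The following plain-Java sketch illustrates that routing rule under the assumption of simple hash-modulo partitioning. Flink's own partitioner hashes keys differently, so concrete assignments will differ. `KeyRouting`, `channelFor`, and `route` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical illustration of keyBy() routing; not Flink's actual partitioner.
final class KeyRouting {

    // Pick the parallel instance (0 .. parallelism-1) that owns this key.
    // floorMod keeps the result non-negative even for negative hash codes.
    static int channelFor(String key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Route a sequence of keys to instances, preserving arrival order per instance.
    static Map<Integer, List<String>> route(List<String> keys, int parallelism) {
        Map<Integer, List<String>> byInstance = new TreeMap<>();
        for (String key : keys) {
            byInstance.computeIfAbsent(channelFor(key, parallelism), i -> new ArrayList<>())
                      .add(key);
        }
        return byInstance;
    }
}
```

The property that matters is determinism. As long as the parallelism stays the same, every record for a given sensor reaches the same instance, so that instance can hold all of the key's window state.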
Why does Flink stream flink?
True Streaming: low latency; high throughput; well-behaved flow control (back pressure)
Event Time: make more sense of data; works on real-time and historic data
Stateful Streaming: globally consistent savepoints; exactly-once semantics for fault tolerance; windows & user-defined state
APIs, Libraries: flexible windows (time, count, session, roll-your-own); Complex Event Processing
Counting
Continuous counting
A seemingly simple application, but generally an unsolved problem
E.g., count visitors, impressions, interactions, clicks, etc.
Aggregations and OLAP cube operations are generalizations of counting
Counting in batch architecture
Continuous ingestion
Periodic (e.g., hourly) files
Periodic batch jobs
Problems with batch architecture
High latency
Too many moving parts
Implicit treatment of time
Out-of-order event handling
Implicit batch boundaries
Counting in λ architecture
"Batch layer": what we had before
"Stream layer": approximate early results
Problems with batch and λ
Way too many moving parts (and code duplication)
Implicit treatment of time
Out-of-order event handling
Implicit batch boundaries
Counting in streaming architecture
Message queue ensures stream durability and replay
Stream processor ensures consistent counting
Counting in Flink DataStream API
Number of visitors in the last hour by country
DataStream<LogEvent> stream = env
  .addSource(new FlinkKafkaConsumer(...))  // create stream from Kafka
  .keyBy("country")                        // group by country
  .timeWindow(Time.minutes(60))            // window of size 1 hour
  .apply(new CountPerWindowFunction());    // do operations per window
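The windowed count can be spelled out without Flink. Each event is assigned to the one-hour tumbling window that contains its timestamp, and counts are kept per (country, window) pair. This is a minimal sketch that assumes non-negative millisecond timestamps and windows aligned to zero. `VisitorCounts`, `Visit`, `windowStart`, and `countPerHour` are hypothetical names and are not part of the Flink API.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free sketch of "number of visitors in the last hour by country".
final class VisitorCounts {

    static final long HOUR_MS = 60L * 60L * 1000L;

    // A single log event: the visitor's country and the event time (epoch millis).
    static final class Visit {
        final String country;
        final long timestampMs;

        Visit(String country, long timestampMs) {
            this.country = country;
            this.timestampMs = timestampMs;
        }
    }

    // Start of the tumbling window of the given size that contains ts (assumes ts >= 0).
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    // Key "country@windowStart" -> number of visits in that window.
    static Map<String, Integer> countPerHour(List<Visit> visits) {
        Map<String, Integer> counts = new TreeMap<>();
        for (Visit v : visits) {
            String key = v.country + "@" + windowStart(v.timestampMs, HOUR_MS);
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }
}
```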
Counting hierarchy of needs
(Based on Maslow's hierarchy of needs)
Continuous counting
... with low latency,
... efficiently on high-volume streams,
... fault tolerant (exactly once),
... accurate and repeatable,
... queryable (Flink 1.1+)
Rest of this talk
Continuous counting
... with low latency,
... efficiently on high-volume streams,
... fault tolerant (exactly once),
... accurate and repeatable,
... queryable
Streaming Analytics by Example
Time-Windowed Aggregations
case class Event(sensor: String, measure: Double)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[Event] = env.addSource(…)
stream.keyBy("sensor").timeWindow(Time.seconds(5)).sum("measure")
Time-Windowed Aggregations
case class Event(sensor: String, measure: Double)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[Event] = env.addSource(…)
stream.keyBy("sensor").timeWindow(Time.seconds(60), Time.seconds(5)).sum("measure")
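With `timeWindow(Time.seconds(60), Time.seconds(5))`, each window is 60 s long and a new one starts every 5 s, so every event belongs to several overlapping windows. The plain-Java sketch below shows that assignment. It assumes non-negative millisecond timestamps and a window size that is a multiple of the slide. `SlidingWindows` and `windowStartsFor` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of sliding-window assignment (size and slide in ms).
final class SlidingWindows {

    // Returns the start times of all windows [start, start + size) that contain ts,
    // newest first. Assumes ts >= 0 and size % slide == 0.
    static List<Long> windowStartsFor(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slide);  // latest window starting at or before ts
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

With the slide's parameters, every event lands in 60 / 5 = 12 windows. This is why sliding windows hold more state than tumbling ones. When size equals slide, the loop returns exactly one window.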
Session-Windowed Aggregations
case class Event(sensor: String, measure: Double)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[Event] = env.addSource(…)
stream.keyBy("sensor").window(EventTimeSessionWindows.withGap(Time.seconds(60))).max("measure")
(Flink 1.1 syntax)
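`EventTimeSessionWindows.withGap(Time.seconds(60))` groups each key's events into sessions that close after 60 s without activity. The plain-Java sketch below shows that grouping for a single key and computes the per-session maximum, like `.max("measure")`. It assumes events arrive sorted by timestamp. Flink instead merges overlapping session windows, which also handles out-of-order events, and its exact boundary handling may differ. `Sessions`, `Session`, and `sessionize` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of gap-based session windows for a single key.
final class Sessions {

    // One session: [start, end) where end = last event timestamp + gap; max = largest measure seen.
    static final class Session {
        final long start;
        final long end;
        final double max;

        Session(long start, long end, double max) {
            this.start = start;
            this.end = end;
            this.max = max;
        }
    }

    // Assumes timestamps are sorted ascending; measures[i] belongs to timestamps[i].
    static List<Session> sessionize(long[] timestamps, double[] measures, long gap) {
        List<Session> sessions = new ArrayList<>();
        for (int i = 0; i < timestamps.length; i++) {
            long ts = timestamps[i];
            int lastIdx = sessions.size() - 1;
            if (lastIdx >= 0 && ts < sessions.get(lastIdx).end) {
                // Event arrives within the gap: extend the current session.
                Session last = sessions.get(lastIdx);
                sessions.set(lastIdx, new Session(last.start, ts + gap, Math.max(last.max, measures[i])));
            } else {
                // First event, or gap exceeded: start a new session.
                sessions.add(new Session(ts, ts + gap, measures[i]));
            }
        }
        return sessions;
    }
}
```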
Pattern Detection
case class Event(producer: String, evtType: Int, msg: String)
case class Alert(msg: String, state: Int)

val stream: DataStream[Event] = env.addSource(…)
stream
  .keyBy("producer")
  .flatMap(new RichFlatMapFunction[Event, Alert]() {
    // per-key state: where this producer currently is in the state machine
    lazy val state: ValueState[Int] = getRuntimeContext.getState(…)

    override def flatMap(event: Event, out: Collector[Alert]): Unit = {
      val newState = state.value() match {
        case 0 if (event.evtType == 0) => 1
        case 1 if (event.evtType == 1) => 0
        case x => out.collect(Alert(event.msg, x)); 0
      }
      state.update(newState)
    }
  })
Embedded key/value state store
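Because the stream is keyed by producer, the `ValueState` in the pattern detection code is scoped to the current key, so each producer runs its own copy of the state machine. The sketch below mimics this in plain Java, with a `HashMap` standing in for the embedded key/value state store. It is a Flink-free illustration that uses the same transitions as the flatMap. `PatternDetector`, `process`, and `alerts` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Flink-free sketch of a keyed state machine.
final class PatternDetector {

    // Stands in for the embedded key/value state store: one state value per producer.
    private final Map<String, Integer> stateByProducer = new HashMap<>();
    private final List<String> alerts = new ArrayList<>();

    void process(String producer, int evtType, String msg) {
        int state = stateByProducer.getOrDefault(producer, 0);  // unset state reads as 0
        int newState;
        if (state == 0 && evtType == 0) {
            newState = 1;  // first half of the expected pattern seen
        } else if (state == 1 && evtType == 1) {
            newState = 0;  // pattern completed, back to start
        } else {
            alerts.add(producer + ": " + msg + " (state " + state + ")");
            newState = 0;  // unexpected event: raise an alert and reset
        }
        stateByProducer.put(producer, newState);
    }

    List<String> alerts() {
        return alerts;
    }
}
```

Interleaving events from different producers does not disturb either state machine, because each lookup goes through the producer's key.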
Many more
Joining streams (e.g., combining readings from sensors)
Detecting patterns (CEP)
Applying (changing) rules or models to events
Training and applying online machine learning models
…
(It's) About Time
The biggest change in moving from batch to streaming is handling time explicitly.
Example: Windowing by Time
case class Event(id: String, measure: Double, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[Event] = env.addSource(…)
stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")
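Different Notions of Time
Figure: Event Producer → Message Queue (partition 1, partition 2) → Flink Data Source → Flink Window Operator.
Event Time: when the event was created by the producer. Ingestion Time: when the event entered the Flink data source. Window Processing Time: the clock of the window operator.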
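Event Time vs. Processing Time
Figure: the Star Wars episodes ordered by release year and by story order.
Processing Time: 1977 Episode IV, 1980 Episode V, 1983 Episode VI, 1999 Episode I, 2002 Episode II, 2005 Episode III, 2015 Episode VII
Event Time: Episode I, II, III, IV, V, VI, VII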
Processing Time
case class Event(id: String, measure: Double, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
val stream: DataStream[Event] = env.addSource(…)
stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")
Window by operator's processing time
Ingestion Time
case class Event(id: String, measure: Double, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
val stream: DataStream[Event] = env.addSource(…)
stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")
Event Time
case class Event(id: String, measure: Double, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[Event] = env.addSource(…)
stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")
Event Time
case class Event(id: String, measure: Double, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[Event] = env.addSource(…)
val tsStream = stream.assignAscendingTimestamps(_.timestamp)
tsStream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")
Event Time
case class Event(id: String, measure: Double, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream: DataStream[Event] = env.addSource(…)
val tsStream = stream.assignTimestampsAndWatermarks(
  new MyTimestampsAndWatermarkGenerator())
tsStream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")
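The slides do not show `MyTimestampsAndWatermarkGenerator`, which is user code. One common strategy assumes that events are at most a fixed delay out of order and emits the largest timestamp seen so far, minus that delay, as the watermark. The following plain-Java sketch implements that assumption. `BoundedDelayWatermarks` is a hypothetical name. The `canFire` helper assumes, as Flink does for time windows, that a window [start, end) is complete once the watermark reaches end - 1.

```java
// Hypothetical stand-in for a user-defined timestamp/watermark generator
// with bounded out-of-orderness.
final class BoundedDelayWatermarks {

    private final long maxDelayMs;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedDelayWatermarks(long maxDelayMs) {
        this.maxDelayMs = maxDelayMs;
    }

    // Called per event: extract the timestamp and remember the largest one seen.
    long onEvent(long eventTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, eventTimestamp);
        return eventTimestamp;
    }

    // Called periodically: the claim "no event older than this should still arrive".
    long currentWatermark() {
        if (maxSeenTimestamp == Long.MIN_VALUE) {
            return Long.MIN_VALUE;  // nothing seen yet
        }
        return maxSeenTimestamp - maxDelayMs;
    }

    // An event-time window [start, end) may fire once the watermark reaches end - 1.
    static boolean canFire(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1;
    }
}
```

A larger delay tolerates more disorder but holds windows open longer. This is the latency/completeness trade-off that watermarks make explicit.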
Watermarks
Figure: an in-order stream and an out-of-order stream of events, each event labeled with its event timestamp, interleaved with watermarks such as W(11), W(17), and W(20).
Watermarks in Parallel
Figure: watermark generation at Source(1) and Source(2); watermarks such as W(33) and W(17) flow through map(1) and map(2) to window(1) and window(2). Events are shown as id|timestamp (e.g., A|30, B|31, D|15, K|35), and each operator is annotated with its event time (e.g., 29, 17, 14), distinguishing event time at the input streams from event time at the operator.
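When an operator receives several input streams, it can advance its event time only as far as its slowest input. The plain-Java sketch below captures that rule under the assumption of one watermark value per input channel. `InputWatermarks` and `onWatermark` are hypothetical names. With W(33) on one input and W(17) on the other, the operator's event time is 17.

```java
import java.util.Arrays;

// Hypothetical sketch: how an operator with several inputs tracks its event time.
final class InputWatermarks {

    private final long[] perInput;

    InputWatermarks(int numInputs) {
        perInput = new long[numInputs];
        Arrays.fill(perInput, Long.MIN_VALUE);  // no watermark received yet
    }

    // Record a watermark from one input (never moving that input backwards)
    // and return the operator's event time: the minimum over all inputs.
    long onWatermark(int input, long watermark) {
        perInput[input] = Math.max(perInput[input], watermark);
        return currentEventTime();
    }

    long currentEventTime() {
        long min = Long.MAX_VALUE;
        for (long w : perInput) {
            min = Math.min(min, w);
        }
        return min;
    }
}
```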
Per Kafka Partition Watermarks
Figure: the same parallel dataflow (Source(1), Source(2) → map(1), map(2) → window(1), window(2)), with watermarks generated per Kafka partition inside each source.
Per Kafka Partition Watermarks
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val kafka = new FlinkKafkaConsumer09(topic, schema, props)
kafka.assignTimestampsAndWatermarks(
  new MyTimestampsAndWatermarkGenerator())

val stream: DataStream[Event] = env.addSource(kafka)
stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")
Matters of State (Fault Tolerance, Reinstatements, etc.)
Back to the Aggregation Example
case class Event(id: String, measure: Double, timestamp: Long)
val env = StreamExecutionEnvironment.getExecutionEnvironment
val stream: DataStream[Event] = env.addSource(new FlinkKafkaConsumer09(topic, schema, properties))
stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")
Stateful
Fault Tolerance
Prevent data loss (reprocess lost in-flight events)
Recover state consistency (exactly-once semantics)
• Pending windows & user-defined (key/value) state
Checkpoint-based fault tolerance
• Periodically create checkpoints
• Recovery: resume from last completed checkpoint
• Asynchronous Barrier Snapshots (ABS) algorithm
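The checkpoint idea can be shown on a single operator that reads from a replayable log, such as one Kafka partition. A checkpoint stores the read position together with the operator state at that position, and recovery restores both and replays from there. This plain-Java sketch assumes a single input and takes snapshots synchronously between records, so it needs no barriers. In a parallel dataflow, Flink's checkpoint barriers are what provide this kind of consistent snapshot across many operators. `CheckpointedCounter` and its methods are hypothetical names.

```java
import java.util.List;

// Hypothetical, single-operator sketch of checkpoint-based recovery.
final class CheckpointedCounter {

    // A consistent snapshot: position in the input plus operator state at that position.
    static final class Checkpoint {
        final int offset;
        final long count;

        Checkpoint(int offset, long count) {
            this.offset = offset;
            this.count = count;
        }
    }

    private int offset;  // next record to read from the log
    private long count;  // operator state: records counted so far

    CheckpointedCounter(Checkpoint restoreFrom) {
        this.offset = restoreFrom.offset;
        this.count = restoreFrom.count;
    }

    Checkpoint snapshot() {
        return new Checkpoint(offset, count);
    }

    // Process up to n records from the log; returns false once the log is exhausted.
    boolean step(List<String> log, int n) {
        for (int i = 0; i < n && offset < log.size(); i++) {
            count++;   // "process" the record
            offset++;
        }
        return offset < log.size();
    }

    long count() {
        return count;
    }
}
```

Suppose recovery kept the count from just before the failure (5) but replayed from the checkpointed offset. Records c, d, and e would then be counted twice. Restoring the offset and the state together yields 6, so each record is reflected exactly once in the state.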
Checkpoints
Figure: a data stream of events, from newer records to older records, with the state of the dataflow at point X and at point Y.
Checkpoint Barriers
Markers, injected into the streams
Checkpoint Procedure
Savepoints
A "Checkpoint" is a globally consistent point-in-time snapshot of the streaming program (point in stream, state).
A "Savepoint" is a user-triggered, retained checkpoint.
Streaming programs can start from a savepoint.
Figure: Savepoint A and Savepoint B.
(Re)processing data (in batch)
Re-processing data (what-if exploration, to correct bugs, etc.)
Usually by running a batch job with a set of old files
Tools that map files to times
Figure: a collection of files by ingestion time (2016-3-1 12:00 am, 2016-3-1 1:00 am, 2016-3-1 2:00 am, …, 2016-3-11 10:00 pm, 2016-3-11 11:00 pm, 2016-3-12 12:00 am, 2016-3-12 1:00 am, …), fed to the batch processor.
Unclear Batch Boundaries
Figure: the same hourly files (…, 2016-3-11 10:00 pm, 2016-3-11 11:00 pm, 2016-3-12 12:00 am, 2016-3-12 1:00 am, …) fed to the batch processor, with unclear batch boundaries (??).
What about sessions across batches?
(Re)processing data (streaming)
Draw savepoints at times that you will want to start new jobs from (daily, hourly, …)
Reprocess by starting a new job from a savepoint
• Defines start position in stream (for example, Kafka offsets)
• Initializes pending state (like partial sessions)
Figure: Savepoint; run new streaming program from savepoint
Continuous Data Sources
Figure: hourly files (2016-3-1 12:00 am, …, 2016-3-12 1:00 am, …) and Kafka partitions, each with savepoints.
Stream of Kafka partitions: savepoint = Kafka offsets + operator state
Stream view over a sequence of files: savepoint = file modification timestamp + file position + operator state (WIP, Flink 1.1?)
Upgrading Programs
A program starting from a savepoint can differ from the program that created the savepoint
• Unique operator names match state and operator
The mechanism can be used to fix bugs in programs and to evolve programs, parameters, libraries, …
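Matching state to operators by unique name can be pictured as a lookup from operator name to saved state. The following plain-Java sketch uses a `String` as a stand-in for serialized operator state. `SavepointRestore`, `restore`, and `unmatched` are hypothetical names. In Flink's DataStream API, stable operator names of this kind are typically assigned with `uid(...)`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical sketch: restoring savepoint state into a changed program by operator name.
final class SavepointRestore {

    // savepoint: operator name -> that operator's saved state.
    // Operators of the new program whose name is not in the savepoint start empty.
    static Map<String, String> restore(Map<String, String> savepoint, List<String> newOperatorNames) {
        Map<String, String> initial = new HashMap<>();
        for (String name : newOperatorNames) {
            initial.put(name, savepoint.getOrDefault(name, ""));
        }
        return initial;
    }

    // Saved state that no operator of the new program claims.
    static Set<String> unmatched(Map<String, String> savepoint, List<String> newOperatorNames) {
        Set<String> leftover = new TreeSet<>(savepoint.keySet());
        leftover.removeAll(newOperatorNames);
        return leftover;
    }
}
```

Renaming an operator (for example, from `window-agg` to `window-agg-v2`) leaves its saved state unmatched. This is why operator names should stay stable across program versions.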
State Backends
Large state is a collection of key/value pairs
The state backend defines which data structure holds the state, plus how it is snapshotted
Most common choices
• Main memory – snapshots to master
• Main memory – snapshots to distributed filesystem
• RocksDB – snapshots to distributed filesystem
Complex Event Processing Primer
Event Types
Defining Patterns
Generating Alerts
Latency and Throughput
Low Latency and High Throughput
Frequently thought to be mutually exclusive
• Event-at-a-time: low latency, low throughput
• Mini-batch: high latency, high throughput
The above is not true!
Very little latency has to be sacrificed for very high throughput
Latency and Throughput
The Effect of Buffering
The network stack does not always operate in event-at-a-time mode
Optional buffering adds some milliseconds of latency but increases throughput
No effect on application logic
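The buffering trade-off can be sketched as an output buffer that is shipped either when it is full or when its oldest record has waited longer than a timeout. This plain-Java sketch uses a simulated clock (the `nowMs` parameters) and hypothetical names (`OutputBuffer`, `emit`, `onTimer`). In Flink, the corresponding knob is `StreamExecutionEnvironment.setBufferTimeout(...)`.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of an output buffer that trades a little latency for throughput.
final class OutputBuffer {

    private final int capacity;    // records per network buffer
    private final long timeoutMs;  // max time a record may wait in the buffer
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> sent = new ArrayList<>();
    private long firstPendingAtMs = -1;

    OutputBuffer(int capacity, long timeoutMs) {
        this.capacity = capacity;
        this.timeoutMs = timeoutMs;
    }

    void emit(String record, long nowMs) {
        if (pending.isEmpty()) {
            firstPendingAtMs = nowMs;
        }
        pending.add(record);
        if (pending.size() >= capacity) {
            flush();  // full buffer: ship immediately (throughput)
        }
    }

    // Called periodically: ship a partially filled buffer that has waited long enough (latency bound).
    void onTimer(long nowMs) {
        if (!pending.isEmpty() && nowMs - firstPendingAtMs >= timeoutMs) {
            flush();
        }
    }

    private void flush() {
        sent.add(new ArrayList<>(pending));
        pending.clear();
    }

    List<List<String>> sent() {
        return sent;
    }
}
```

Buffering changes only how records are grouped on the wire, not which records are sent or in what order. This is why it has no effect on application logic.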
An Outlook on Things to Come
Roadmap
• Dynamic scaling, resource elasticity
• Stream SQL
• CEP enhancements
• Incremental & asynchronous state snapshotting
• Mesos support
• More connectors, end-to-end exactly once
• API enhancements (e.g., joins, slowly changing inputs)
• Security (data encryption, Kerberos with Kafka)
I stream*, do you?
* beyond Netflix movies
Addendum
On a technical level
Decouple all things
Clocks
• Wall clock time (processing time)
• Event time (watermarks & punctuations)
• Consistency clock (logical checkpoint timestamps)
Buffering
• Windows (application logic)
• Network (throughput tuning)
…
Decoupling clocks
Stream Alignment
High Availability Checkpoints
Figure: Client, JobManager, TaskManagers, Apache ZooKeeper™
1. Take snapshots
2. Persist snapshots
3. Send handles to JM
4. Create global checkpoint
5. Persist global checkpoint
6. Write handle to ZooKeeper