Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen


TRANSCRIPT

Page 1: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Stephan Ewen

@stephanewen

Streaming Analytics with Apache Flink

Page 2: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Apache Flink Stack

2

DataStream API: Stream Processing

DataSet API: Batch Processing

Runtime: Distributed Streaming Data Flow

Libraries / Apache Beam

Streaming and batch as first class citizens.

Page 3: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Today

3

Streaming and batch as first class citizens. (Today's focus: the DataStream API.)

DataStream API: Stream Processing

DataSet API: Batch Processing

Runtime: Distributed Streaming Data Flow

Libraries / Apache Beam

Page 4: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

4

Streaming technology is enabling the obvious: continuous processing on data that is continuously produced.

Page 5: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Continuous Processing with Batch

Continuous ingestion

Periodic (e.g., hourly) files

Periodic batch jobs

5

Page 6: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

λ Architecture

"Batch layer": what we had before

"Stream layer": approximate early results

6

Page 7: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

A Stream Processing Pipeline

7

collect → log → analyze → serve & store

Page 8: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Programs and Dataflows

8

Source

Transformation

Transformation

Sink

val lines: DataStream[String] = env.addSource(
  new FlinkKafkaConsumer09(…))

val events: DataStream[Event] = lines.map((line) => parse(line))

val stats: DataStream[Statistic] = events
  .keyBy("sensor")
  .timeWindow(Time.seconds(5))
  .apply(new MyAggregationFunction())

stats.addSink(new RollingSink(path))

[Diagram: the program above as a parallel streaming dataflow — Source[1..2] → map()[1..2] → keyBy()/window()/apply()[1..2] → Sink[1].]

Page 9: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

What makes Flink flink?

9

True streaming: low latency, high throughput, well-behaved flow control (back pressure)

Event time: works on real-time and historic data, to make more sense of data

Stateful streaming: globally consistent savepoints, exactly-once semantics for fault tolerance, windows & user-defined state

APIs & libraries: flexible windows (time, count, session, roll-your-own), Complex Event Processing

Page 10: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Streaming Analytics by Example

10

Page 11: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Time-Windowed Aggregations

11

case class Event(sensor: String, measure: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream.keyBy("sensor").timeWindow(Time.seconds(5)).sum("measure")

Page 12: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Time-Windowed Aggregations

12

case class Event(sensor: String, measure: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream.keyBy("sensor").timeWindow(Time.seconds(60), Time.seconds(5)).sum("measure")

Page 13: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Session-Windowed Aggregations

13

case class Event(sensor: String, measure: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream.keyBy("sensor").window(EventTimeSessionWindows.withGap(Time.seconds(60))).max("measure")

Page 14: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Pattern Detection

14

case class Event(producer: String, evtType: Int, msg: String)
case class Alert(msg: String, state: Int)

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("producer")
  .flatMap(new RichFlatMapFunction[Event, Alert]() {

    // Per-key state: the current position in the expected event pattern
    lazy val state: ValueState[Int] = getRuntimeContext.getState(…)

    def flatMap(event: Event, out: Collector[Alert]) = {
      val newState = state.value() match {
        case 0 if (event.evtType == 0) => 1
        case 1 if (event.evtType == 1) => 0
        case x => out.collect(Alert(event.msg, x)); 0
      }
      state.update(newState)
    }
  })
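The getState(…) call is elided on the slide. A minimal sketch of the registration it stands for, assuming a hypothetical state name "alert-state" and a default value of 0 (both assumptions, not from the deck):

import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}

// Hypothetical descriptor; the name and default value are assumptions.
lazy val state: ValueState[Int] = getRuntimeContext.getState(
  new ValueStateDescriptor[Int]("alert-state", classOf[Int], 0))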

Page 15: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Pattern Detection

15

(Same pattern-detection code as on the previous page, now with its state handling highlighted:)

Embedded key/value state store

Page 16: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Many more

Joining streams (e.g., combining readings from multiple sensors; see the sketch after this list)

Detecting Patterns (CEP)

Applying (changing) rules or models to events

Training and applying online machine learning models

16
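For the first bullet, a hedged sketch of a windowed two-stream join in the DataStream API; the Temperature/Humidity types and the 5-second window are illustrative assumptions, not part of the deck:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Temperature(sensor: String, celsius: Double)
case class Humidity(sensor: String, percent: Double)

val temps: DataStream[Temperature] = env.addSource(…)
val humidity: DataStream[Humidity] = env.addSource(…)

// Combine readings of the same sensor that fall into the same
// 5-second event-time window.
val combined = temps.join(humidity)
  .where(_.sensor)
  .equalTo(_.sensor)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .apply((t, h) => (t.sensor, t.celsius, h.percent))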

Page 17: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

(It's) About Time

17

Page 18: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

18

The biggest change in moving from batch to streaming is handling time explicitly.

Page 19: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Example: Windowing by Time

19

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")

Page 20: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Different Notions of Time

20

[Diagram: an event is created at the event producer (event time), travels through the message queue (partition 1, partition 2) into the Flink data source (ingestion time), and is processed by the Flink window operator (window processing time).]

Page 21: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Time and the Dataflow Model

Event Time semantics in Flink follow the Dataflow model (Apache Beam (incubating))

See previous talk by Frances Perry & Tyler Akidau

For the sake of time (no pun intended), I only briefly recapitulate the basic concept

21

Page 22: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Event Time vs. Processing Time

[Diagram: the Star Wars saga — by processing time, the episodes appear in release order (Episode IV 1977, V 1980, VI 1983, I 1999, II 2002, III 2005, VII 2015); by event time, they appear in story order (Episodes I through VII).]

22

Page 23: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Processing Time

23

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)

val stream: DataStream[Event] = env.addSource(…)

stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")

Page 24: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Ingestion Time

24

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)

val stream: DataStream[Event] = env.addSource(…)

stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")

Page 25: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Event Time

25

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[Event] = env.addSource(…)

stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")

Page 26: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Event Time

26

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

val stream: DataStream[Event] = env.addSource(…)

val tsStream = stream.assignTimestampsAndWatermarks(
  new MyTimestampsAndWatermarkGenerator())

tsStream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Page 27: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Watermarks

27

[Diagram: events carrying timestamps, interleaved with watermarks. In an in-order stream, watermarks such as W(11) and W(17) simply trail the advancing event timestamps; in an out-of-order stream, a watermark W(t) (e.g., W(11), W(20)) asserts that no more events with timestamp t' ≤ t are expected.]

Page 28: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Watermarks in Parallel

28

[Diagram: watermarks in a parallel dataflow with two sources, two map() instances, and two window() instances. Watermarks (e.g., W(33), W(17)) are generated at the sources and flow downstream with the events. An operator with multiple input streams tracks the event time of each input and sets its own event-time clock to the minimum of the input watermarks, forwarding a new watermark when that minimum advances.]

Page 29: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Per Kafka Partition Watermarks

29

[Diagram: the same parallel dataflow, but watermark generation happens inside the Kafka sources, with one watermark tracked per Kafka partition. Each source instance emits the minimum over its partitions, so out-of-orderness across partitions is handled at the source.]
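In code, per-partition watermarking is enabled by attaching the assigner to the Kafka consumer itself rather than to the resulting stream. A hedged sketch, reusing the hypothetical generator from above:

val consumer = new FlinkKafkaConsumer09[Event](topic, schema, properties)

// Attached to the source, timestamps and watermarks are generated per
// Kafka partition and merged inside the consumer.
consumer.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarkGenerator())

val stream: DataStream[Event] = env.addSource(consumer)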

Page 30: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Matters of State (Fault Tolerance, Reinstatements, etc.)

30

Page 31: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Back to the Aggregation Example

31

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(
  new FlinkKafkaConsumer09(topic, schema, properties))

stream.keyBy("id").timeWindow(Time.seconds(15), Time.seconds(5)).sum("measure")

Stateful

Page 32: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Fault Tolerance

Prevent data loss (reprocess lost in-flight events)

Recover state consistency (exactly-once semantics)
• Pending windows & user-defined (key/value) state

Checkpoint-based fault tolerance
• Periodically create checkpoints

• Recovery: resume from last completed checkpoint

• Async. Barrier Snapshots (ABS) Algorithm

32
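Checkpointing is enabled on the execution environment. A minimal sketch (the 5-second interval is an arbitrary illustrative choice):

import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment

// Draw a distributed checkpoint every 5 seconds, with exactly-once
// guarantees for operator state.
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE)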

Page 33: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Checkpoints

33

[Diagram: a data stream with newer records on one side and older records on the other; two marked stream positions capture the state of the dataflow at point X and at point Y.]

Page 34: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Checkpoint Barriers

Markers, injected into the streams

34

Page 35: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Checkpoint Procedure

35

Page 36: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Checkpoint Procedure

36

Page 37: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Savepoints

A "Checkpoint" is a globally consistent point-in-time snapshot of the streaming program (point in stream, state)

A "Savepoint" is a user-triggered retained checkpoint

Streaming programs can start from a savepoint

37

[Diagram: a stream with two retained savepoints, Savepoint A and Savepoint B.]
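Where checkpoints and savepoints physically live is determined by the configured state backend. A minimal sketch, assuming a filesystem backend with a hypothetical HDFS path:

import org.apache.flink.runtime.state.filesystem.FsStateBackend

// Snapshots go to a durable filesystem so that jobs can later be
// restored (or started fresh) from them.
env.setStateBackend(new FsStateBackend("hdfs:///flink/checkpoints"))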

Page 38: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

(Re)processing data (in batch)

Re-processing data (what-if exploration, to correct bugs, etc.)

Usually by running a batch job with a set of old files

Tools that map files to times

38

[Diagram: a collection of hourly files organized by ingestion time (2016-3-1 12:00 am, 1:00 am, 2:00 am, …, 2016-3-11 10:00 pm, 11:00 pm, 2016-3-12 12:00 am, 1:00 am, …), fed to the batch processor.]

Page 39: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Unclear Batch Boundaries

39

[Diagram: the same hourly files fed to the batch processor, with "??" marking events that straddle a file boundary.]

What about sessions across batches?

Page 40: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

(Re)processing data (streaming)

Draw savepoints at times that you will want to start new jobs from (daily, hourly, …)

Reprocess by starting a new job from a savepoint
• Defines start position in stream (for example Kafka offsets)

• Initializes pending state (like partial sessions)

40

[Diagram: a savepoint marked in the stream; a new streaming program runs from that savepoint.]

Page 41: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Continuous Data Sources

41

[Diagram: two kinds of continuous sources. For a stream of Kafka partitions, a savepoint captures Kafka offsets + operator state. For a stream view over a sequence of (hourly) files, a savepoint captures file modification timestamp + file position + operator state (WIP, target: Flink 1.1).]

Page 42: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Complex Event Processing Primer: Demo Time

42

Page 43: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Demo Scenario

Pattern validation & violation detection:

Events should follow a certain pattern, or an alert should be raised

Think cybersecurity, process monitoring, etc.

43
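The demo code is not part of the transcript. A hedged sketch of such a pattern check with the Flink CEP library, assuming a flink-cep-scala dependency in a version where a matched pattern name maps to an Iterable of events; the event types, pattern names, and the 10-second bound are illustrative:

import org.apache.flink.cep.scala.CEP
import org.apache.flink.cep.scala.pattern.Pattern
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

case class Event(producer: String, evtType: Int, msg: String)

val events: DataStream[Event] = env.addSource(…)

// Expected pattern: a type-0 event must be followed by a type-1 event
// from the same producer within 10 seconds.
val pattern = Pattern
  .begin[Event]("start").where(_.evtType == 0)
  .next("end").where(_.evtType == 1)
  .within(Time.seconds(10))

val matches: DataStream[String] = CEP
  .pattern(events.keyBy(_.producer), pattern)
  .select(m => s"pattern completed: ${m("start").head.msg}")

Raising an alert when the pattern is violated (rather than completed) would additionally use CEP's timeout handling, which this sketch omits.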

Page 44: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

An Outlook on Things to Come

44

Page 45: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Flink in the wild

45

30 billion events daily

2 billion events on 10 1-Gb machines

Data integration & distribution platform

(Company logos on the slide; see their talks at the conference.)

Page 46: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Roadmap

• Dynamic scaling, resource elasticity
• Stream SQL
• CEP enhancements
• Incremental & asynchronous state snapshotting
• Mesos support
• More connectors, end-to-end exactly-once
• API enhancements (e.g., joins, slowly changing inputs)
• Security (data encryption, Kerberos with Kafka)

46

Page 47: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

47

Apache Flink Meetup - Thursday, April 28th

Page 48: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

Flink Forward 2016, Berlin
Submission deadline: June 30, 2016
Early bird deadline: July 15, 2016

www.flink-forward.org

Page 49: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

We are hiring!
data-artisans.com/careers

Page 50: Advanced Streaming Analytics with Apache Flink and Apache Kafka, Stephan Ewen

50

I stream, do you?