Apache Flink @ NYC Flink Meetup


Stephan Ewen (@stephanewen)

Streaming Analytics with Apache Flink 1.0

Apache Flink Stack

2

DataStream API: Stream Processing

DataSet API: Batch Processing

Runtime: Distributed Streaming Data Flow

Libraries

Streaming and batch as first-class citizens.

Today

3

Streaming and batch as first-class citizens.

DataStream API: Stream Processing

DataSet API: Batch Processing

Runtime: Distributed Streaming Data Flow

Libraries

4

Streaming is the next programming paradigm for data applications, and you need to start thinking in terms of streams.

5

Streaming technology is enabling the obvious: continuous processing on data that is continuously produced.

Continuous Processing with Batch: continuous ingestion → periodic (e.g., hourly) files → periodic batch jobs

6

λ Architecture
"Batch layer": what we had before
"Stream layer": approximate early results

7

A Stream Processing Pipeline

8

collect → log → analyze → serve & store

A brief History of Flink

9

Timeline: January ‘10: Project Stratosphere (Flink precursor); April ‘14: Flink project incubation; December ‘14: top-level project; March ‘16: Release 1.0 (releases v0.5 through v0.10 along the way)

A brief History of Flink

10

Timeline: January ‘10: Project Stratosphere (Flink precursor); April ‘14: Flink project incubation; December ‘14: top-level project; March ‘16: Release 1.0 (releases v0.5 through v0.10 along the way)

The academia gap: reading/writing papers, teaching, worrying about the thesis

Realizing this might be interesting to people beyond academia (even more so, actually)

Programs and Dataflows

11

Source → Transformation → Transformation → Sink

val lines: DataStream[String] = env.addSource(new FlinkKafkaConsumer09(…))

val events: DataStream[Event] = lines.map((line) => parse(line))

val stats: DataStream[Statistic] = events
  .keyBy("sensor")
  .timeWindow(Time.seconds(5))
  .apply(new MyAggregationFunction())

stats.addSink(new RollingSink(path))

Streaming dataflow: Source [1] and [2] → map() [1] and [2] → keyBy()/window()/apply() [1] and [2] → Sink [1]

What makes Flink flink?

12

Low latency
High throughput
Well-behaved flow control (back pressure)
Make more sense of data
Works on real-time and historic data
True Streaming
Event Time
APIs & Libraries
Stateful Streaming
Globally consistent savepoints
Exactly-once semantics for fault tolerance
Windows & user-defined state
Flexible windows (time, count, session, roll your own)
Complex Event Processing

13

Streaming Analytics by Example

Time-Windowed Aggregations

14

case class Event(sensor: String, measure: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("sensor")
  .timeWindow(Time.seconds(5))
  .sum("measure")

Time-Windowed Aggregations

15

case class Event(sensor: String, measure: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("sensor")
  .timeWindow(Time.seconds(60), Time.seconds(5))
  .sum("measure")

Session-Windowed Aggregations

16

case class Event(sensor: String, measure: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("sensor")
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .max("measure")

Session-Windowed Aggregations

17

case class Event(sensor: String, measure: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("sensor")
  .window(EventTimeSessionWindows.withGap(Time.seconds(60)))
  .max("measure")

Flink 1.1 syntax

Pattern Detection

18

case class Event(producer: String, evtType: Int, msg: String)
case class Alert(msg: String)

val stream: DataStream[Event] = env.addSource(…)
stream
  .keyBy("producer")
  .flatMap(new RichFlatMapFunction[Event, Alert]() {

    lazy val state: ValueState[Int] = getRuntimeContext.getState(…)

    def flatMap(event: Event, out: Collector[Alert]) = {
      val newState = state.value() match {
        case 0 if (event.evtType == 0) => 1
        case 1 if (event.evtType == 1) => 0
        case x => out.collect(Alert(event.msg)); 0
      }
      state.update(newState)
    }
  })

Pattern Detection

19

case class Event(producer: String, evtType: Int, msg: String)
case class Alert(msg: String)

val stream: DataStream[Event] = env.addSource(…)
stream
  .keyBy("producer")
  .flatMap(new RichFlatMapFunction[Event, Alert]() {

    lazy val state: ValueState[Int] = getRuntimeContext.getState(…)

    def flatMap(event: Event, out: Collector[Alert]) = {
      val newState = state.value() match {
        case 0 if (event.evtType == 0) => 1
        case 1 if (event.evtType == 1) => 0
        case x => out.collect(Alert(event.msg)); 0
      }
      state.update(newState)
    }
  })

Embedded key/value state store

Many more
Joining streams (e.g., combine readings from a sensor)
Detecting patterns (CEP)
Applying (changing) rules or models to events
Training and applying online machine learning models, …

20
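As an illustration of the first item (joining streams), here is a minimal sketch, not from the talk: it combines temperature and power readings of the same sensor that fall into the same 5-second window. The event types, the window size, and the source placeholders are assumptions.

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

case class Temperature(sensor: String, celsius: Double)
case class Power(sensor: String, watts: Double)
case class Combined(sensor: String, celsius: Double, watts: Double)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val temperatures: DataStream[Temperature] = env.addSource(…)
val power: DataStream[Power] = env.addSource(…)

// join readings from the same sensor that fall into the same 5-second event-time window
val combined: DataStream[Combined] = temperatures
  .join(power)
  .where(_.sensor)
  .equalTo(_.sensor)
  .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  .apply { (t, p) => Combined(t.sensor, t.celsius, p.watts) }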

21

(It's) About Time

22

The biggest change in moving from batch to streaming is handling time explicitly.

Example: Windowing by Time

23

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Example: Windowing by Time

24

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Different Notions of Time

25

Event Producer → Message Queue → Flink Data Source → Flink Window Operator (partition 1, partition 2)
Event Time at the producer, Ingestion Time at the Flink data source, Window Processing Time at the window operator

Star Wars release years (processing time): 1977, 1980, 1983, 1999, 2002, 2005, 2015 for Episodes IV, V, VI, I, II, III, VII; the episode numbers give the event time

Event Time vs. Processing Time

26

Out of order Streams

27

Events occur on devices → events stored in a log (queue / log) → events analyzed in a data streaming system (stream analysis)

Out of order Streams

28

Out of order Streams

29

Out of order Streams

30

Out of order Streams

31

Out of order !!!

First burst of events / Second burst of events

32

Out of order Streams

Event time windows

Arrival time windows

Instant event-at-a-time

First burst of events / Second burst of events

Processing Time

33

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(ProcessingTime)

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Window by operator's processing time

Ingestion Time

34

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(IngestionTime)

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Event Time

35

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(EventTime)

val stream: DataStream[Event] = env.addSource(…)

stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Event Time

36

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(EventTime)

val stream: DataStream[Event] = env.addSource(…)
val tsStream = stream.assignAscendingTimestamps(_.timestamp)

tsStream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Event Time

37

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(EventTime)

val stream: DataStream[Event] = env.addSource(…)
val tsStream = stream.assignTimestampsAndWatermarks(
  new MyTimestampsAndWatermarkGenerator())

tsStream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")
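MyTimestampsAndWatermarkGenerator above is just a placeholder name. A minimal sketch of such a generator, assuming the periodic-watermark assigner interface of later Flink releases (1.1+) and an assumed out-of-orderness bound of 3.5 seconds:

import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark

class MyTimestampsAndWatermarkGenerator extends AssignerWithPeriodicWatermarks[Event] {

  val maxOutOfOrderness = 3500L          // assumed bound on how late events may be
  var currentMaxTimestamp = 0L

  // extract the event's own timestamp and remember the highest one seen so far
  override def extractTimestamp(event: Event, previousTimestamp: Long): Long = {
    currentMaxTimestamp = math.max(event.timestamp, currentMaxTimestamp)
    event.timestamp
  }

  // the watermark trails the highest seen timestamp by the out-of-orderness bound
  override def getCurrentWatermark(): Watermark =
    new Watermark(currentMaxTimestamp - maxOutOfOrderness)
}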

Watermarks

38

(Figure: a stream in order and a stream out of order; each event carries a timestamp, and watermarks such as W(11), W(17), W(20) flow in the stream between the events.)

Watermarks in Parallel

39

(Figure: watermarks in a parallel dataflow, Source(1)/Source(2) → map(1)/map(2) → window(1)/window(2). Watermarks are generated at the sources; each operator tracks the event time of its input streams, e.g. watermarks W(33) and W(17) arriving at a window operator, and advances its own event time from them. Events are shown as Event[id|timestamp].)

Mixing Event Time and Processing Time

40

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(EventTime)

val stream: DataStream[Event] = env.addSource(…)
val tsStream = stream.assignAscendingTimestamps(_.timestamp)

tsStream
  .keyBy("id")
  .window(SlidingEventTimeWindows.of(seconds(15), seconds(5)))
  .trigger(new MyTrigger())
  .sum("measure")

Window Triggers
React to any combination of
• Event Time
• Processing Time
• Event data

Example of a mixed Event Time / Processing Time trigger:
• Trigger when event time reaches the window end, OR
• when processing time reaches the window end plus 30 secs.

41

Trigger example

42

.sum("measure")public class EventTimeTrigger extends Trigger<Object, TimeWindow> {

public TriggerResult onElement(Object evt, long time, TimeWindow window, TriggerContext ctx) { ctx.registerEventTimeTimer(window.maxTimestamp()); ctx.registerProcessingTimeTimer(window.maxTimestamp() + 30000); return TriggerResult.CONTINUE;}

public TriggerResult onEventTime(long time, TimeWindow w, TriggerContext ctx) { return TriggerResult.FIRE_AND_PURGE;}

public TriggerResult onProcessingTime(long time, TimeWindow w, TriggerContext c) { return TriggerResult.FIRE_AND_PURGE;}

Trigger example

43

.sum("measure")public class EventTimeTrigger extends Trigger<Object, TimeWindow> {

public TriggerResult onElement(Object evt, long time, TimeWindow window, TriggerContext ctx) { ctx.registerEventTimeTimer(window.maxTimestamp()); ctx.registerProcessingTimeTimer(window.maxTimestamp() + 30000); return TriggerResult.CONTINUE;}

public TriggerResult onEventTime(long time, TimeWindow w, TriggerContext ctx) { return TriggerResult.FIRE_AND_PURGE;}

public TriggerResult onProcessingTime(long time, TimeWindow w, TriggerContext c) { return TriggerResult.FIRE_AND_CONTINUE;}

Per Kafka Partition Watermarks

44

(Figure: the same parallel dataflow, but watermark generation happens inside the Kafka sources, per partition, so that each source's watermark reflects the per-partition event time of the partitions it reads.)

Per Kafka Partition Watermarks

45

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(EventTime)

val kafka = new FlinkKafkaConsumer09(topic, schema, props)
kafka.assignTimestampsAndWatermarks(
  new MyTimestampsAndWatermarkGenerator())

val stream: DataStream[Event] = env.addSource(kafka)
stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

46

Matters of State (Fault Tolerance, Reinstatements, etc.)

Back to the Aggregation Example

47

case class Event(id: String, measure: Double, timestamp: Long)

val env = StreamExecutionEnvironment.getExecutionEnvironment

val stream: DataStream[Event] = env.addSource(
  new FlinkKafkaConsumer09(topic, schema, properties))

stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")

Stateful

Fault Tolerance
Prevent data loss (reprocess lost in-flight events)
Recover state consistency (exactly-once semantics)
• Pending windows & user-defined (key/value) state

Checkpoint-based fault tolerance
• Periodically create checkpoints
• Recovery: resume from the last completed checkpoint
• Asynchronous Barrier Snapshots (ABS) algorithm

48
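Checkpointing is switched on via the execution environment. A minimal sketch; the 5-second interval is an assumed value:

val env = StreamExecutionEnvironment.getExecutionEnvironment

// draw a consistent snapshot of all operator state every 5 seconds;
// on failure the job resumes from the last completed checkpoint
env.enableCheckpointing(5000)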

Checkpoints

49

(Figure: checkpoint barriers flowing in the data stream; a barrier separates newer records from older records and corresponds to the state of the dataflow at a point X or Y.)

Checkpoint Barriers: markers, injected into the streams

50

Checkpoint Procedure

51

Checkpoint Procedure

52

Savepoints A "Checkpoint" is a globally consistent point-in-time snapshot

of the streaming program (point in stream, state) A "Savepoint" is a user-triggered retained checkpoint Streaming programs can start from a savepoint

53Savepoint B Savepoint A
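Operationally, a savepoint is typically triggered with the command-line client (bin/flink savepoint <jobID>) and a program is started from one with bin/flink run -s <savepointPath> yourJob.jar; the job ID, path, and jar here are placeholders.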

(Re)processing data (in batch)
Re-processing data (what-if exploration, to correct bugs, etc.)
Usually by running a batch job with a set of old files
Tools that map files to times

54

(Figure: a collection of hourly files, organized by ingestion time, from 2016-3-1 12:00 am through 2016-3-12 1:00 am, handed to the batch processor.)

Unclear Batch Boundaries

55

(Figure: the same collection of hourly files; it is unclear where the batch boundaries should be drawn.)
What about sessions across batches?

(Re)processing data (streaming)
Draw savepoints at times that you will want to start new jobs from (daily, hourly, …)
Reprocess by starting a new job from a savepoint
• Defines the start position in the stream (for example Kafka offsets)
• Initializes pending state (like partial sessions)

56

Savepoint

Run new streaming program from savepoint

Continuous Data Sources

57

(Figure: continuous data sources. For a stream of Kafka partitions, a savepoint consists of Kafka offsets plus operator state. For a stream view over a sequence of hourly files, a savepoint consists of file modification timestamp, file position, and operator state; the latter is WIP, target: Flink 1.1.)

Upgrading Programs
A program starting from a savepoint can differ from the program that created the savepoint
• Unique operator names match state and operator
The mechanism can be used to fix bugs in programs, and to evolve programs, parameters, libraries, …

58
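A minimal sketch of the naming idea, reusing the aggregation from earlier; "sensor-aggregation" is an illustrative name (later Flink releases expose an explicit uid(...) for the same purpose):

stream
  .keyBy("id")
  .timeWindow(Time.seconds(15), Time.seconds(5))
  .sum("measure")
  .name("sensor-aggregation")   // stable, unique name so a modified program can re-match this operator's state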

State Backends
Large state is a collection of key/value pairs
The state backend defines what data structure holds the state, plus how it is snapshotted

Most common choices
• Main memory, snapshots to the master
• Main memory, snapshots to a distributed filesystem
• RocksDB, snapshots to a distributed filesystem

59
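A minimal configuration sketch for the last option; the checkpoint path is a placeholder:

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend

val env = StreamExecutionEnvironment.getExecutionEnvironment

// keep keyed state in RocksDB on local disk and snapshot it to a distributed filesystem
env.setStateBackend(new RocksDBStateBackend("hdfs:///flink/checkpoints"))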

60

Complex Event Processing Primer

Example: Temperature Monitoring
Receiving temperature and power events from sensors
Looking for temperatures repeatedly exceeding thresholds within a short time period (10 secs)

61

Event Types

62

Defining Patterns

63

Generating Alerts

64
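The event-type, pattern-definition, and alerting code of these slides is not reproduced in the transcript (the talk presumably used the FlinkCEP pattern API). As a stand-in, here is a hand-rolled sketch of the described logic, two threshold violations within 10 seconds, using keyed state in the style of the earlier pattern-detection example; the event and alert types and the 100-degree threshold are assumptions:

import org.apache.flink.streaming.api.scala._

case class TemperatureEvent(sensor: String, temperature: Double, timestamp: Long)
case class TemperatureAlert(sensor: String, msg: String)

val env = StreamExecutionEnvironment.getExecutionEnvironment
val readings: DataStream[TemperatureEvent] = env.addSource(…)

val alerts: DataStream[TemperatureAlert] = readings
  .keyBy(_.sensor)
  .flatMapWithState[TemperatureAlert, Long] { (event, lastViolation) =>
    if (event.temperature < 100.0) {
      (Seq.empty, None)                      // below threshold: forget any earlier violation
    } else lastViolation match {
      case Some(ts) if event.timestamp - ts <= 10000 =>
        // second violation within 10 seconds of the first: raise an alert
        (Seq(TemperatureAlert(event.sensor, "temperature repeatedly over threshold")), None)
      case _ =>
        (Seq.empty, Some(event.timestamp))   // remember the first violation
    }
  }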

65

An Outlook on Things to Come

Flink in the wild

66

30 billion events daily
2 billion events in 10 1Gb machines
Data integration & distribution platform
See talks by … at …

Roadmap
Dynamic scaling, resource elasticity
Stream SQL
CEP enhancements
Incremental & asynchronous state snapshotting
Mesos support
More connectors, end-to-end exactly-once
API enhancements (e.g., joins, slowly changing inputs)
Security (data encryption, Kerberos with Kafka)

67

68

I stream, do you?
