apache flink training - advanced windowing

Post on 17-Mar-2018

Apache Flink® Training

Flink v1.3 – 9.9.2017

DataStream API

Advanced Windowing

Windows

The lower-level details

Specifying Windowing

stream
  .keyBy()                    / keyed vs non-keyed windows
  .window()                   / “Assigner”
  .trigger()                  / each Assigner has a default Trigger
  .evictor()                  / default: no Evictor
  .allowedLateness()          / default: zero
  .reduce|apply|process()

Triggers

Determines when a window is ready to have its process or window function called, e.g.,
• by counting
• by comparing timestamps to the current watermark

Each WindowAssigner comes with a default Trigger
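The watermark comparison mentioned above can be sketched in plain Java. This is a simplified model, not Flink code: the class and method names are hypothetical, and it assumes a window covering [start, end) whose last valid timestamp is end - 1.

```java
/** Plain-Java model (not Flink API): when is an event-time window ready to fire? */
class WatermarkFiringSketch {

    /** Last timestamp that still belongs to a window covering [start, end). */
    static long maxTimestamp(long windowEnd) {
        return windowEnd - 1;
    }

    /** Assumed rule: the window is ready once the watermark reaches its last timestamp. */
    static boolean readyToFire(long windowEnd, long currentWatermark) {
        return currentWatermark >= maxTimestamp(windowEnd);
    }

    public static void main(String[] args) {
        long windowEnd = 10_000L; // hypothetical window [0, 10000)
        System.out.println(readyToFire(windowEnd, 9_998L)); // watermark still behind
        System.out.println(readyToFire(windowEnd, 9_999L)); // watermark has caught up
    }
}
```

A count-based trigger would replace the watermark comparison with a per-window element counter.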

Trigger interface

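onElement()
• called for each element that is added to a window

onEventTime()
• called when a registered event-time timer fires

onProcessingTime()
• called when a registered processing-time timer fires

onMerge()
• merges the states of two triggers when their windows merge

clear()
• use this to clear any managed state, etc.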

onElement(), onEventTime(), and onProcessingTime() each return a TriggerResult

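TriggerResult

• CONTINUE: do nothing
• FIRE: evaluate the window function and emit its results
• PURGE: discard the elements in the window
• FIRE_AND_PURGE: evaluate the window function, then discard the elements

Purging only removes the window’s contents; window metadata and trigger state stay intact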
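The difference between firing and purging is easier to see in a small simulation. The sketch below is a plain-Java model, not the Flink API: the class, the enum, and the use of a sum as the "window function" are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink API) of how a window reacts to each trigger result. */
class TriggerResultSketch {

    /** Models the four results as two independent flags. */
    enum Result {
        CONTINUE(false, false),
        FIRE(true, false),
        PURGE(false, true),
        FIRE_AND_PURGE(true, true);

        final boolean fire;
        final boolean purge;

        Result(boolean fire, boolean purge) {
            this.fire = fire;
            this.purge = purge;
        }
    }

    private final List<Long> contents = new ArrayList<>();
    final List<Long> emitted = new ArrayList<>();

    void add(long element) {
        contents.add(element);
    }

    /** Firing evaluates the stand-in window function (a sum); purging drops the buffered elements. */
    void apply(Result result) {
        if (result.fire) {
            long sum = 0;
            for (long v : contents) {
                sum += v;
            }
            emitted.add(sum);
        }
        if (result.purge) {
            contents.clear();
        }
    }

    public static void main(String[] args) {
        TriggerResultSketch window = new TriggerResultSketch();
        window.add(1);
        window.add(2);
        window.apply(Result.FIRE);           // emits 3 and keeps both elements
        window.add(3);
        window.apply(Result.FIRE_AND_PURGE); // emits 6, then empties the window
        window.add(4);
        window.apply(Result.FIRE);           // emits 4
        System.out.println(window.emitted);  // prints [3, 6, 4]
    }
}
```

With FIRE alone, earlier elements are counted again in the next firing; only a purge resets the contents.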

public class CountTrigger<W extends Window> extends Trigger<Object, W> {

    private final long maxCount;

    private final ReducingStateDescriptor<Long> stateDesc =
        new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

    private CountTrigger(long maxCount) {
        this.maxCount = maxCount;
    }

    . . .
}

@Override
public TriggerResult onElement(Object element,
                               long timestamp,
                               W window,
                               TriggerContext ctx) throws Exception {
    ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
    count.add(1L);
    if (count.get() >= maxCount) {
        count.clear();
        return TriggerResult.FIRE;
    }
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
    return TriggerResult.CONTINUE;
}

@Override
public TriggerResult onProcessingTime(long time, W window,
                                      TriggerContext ctx) throws Exception {
    return TriggerResult.CONTINUE;
}

@Override
public void clear(W window, TriggerContext ctx) throws Exception {
    ctx.getPartitionedState(stateDesc).clear();
}

@Override
public boolean canMerge() {
    return true;
}

@Override
public void onMerge(W window, OnMergeContext ctx) throws Exception {
    ctx.mergePartitionedState(stateDesc);
}

Evictors

Control which elements are passed to the window function (by removing elements from a window)

Always called after the trigger fires, but before and/or after the window function
• evictBefore()
• evictAfter()
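As an illustration of the "before" case, the sketch below models a count-based eviction step in plain Java. It does not use Flink's Evictor interface; the class name, the keep-the-last-N policy, and the summing window function are assumptions chosen for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model (not Flink API) of a count-based evictBefore step. */
class EvictBeforeSketch {

    /** Keeps only the last maxCount elements; older ones never reach the window function. */
    static <T> List<T> evictBefore(List<T> windowContents, int maxCount) {
        int from = Math.max(0, windowContents.size() - maxCount);
        return new ArrayList<>(windowContents.subList(from, windowContents.size()));
    }

    /** Stand-in for a window function: sums whatever survived eviction. */
    static long windowFunction(List<Long> elements) {
        long sum = 0;
        for (long v : elements) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Long> contents = Arrays.asList(10L, 20L, 30L, 40L);
        List<Long> kept = evictBefore(contents, 2);
        System.out.println(kept + " -> " + windowFunction(kept)); // prints [30, 40] -> 70
    }
}
```

Because the eviction step needs to see the individual elements, the window has to buffer all of them until it fires.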

Late elements

When allowedLateness > 0
• Late elements can trigger late firings
• With merging windows, late firings can lead to late merging
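One way to reason about late elements is to compare the watermark at arrival time with the window's end and its allowed lateness. The following plain-Java sketch is a simplified model, not Flink code: the names are hypothetical, and the boundaries assume a window [start, end) whose state is kept until the watermark reaches end - 1 + allowedLateness.

```java
/** Plain-Java model (not Flink API) of what happens to an element arriving in a given window. */
class LatenessSketch {

    enum Fate { ON_TIME, LATE_FIRING, DROPPED }

    /**
     * Classifies an element by the watermark observed when it arrives.
     * The exact boundaries are an assumption of this sketch.
     */
    static Fate classify(long windowEnd, long allowedLateness, long currentWatermark) {
        long maxTimestamp = windowEnd - 1;                 // last timestamp inside [start, end)
        long cleanupTime = maxTimestamp + allowedLateness; // window state is kept until here
        if (currentWatermark < maxTimestamp) {
            return Fate.ON_TIME;     // window has not fired yet
        }
        if (currentWatermark < cleanupTime) {
            return Fate.LATE_FIRING; // window already fired, but its state still exists
        }
        return Fate.DROPPED;         // window state is gone
    }

    public static void main(String[] args) {
        long end = 10_000L;      // hypothetical window [0, 10000)
        long lateness = 5_000L;  // hypothetical allowed lateness
        System.out.println(classify(end, lateness, 9_000L));
        System.out.println(classify(end, lateness, 12_000L));
        System.out.println(classify(end, lateness, 15_000L));
    }
}
```

With the default allowed lateness of zero, the middle case disappears: an element is either on time or dropped.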

ProcessWindowFunction

New and improved version of WindowFunction that has more information about the window-firing context

Has an API for accessing both per-window state and global state (both per key)

ProcessWindowFunction (API)

abstract class ProcessWindowFunction<IN, OUT, KEY, W extends Window>
        extends AbstractRichFunction {

    /** Called for each window firing. */
    public abstract void process(
        KEY key,
        Context context,
        Iterable<IN> elements,
        Collector<OUT> out);

    /** Called when a window is being purged. */
    public void clear(Context context) throws Exception {}
}

Context (API)

public abstract class Context implements java.io.Serializable {

    public abstract W window();

    public abstract long currentProcessingTime();
    public abstract long currentWatermark();

    public abstract KeyedStateStore windowState();
    public abstract KeyedStateStore globalState();
}

Per-Window vs Global state

Both per-window and global state are still tied to a key

Per-window state is additionally scoped to the window being processed

Global state is shared by all windows of the same key
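The two scopes can be pictured as two maps with different keys. The sketch below is a plain-Java model rather than Flink's KeyedStateStore: the class, the firing counters, and the use of a window start timestamp to identify a window are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Plain-Java model (not Flink API) of per-window versus global state, both per key. */
class StateScopeSketch {

    private final Map<String, Long> globalState = new HashMap<>(); // scoped to the key only
    private final Map<String, Long> windowState = new HashMap<>(); // scoped to key and window

    /** Records one firing; returns {firings of this window, firings across all windows of the key}. */
    long[] recordFiring(String key, long windowStart) {
        long perWindow = windowState.merge(key + "@" + windowStart, 1L, Long::sum);
        long perKey = globalState.merge(key, 1L, Long::sum);
        return new long[] {perWindow, perKey};
    }

    public static void main(String[] args) {
        StateScopeSketch state = new StateScopeSketch();
        state.recordFiring("a", 0L);
        state.recordFiring("a", 0L);
        long[] counts = state.recordFiring("a", 60_000L); // a different window, same key
        System.out.println(counts[0] + " " + counts[1]);  // prints 1 3
    }
}
```

The per-window counter starts over for each new window, while the global counter keeps accumulating for the key.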

Example – Differential Window

private static class DifferentialWindowFunction
        extends ProcessWindowFunction<Long, Tuple2<Long, Long>, String, TimeWindow> {

    private final static ValueStateDescriptor<Long> previousFiringState =
        new ValueStateDescriptor<>("previous-firing", LongSerializer.INSTANCE);

    private final static ReducingStateDescriptor<Long> firingCounterState =
        new ReducingStateDescriptor<>(
            "firing-counter", new Sum(), LongSerializer.INSTANCE);

    @Override
    public void process(…) { … }
}

Example – Differential Window

@Override
public void process(
        String key,
        Context context,
        Iterable<Long> values,
        Collector<Tuple2<Long, Long>> out) {

    ValueState<Long> previousFiring =
        context.windowState().getState(previousFiringState);

    ReducingState<Long> firingCounter =
        context.windowState().getState(firingCounterState);

    Long output = Iterables.getOnlyElement(values);
    if (firingCounter.get() == null) {
        // first firing
        out.collect(Tuple2.of(0L, output));
    } else {
        // subsequent firing
        out.collect(Tuple2.of(firingCounter.get(), output - previousFiring.value()));
    }
    firingCounter.add(1L);
    previousFiring.update(output);
}

Example – Differential Window

@Override
public void clear(Context context) {
    ValueState<Long> previousFiring =
        context.windowState().getState(previousFiringState);

    ReducingState<Long> firingCounter =
        context.windowState().getState(firingCounterState);

    previousFiring.clear();
    firingCounter.clear();
}

Window anti-patterns

Using an Evictor prevents pre-aggregation, because every element must be kept in the window so that it can be passed to the evictor

A sliding window 1 day long with a slide of 1 minute will make 24 x 60 = 1,440 copies of every element, so don’t do that!
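The copy count follows from the window arithmetic: an element belongs to every sliding window whose start lies within one window size before its timestamp. The plain-Java sketch below enumerates those starts; the class and method names are hypothetical, and it assumes non-negative timestamps and a window offset of zero.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model (not Flink API): which sliding windows does one element fall into? */
class SlidingWindowCopiesSketch {

    /** Start timestamps of every sliding window containing the timestamp (offset 0 assumed). */
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = timestamp - (timestamp % slide); // latest window start <= timestamp
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        long day = 24L * 60 * 60 * 1000; // window size in milliseconds
        long minute = 60L * 1000;        // slide in milliseconds
        long timestamp = 1_500_000_000_000L; // arbitrary example timestamp
        System.out.println(windowStarts(timestamp, day, minute).size()); // prints 1440
    }
}
```

Increasing the slide to one hour would reduce the count to 24 windows per element.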
