flink forward sf 2017: kenneth knowles - back to sessions overview
TRANSCRIPT
![Page 1: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/1.jpg)
Portable stateful big data processing in Apache Beam
Kenneth Knowles
Apache Beam PMCSoftware Engineer @ Google
[email protected] / @KennKnowles https://s.apache.org/ffsf-2017-beam-stateFlink Forward San Francisco 2017
![Page 2: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/2.jpg)
Agenda1. What is Apache Beam?
2. State
3. Timers
4. Example & Little Demo
![Page 3: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/3.jpg)
What isApache Beam?
![Page 4: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/4.jpg)
TL;DR
4(Flink draws it more like this)
![Page 5: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/5.jpg)
20142004 2006 2008 2010 2012 20162005 2007 2009 2013 20152011
MapReduce(paper)
Apache Hadoop
Dataflow Model(paper)
MillWheel(paper)
Heron
ApacheSpark
ApacheStorm
Apache Gearpump
(incubating)Apache
Apex
Apache Flink
Cloud Dataflow
FlumeJava(paper)
Apache Beam
DAGs, DAGs, DAGs
Apache Samza
![Page 6: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/6.jpg)
The Beam Vision
Sum Per Key
6
input.apply(
Sum.integersPerKey())
Java
input | Sum.PerKey()
Python
⋮
Apache Flinklocal, on-prem,
cloud
Apache Sparklocal, on-prem,
cloud
Cloud Dataflow: fully managed
⋮
Apache Apexlocal, on-prem,
cloud
Apache Gearpump
(incubating)
![Page 7: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/7.jpg)
The Beam Vision
KafkaIO
7
input | KakaIO.read()
Python
⋮
class KafkaIO extends
UnboundedSource { … }
Java
Apache Flinklocal, on-prem,
cloud
Apache Sparklocal, on-prem,
cloud
Cloud Dataflow: fully managed
⋮
Apache Apexlocal, on-prem,
cloud
Apache Gearpump
(incubating)
![Page 8: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/8.jpg)
The Beam Model
Pipeline
8
PTransform
PCollection(bounded or unbounded)
![Page 9: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/9.jpg)
The Beam Model
What are you computing? (read, map, reduce)
Where in event time? (event time windowing)
9
When in processing time are results produced? (triggers)
How do refinements relate? (accumulation mode)
![Page 10: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/10.jpg)
What are you computing?
10
ReadParallel connectors to
external systems
ParDoPer element
"Map"
GroupingGroup by key,
Combine per key,"Reduce"
CompositeEncapsulated
subgraph
![Page 11: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/11.jpg)
State
![Page 12: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/12.jpg)
What are you computing?
12
ReadParallel connectors to
external systems
ParDoPer element
"Map"
GroupingGroup by key,
Combine per key,"Reduce"
CompositeEncapsulated
subgraph
![Page 13: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/13.jpg)
State for ParDo
13
ParDo(DoFn)
![Page 14: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/14.jpg)
"Example"
14
Per Key Quantiles
ParDo.of(new DoFn<...>() {
// declare some state
@ProcessElement
public void process(...) {
// update quantiles
// output if needed
}
})
![Page 15: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/15.jpg)
Partitioned
15
Quantiles Quantiles Quantiles
![Page 16: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/16.jpg)
Parallelism!
16
QuantilesDoFn
QuantilesDoFn
QuantilesDoFn
new DoFn<Foo, Quantiles<Foo>>() {
@StateId("quantiles")
private final StateSpec<...>
quantilesState = StateSpecs.combining();
…}
![Page 17: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/17.jpg)
Windowed
17
QuantilesDoFn
QuantilesDoFn
QuantilesDoFn
Window into Fixed windows of one hour
Expected result: Quantiles for each hour
![Page 18: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/18.jpg)
Windowed
18
QuantilesDoFn
QuantilesDoFn
QuantilesDoFn
Window into windows of 30 min sliding by 10 min
Expected result: Quantiles for 30 minutes sliding by 10 min
![Page 19: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/19.jpg)
<k, w>1
<k, w>2
<k, w>3
...
"x" 3 7 15
"y" "fizz" "7" "fizzbuzz"
...
State is per key and window
Bonus: automatically garbage collected when a window expires(vs manual clearing of keyed state)
![Page 20: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/20.jpg)
Windowed
20
QuantilesDoFn
QuantilesDoFn
QuantilesDoFn
Window into Fixed windows of one hour
Expected result: Quantiles for each hour
![Page 21: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/21.jpg)
What about Combine?
21
QuantilesCombiner
Window into Fixed windows of one hour
Expected result: Quantiles for each hour
QuantilesCombiner
QuantilesCombiner
![Page 22: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/22.jpg)
Combine vs State (naive conceptualization)
22
Window hourly
Expected result:Quantiles for each hour
QuantilesCombiner
Window hourly
QuantilesDoFn
Expected result:Quantiles for each hour
![Page 23: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/23.jpg)
Combine vs State (likely execution plan)
Quantiles Combiner
Quantiles Combiner
Quantiles Combiner
Shuffle Accumulators
QuantilesDoFn
Shuffle Elements
associativecommutativesingle-outputenables optimizations(engine is in control)
non-associative*non-commutative*
multi-outputside outputs(user is in control)
![Page 24: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/24.jpg)
Combine vs State
Quantiles Combiner Quantiles
DoFn
output governed by trigger
(data/computation unaware) "output only when there's an interesting change"
(data/computation aware)
![Page 25: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/25.jpg)
Example: Arbitrary indices
D,0
A
Assign indices
B
C
D
E
C,1
A,2
B,3
E,4
non-associative
non-commutative
non-deterministic
and totally fine!
![Page 26: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/26.jpg)
Kinds of state
● Value - just a mutable cell for a value
● Bag - supports "blind writes"
● Combining - has a CombineFn built in; can support blind writes and lazy accumulation
● Set - membership checking
● Map - lookups and partial writes
![Page 27: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/27.jpg)
Timers
![Page 28: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/28.jpg)
Timers for ParDo
28
Stateful ParDo
new DoFn<...>() {
@TimerId("timeout")
private final TimerSpec timeoutTimer =
TimerSpecs.timer(TimeDomain.PROCESSING_TIME);
@OnTimer("timout")
public void timeout(...) {
// access state, set new timers, output
}
…}
![Page 29: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/29.jpg)
Timers in processing time
"call me back in 5 seconds"
output based only on incoming element
output return value of batched RPC
buffer request
batched RPC
On Timer
On Element
![Page 30: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/30.jpg)
Timers in event time
"call me back when the watermark hits the end of the window"
output speculative result immediately
output replacement value only if changed
store speculative result
On Timer
On Element
![Page 31: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/31.jpg)
● Per-key arbitrary numbering
● Output only when result changes
● Tighter "side input" management for slowly changing dimension
● Streaming join-matrix / join-biclique
● Fine-grained combine aggregation and output control
● Per-key "workflows" like user sign up flow w/ expiration
● Low-latency deduplication (let the first through, squash the rest)
More example uses for state & timers
![Page 32: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/32.jpg)
Performance considerations (cross-runner)
32
● Shuffle to colocate keys
● Linear processing of elements for key+window
● Window merging
● Storage of state and timers
● GC of state
![Page 33: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/33.jpg)
Demo
33
![Page 34: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/34.jpg)
SummaryState and Timers in Beam...
● … unlock new uses cases
● … they "just work" with event time windowing
● … are portable across runners (implementation ongoing)
![Page 35: Flink Forward SF 2017: Kenneth Knowles - Back to Sessions overview](https://reader034.vdocuments.us/reader034/viewer/2022042723/58f9a8de760da3da068b68a5/html5/thumbnails/35.jpg)
Thank you for listening!This talk:
● Me - @KennKnowles● These Slides - https://s.apache.org/ffsf-2017-beam-state
Go Deeper● Design doc - https://s.apache.org/beam-state● Blog post - https://beam.apache.org/blog/2017/02/13/stateful-processing.html
Join the Beam community:● User discussions - [email protected]● Development discussions - [email protected]● Follow @ApacheBeam on Twitter
You can contribute to Beam + Flink● New types of state● Easy launch of Beam job on Flink-on-YARN● Integration tests at scale● Fit and finish: polish, polish, polish!● … and lots more!
https://beam.apache.org