Apache Flink® Training
Flink v1.3 – 9.9.2017
DataStream API
Async IO
Stream Enrichment Pattern
For each incoming element:
• extract some info from the element (e.g. key)
• use key to somehow enrich element with related info
• emit an enriched version of the input element
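The three steps above can be sketched in plain Java, without any Flink classes. The `Trade` class, the `CUSTOMERS` map, and the `enrich` method are hypothetical names chosen for illustration; the in-memory map stands in for wherever the related info actually lives.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; a plain-Java sketch of the pattern, not Flink code.
class EnrichmentSketch {

    // Stand-in for the related info, keyed by customer id.
    static final Map<String, String> CUSTOMERS = new HashMap<>();
    static {
        CUSTOMERS.put("c1", "Alice");
        CUSTOMERS.put("c2", "Bob");
    }

    // An input element: a trade that references a customer by id.
    static final class Trade {
        final String customerId;
        final long amount;

        Trade(String customerId, long amount) {
            this.customerId = customerId;
            this.amount = amount;
        }
    }

    static String enrich(Trade trade) {
        String key = trade.customerId;                         // 1. extract the key
        String name = CUSTOMERS.getOrDefault(key, "unknown");  // 2. look up related info
        return name + ":" + trade.amount;                      // 3. emit the enriched element
    }

    public static void main(String[] args) {
        System.out.println(enrich(new Trade("c1", 100))); // prints Alice:100
    }
}
```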
Financial Trading Example
Goal: enrich a stream of trades with customer info
Customer data lives in an external service/database
Enrichment via external DB
[Diagram: Source(1) and Source(2) send trades through keyBy to map(1) and map(2), which both query CustomerData]
Straightforward Solution
Query the service/DB in a MapFunction
Problems:
• Slow (synchronous requests) and/or
• Poor resource utilization (many parallel blocking tasks)
Synchronous Access
Communication delay can dominate application throughput and latency
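A back-of-the-envelope model shows how strongly the delay can dominate. The numbers used here (a 5 ms round trip, 100 overlapping requests) are illustrative assumptions, not measurements, and the model ignores processing time and assumes the external service keeps up.

```java
// Back-of-the-envelope model; all numbers are illustrative assumptions.
class ThroughputModel {

    // A blocking task finishes one request before starting the next,
    // so it handles at most 1000 / latencyMs requests per second.
    static long syncRequestsPerSecond(long latencyMs) {
        return 1000 / latencyMs;
    }

    // With up to `inFlight` overlapping requests the waiting time is shared,
    // so the upper bound scales with the number of concurrent requests.
    static long asyncRequestsPerSecond(long latencyMs, int inFlight) {
        return inFlight * 1000L / latencyMs;
    }

    public static void main(String[] args) {
        System.out.println(syncRequestsPerSecond(5));        // 200
        System.out.println(asyncRequestsPerSecond(5, 100));  // 20000
    }
}
```

Raising parallelism also lifts the synchronous bound, but each extra blocking task costs a thread and a connection, which is the resource-utilization problem noted earlier.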
Asynchronous Access
Flink 1.2 added AsyncFunction
Requirement:
• a client that supports asynchronous requests
The API handles:
• integration with DataStream API
• fault-tolerance
• order of emitted elements
• time semantics (event/processing time)
AsyncFunction
Simple API:
/**
 * Trigger async operation for each stream input.
 */
void asyncInvoke(IN input, AsyncCollector<OUT> collector) throws Exception;
API call:
/**
 * Example async function call.
 */
DataStream<...> result = AsyncDataStream.(un)orderedWait(stream,
    new MyAsyncFunction(), 1000, TimeUnit.MILLISECONDS, 100);
AsyncFunction
[Diagram: AsyncWaitOperator with a queue of promises P1 to P4 and an Emitter; element E5 arrives]
AsyncWaitOperator:
• a queue of “Promises”
• a separate thread (Emitter)
For each incoming element (here E5), the AsyncWaitOperator will:
• Wrap E5 in a “promise” P5
• Put P5 in the queue
• Call asyncInvoke(E5, P5)
asyncInvoke(value, asyncCollector):
• a user-defined function
• value: the input element
• asyncCollector: the collector of the result (when the query returns)
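// client is a hypothetical async client; thenAccept needs a CompletableFuture
// (java.util.concurrent.Future has no thenAccept)
CompletableFuture<String> future = client.query(E5);
future.thenAccept((String result) -> {
    P5.collect(Collections.singleton(new Tuple2<>(E5, result)));
});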
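The callback above can be tried outside Flink with a self-contained sketch. `SketchCollector` and `query` are hypothetical stand-ins (not Flink's `AsyncCollector` or a real database client), and the result is a plain string rather than a `Tuple2`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Plain-Java sketch; SketchCollector and query() are hypothetical stand-ins,
// not Flink's AsyncCollector or a real database client.
class AsyncInvokeSketch {

    interface SketchCollector<OUT> {
        void collect(Collection<OUT> results);
    }

    // Hypothetical async client: answers on another thread.
    static CompletableFuture<String> query(String key) {
        return CompletableFuture.supplyAsync(() -> "customer-of-" + key);
    }

    // Returns immediately; the callback hands over the result when the query returns.
    static CompletableFuture<Void> asyncInvoke(String input, SketchCollector<String> collector) {
        return query(input).thenAccept(
            result -> collector.collect(Collections.singleton(input + "=" + result)));
    }

    static List<String> run(String input) {
        List<String> out = Collections.synchronizedList(new ArrayList<>());
        asyncInvoke(input, out::addAll).join(); // join only so the demo can inspect the result
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run("E5")); // [E5=customer-of-E5]
    }
}
```

The point of the shape is that `asyncInvoke` never blocks: it registers a callback and returns, so the calling thread is free to issue the next request.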
Emitter:
• separate thread
• polls queue for completed promises (blocking)
• emits elements downstream
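The interplay of queue and Emitter can be modeled with a `BlockingQueue` of `CompletableFuture`s. This is a simplified sketch under stated assumptions, not Flink's AsyncWaitOperator: the class and method names are made up, the "query" is just a task on the common pool, and the Emitter here always waits on the head of the queue, so results come out in arrival order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;

// Simplified model of the operator; not Flink's AsyncWaitOperator.
class EmitterSketch {

    static List<String> run(List<String> elements) {
        BlockingQueue<CompletableFuture<String>> queue = new LinkedBlockingQueue<>();
        List<String> downstream = new ArrayList<>();

        // Emitter: a separate thread that blocks until the promise at the head
        // of the queue is complete, then emits its result downstream.
        Thread emitter = new Thread(() -> {
            try {
                for (int i = 0; i < elements.size(); i++) {
                    downstream.add(queue.take().join());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        emitter.start();

        for (String element : elements) {
            CompletableFuture<String> promise = new CompletableFuture<>(); // wrap in a promise
            queue.add(promise);                                            // put it in the queue
            // Stand-in for asyncInvoke(element, promise): completes on another thread.
            CompletableFuture.runAsync(() -> promise.complete("enriched-" + element));
        }

        try {
            emitter.join(); // wait until everything has been emitted
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return downstream;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("E1", "E2", "E3")));
        // [enriched-E1, enriched-E2, enriched-E3]
    }
}
```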
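Arguments of AsyncDataStream.(un)orderedWait:
• stream: the input stream
• new MyAsyncFunction(): our AsyncFunction
• 1000, TimeUnit.MILLISECONDS: a timeout, the max time until a request is considered failed
• 100: the capacity, the max number of in-flight requests
DataStream<Tuple2<String, String>> result =
    AsyncDataStream.(un)orderedWait(stream,
        new MyAsyncFunction(),
        1000, TimeUnit.MILLISECONDS,
        100);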
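The two limits can be modeled in plain Java to make their meaning concrete. This is a sketch under assumptions, not how Flink implements them: `LimitsSketch`, `tryStart`, and `awaitOrFail` are hypothetical names, a `Semaphore` stands in for the capacity, and a timed `get` stands in for the timeout. Here a request beyond the capacity is simply rejected, whereas an operator would more plausibly hold back its input until a slot frees up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java model of the two limits; not how Flink implements them.
class LimitsSketch {

    private final Semaphore slots; // one permit per allowed in-flight request

    LimitsSketch(int capacity) {
        this.slots = new Semaphore(capacity);
    }

    // Returns false when `capacity` requests are already in flight.
    boolean tryStart(CompletableFuture<String> request) {
        if (!slots.tryAcquire()) {
            return false;
        }
        // Free the slot once the request completes, successfully or not.
        request.whenComplete((result, error) -> slots.release());
        return true;
    }

    // Waits at most `timeout`; a request that takes longer is considered failed.
    static String awaitOrFail(CompletableFuture<String> request, long timeout, TimeUnit unit) {
        try {
            return request.get(timeout, unit);
        } catch (TimeoutException e) {
            return "FAILED: timeout";
        } catch (InterruptedException | ExecutionException e) {
            return "FAILED: " + e;
        }
    }

    public static void main(String[] args) {
        LimitsSketch limits = new LimitsSketch(2);
        CompletableFuture<String> a = new CompletableFuture<>();
        CompletableFuture<String> b = new CompletableFuture<>();
        CompletableFuture<String> c = new CompletableFuture<>();
        System.out.println(limits.tryStart(a)); // true
        System.out.println(limits.tryStart(b)); // true
        System.out.println(limits.tryStart(c)); // false: capacity of 2 reached
        a.complete("ok");                       // frees one slot
        System.out.println(limits.tryStart(c)); // true
        System.out.println(awaitOrFail(b, 10, TimeUnit.MILLISECONDS)); // FAILED: timeout
    }
}
```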
Ideally... [Diagram: elements E1 to E4 and their promises P1 to P4 in the queue, drained by the Emitter]
DataStream<Tuple2<String, String>> result =
    AsyncDataStream.unorderedWait(stream,
        new MyAsyncFunction(),
        1000, TimeUnit.MILLISECONDS,
        100);
Realistically... [Diagram: elements E1 to E4, promises P1 to P4, and the Emitter]
...output is ordered by which request finished first
AsyncFunction
unorderedWait: emit results in order of completion
orderedWait: emit results in order of arrival
Always: watermarks never overtake elements, and elements never overtake watermarks
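The difference between the two modes can be reproduced deterministically with manually completed futures. This is an illustrative sketch, not Flink code: `OrderingSketch` and `emit` are hypothetical names, everything runs on one thread, and watermarks are not modeled.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative model of the two emission orders; watermarks are not modeled.
class OrderingSketch {

    // Creates promises for E1..En in arrival order, then completes them
    // in the given order (1-based element numbers).
    static List<String> emit(boolean ordered, int... completionOrder) {
        List<CompletableFuture<String>> queue = new ArrayList<>();
        List<String> out = new ArrayList<>();
        for (int i = 0; i < completionOrder.length; i++) {
            CompletableFuture<String> promise = new CompletableFuture<>();
            queue.add(promise);
            if (!ordered) {
                promise.thenAccept(out::add); // emit as soon as this request finishes
            }
        }
        for (int index : completionOrder) {
            queue.get(index - 1).complete("E" + index); // the request for E<index> returns
        }
        if (ordered) {
            for (CompletableFuture<String> promise : queue) {
                out.add(promise.join()); // emit from the head of the queue
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emit(false, 2, 1, 3)); // [E2, E1, E3]
        System.out.println(emit(true, 2, 1, 3));  // [E1, E2, E3]
    }
}
```

In the ordered case a slow request at the head of the queue holds back results that are already complete, which is the latency cost of preserving arrival order.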