![Page 1: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/1.jpg)
Apache Flink® Training
DataStream API Advanced
August 26, 2015
![Page 2: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/2.jpg)
Type System and Keys
What kind of data can Flink handle?
Note: Identical to DataSet API
![Page 3: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/3.jpg)
Apache Flink’s Type System
Flink aims to support all data types
• Ease of programming
• Seamless integration with existing code
Programs are analyzed before execution
• Used data types are identified
• Serializer & comparator are configured
![Page 4: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/4.jpg)
Apache Flink’s Type System
Data types are either
• Atomic types (like Java Primitives)
• Composite types (like Flink Tuples)
Composite types nest other types
Not all data types can be used as keys!
• Flink partitions DataStreams on keys
• Key types must be comparable
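To make "Flink partitions DataStreams on keys" concrete, the following is a minimal plain-Java sketch of key-based partitioning. It does not use any Flink classes; the class `KeyPartitionSketch`, its method names, and the choice of hash-based routing are assumptions made for illustration, and the slides do not describe how Flink routes keys internally.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink classes) of key-based partitioning. */
final class KeyPartitionSketch {

    /** Maps a key to one of numPartitions partitions using its hashCode. */
    static int partitionFor(Object key, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    /** Distributes keys over partitions so that equal keys land together. */
    static List<List<String>> partition(List<String> keys, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String key : keys) {
            partitions.get(partitionFor(key, numPartitions)).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        // Both "bob" records end up in the same partition.
        System.out.println(partition(List.of("bob", "alice", "bob", "carol"), 2));
    }
}
```

The point of the sketch is only that a key type needs a stable, well-defined notion of equality (and, for sorting-based grouping, ordering); types without it cannot be routed consistently.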
![Page 5: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/5.jpg)
Atomic Types

| Flink type | Java type | Can be used as key? |
|---|---|---|
| BasicType | Java primitives and basic types (Integer, String, …) | Yes |
| ArrayType | Arrays of Java primitives or objects | No (Yes as of 0.10) |
| WritableType | Implements Hadoop’s Writable interface | Yes, if it implements WritableComparable |
| GenericType | Any other type | Yes, if it implements Comparable |
![Page 6: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/6.jpg)
Composite Types
Are composed of fields with other types
• Field types can be atomic or composite
Fields can be addressed as keys
• Field type must be a key type!
A composite type can be a key type
• All field types must be key types!
![Page 7: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/7.jpg)
TupleType
Java: org.apache.flink.api.java.tuple.Tuple1 to Tuple25
Scala: use default Scala tuples (1 to 22 fields)
Tuple fields are typed
    // Java
    Tuple3<Integer, String, Double> t3 = new Tuple3<>(1, "2", 3.0);
    // Scala
    val t3: (Int, String, Double) = (1, "2", 3.0)
Tuples give the best performance
![Page 8: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/8.jpg)
TupleType
Define keys by field position
    DataStream<Tuple3<Integer, String, Double>> d = …
    // group on String field
    d.groupBy(1);
Or field names
    // group on Double field
    d.groupBy("f2");
![Page 9: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/9.jpg)
PojoType
Any Java class that
• Has an empty default constructor
• Has publicly accessible fields (public field or default getter & setter)
    public class Person {
        public int id;
        public String name;
        public Person() {}
        public Person(int id, String name) {…}
    }

    DataStream<Person> p = env.fromElements(new Person(1, "Bob"));
![Page 10: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/10.jpg)
PojoType
Define keys by field name
    DataStream<Person> p = …
    // group on "name" field
    p.groupBy("name");
![Page 11: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/11.jpg)
Scala Case Classes
Scala case classes are natively supported
    case class Person(id: Int, name: String)
    val d: DataStream[Person] = env.fromElements(Person(1, "Bob"))
Define keys by field name
    // use field "name" as key
    d.groupBy("name")
![Page 12: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/12.jpg)
Composite & Nested Keys
    DataStream<Tuple3<String, Person, Double>> d;
Composite keys are supported
    // group on the first two fields (String and Person)
    d.groupBy(0, 1);
Nested fields can be used as keys
    // group on nested "name" field
    d.groupBy("f1.name");
Full types can be used as key using "*" wildcard
    // group on complete nested Pojo field
    d.groupBy("f1.*");
• "*" wildcard can also be used for atomic types
![Page 13: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/13.jpg)
KeySelectors
Keys can be computed using KeySelectors
    public class SumKeySelector
            implements KeySelector<Tuple2<Long, Long>, Long> {
        // the key is the sum of both tuple fields
        @Override
        public Long getKey(Tuple2<Long, Long> t) {
            return t.f0 + t.f1;
        }
    }

    DataStream<Tuple2<Long, Long>> d = …
    d.groupBy(new SumKeySelector());
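The effect of a computed key can be tried out without a cluster. The sketch below is plain Java, not Flink API: `ComputedKeySketch` and `groupByKey` are hypothetical names, a `java.util.function.Function` stands in for a KeySelector, and a `long[]` of length two stands in for a `Tuple2<Long, Long>`.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Plain-Java illustration (no Flink classes) of grouping by a computed key. */
final class ComputedKeySketch {

    /** Groups elements by the key that keyOf computes for each element. */
    static <T, K> Map<K, List<T>> groupByKey(List<T> elements, Function<T, K> keyOf) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T element : elements) {
            groups.computeIfAbsent(keyOf.apply(element), k -> new ArrayList<>()).add(element);
        }
        return groups;
    }

    public static void main(String[] args) {
        // Each long[] stands in for a Tuple2<Long, Long>; the key is the sum of both fields.
        List<long[]> pairs = List.of(new long[] {1, 4}, new long[] {2, 3}, new long[] {5, 5});
        Map<Long, List<long[]>> groups = groupByKey(pairs, p -> p[0] + p[1]);
        groups.forEach((key, members) ->
                System.out.println(key + " -> " + members.size() + " element(s)"));
    }
}
```

Here (1, 4) and (2, 3) fall into the same group because both sum to 5, which is the behaviour the SumKeySelector on the slide asks for.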
![Page 14: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/14.jpg)
Windows and Aggregates
![Page 15: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/15.jpg)
Windows
Aggregations on DataStreams are different from aggregations on DataSets
• e.g., it is not possible to count all elements of a DataStream because the stream is unbounded
DataStream aggregations make sense on windowed streams
• i.e., a window of the "latest" elements of a stream
Windows can be defined on grouped and partitioned streams
![Page 16: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/16.jpg)
Windows (2)
    // (name, age) of passengers
    DataStream<Tuple2<String, Integer>> passengers = …

    // group by second field (age) and keep last 1 minute
    // worth of data sliding the window every 10 seconds
    passengers
        .groupBy(1)
        .window(Time.of(1, TimeUnit.MINUTES))
        .every(Time.of(10, TimeUnit.SECONDS))
![Page 17: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/17.jpg)
Types of Windows
Tumbling time window
• .window(Time.of(1, TimeUnit.MINUTES))
Sliding time window
• .window(Time.of(60, TimeUnit.SECONDS)).every(Time.of(10, TimeUnit.SECONDS))
Count-based sliding window
• .window(Count.of(1000)).every(Count.of(10))
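The difference between a window size and a slide is easiest to see on a small count-based example. The following plain-Java sketch keeps the last `size` elements and emits a copy of that buffer after every `slide` arrivals. It is a simulation under assumed semantics, not Flink code: `SlidingCountWindowSketch` and `windows` are hypothetical names, and Flink's exact trigger and eviction behaviour may differ in detail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Plain-Java simulation (no Flink classes) of a count-based sliding window. */
final class SlidingCountWindowSketch {

    /** Emits the last `size` elements (or fewer at the start) after every `slide` arrivals. */
    static List<List<Integer>> windows(List<Integer> stream, int size, int slide) {
        Deque<Integer> buffer = new ArrayDeque<>();
        List<List<Integer>> emitted = new ArrayList<>();
        int sinceLastEmit = 0;
        for (Integer element : stream) {
            buffer.addLast(element);
            if (buffer.size() > size) {
                buffer.removeFirst(); // evict the oldest element
            }
            sinceLastEmit++;
            if (sinceLastEmit == slide) {
                emitted.add(new ArrayList<>(buffer)); // snapshot of the current window
                sinceLastEmit = 0;
            }
        }
        return emitted;
    }

    public static void main(String[] args) {
        // size 4, slide 2: prints [[1, 2], [1, 2, 3, 4], [3, 4, 5, 6]]
        System.out.println(windows(List.of(1, 2, 3, 4, 5, 6), 4, 2));
        // size == slide behaves like a tumbling window: prints [[1, 2, 3], [4, 5, 6]]
        System.out.println(windows(List.of(1, 2, 3, 4, 5, 6), 3, 3));
    }
}
```

With size 4 and slide 2, consecutive windows overlap by two elements; setting the slide equal to the size removes the overlap, which is the tumbling case.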
![Page 18: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/18.jpg)
Aggregations on Windowed Streams
    // (name, age) of passengers
    DataStream<Tuple2<String, Integer>> passengers = …

    // group by second field (age) and keep last 1 minute
    // worth of data sliding the window every 10 seconds
    // count passengers
    passengers
        .groupBy(1)
        .window(Time.of(1, TimeUnit.MINUTES))
        .every(Time.of(10, TimeUnit.SECONDS))
        .mapWindow(new CountSameAge());
Warning: 0.9 -> 0.10 mapWindow becomes apply
![Page 19: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/19.jpg)
MapWindow
    public static class CountSameAge implements
            WindowMapFunction<Tuple2<String, Integer>, Tuple2<Integer, Integer>> {

        @Override
        public void mapWindow(Iterable<Tuple2<String, Integer>> persons,
                Collector<Tuple2<Integer, Integer>> out) {
            Integer ageGroup = 0;
            Integer countsInGroup = 0;

            // the stream is grouped by age, so all elements share the same age
            for (Tuple2<String, Integer> person : persons) {
                ageGroup = person.f1;
                countsInGroup++;
            }

            out.collect(new Tuple2<>(ageGroup, countsInGroup));
        }
    }
Warning: 0.9 -> 0.10 WindowMapFunction becomes WindowFunction
![Page 20: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/20.jpg)
Operations on Windowed Streams
mapWindow
• Do something over the whole window
reduceWindow
• Apply a functional reduce function to the window
Aggregates: sum, min, max, and others
flatten
• Get back a regular DataStream
Warning: 0.9 -> 0.10 mapWindow becomes apply, reduceWindow becomes reduce
![Page 21: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/21.jpg)
Working With Multiple Streams
![Page 22: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/22.jpg)
Connecting Streams
Sometimes several DataStreams need to be correlated with each other and share state
You can connect or join two DataStreams
    DataStream<String> strings = …
    DataStream<Integer> ints = …

    // Create a ConnectedDataStream
    strings.connect(ints);
![Page 23: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/23.jpg)
Map on Connected Streams
    DataStream<String> strings = …
    DataStream<Integer> ints = …

    // Create a ConnectedDataStream; the type parameters follow the order
    // of the connected streams: <String, Integer, output type>
    strings.connect(ints)
        .map(new CoMapFunction<String, Integer, Boolean>() {
            @Override
            public Boolean map1(String value) {
                return true;
            }
            @Override
            public Boolean map2(Integer value) {
                return false;
            }
        });
![Page 24: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/24.jpg)
FlatMap on Connected Streams
    DataStream<String> strings = …
    DataStream<Integer> ints = …

    // Create a ConnectedDataStream; flatMap1 handles the first (String)
    // stream, flatMap2 handles the second (Integer) stream
    strings.connect(ints)
        .flatMap(new CoFlatMapFunction<String, Integer, String>() {
            @Override
            public void flatMap1(String value, Collector<String> out) {
                for (String word : value.split(" ")) {
                    out.collect(word);
                }
            }
            @Override
            public void flatMap2(Integer value, Collector<String> out) {
                out.collect(value.toString());
            }
        });
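What makes connected streams useful is that both callbacks live in the same function object and can therefore share state. The sketch below illustrates that idea in plain Java. It is not Flink's CoMapFunction: the `TwoInputMap` interface, the `LengthCheck` class, and the `run` driver are hypothetical stand-ins, and the interleaving of the two inputs is fixed by hand rather than decided by a runtime.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration (no Flink classes) of a two-input function with shared state. */
final class ConnectedStreamsSketch {

    /** Hypothetical stand-in for a two-input map function. */
    interface TwoInputMap<A, B, R> {
        R map1(A value);
        R map2(B value);
    }

    /** Integers set a threshold; strings are tested against the current threshold. */
    static final class LengthCheck implements TwoInputMap<String, Integer, Boolean> {
        private int minLength = 0; // state shared by map1 and map2

        @Override
        public Boolean map1(String value) {
            return value.length() >= minLength;
        }

        @Override
        public Boolean map2(Integer value) {
            minLength = value;
            return true; // acknowledges the threshold update
        }
    }

    /** Feeds an interleaved sequence of String and Integer events to the function. */
    static List<Boolean> run(List<Object> events, TwoInputMap<String, Integer, Boolean> fn) {
        List<Boolean> out = new ArrayList<>();
        for (Object event : events) {
            if (event instanceof String) {
                out.add(fn.map1((String) event));
            } else {
                out.add(fn.map2((Integer) event));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // prints [true, true, false, true]
        System.out.println(run(List.of("flink", 6, "flink", "stream"), new LengthCheck()));
    }
}
```

The same string "flink" passes before the threshold arrives and fails afterwards, which shows how one input can steer the processing of the other.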
![Page 25: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/25.jpg)
Rich functions and state
![Page 26: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/26.jpg)
RichFunctions
Function interfaces have only one method
• Single abstract method (SAM)
• Support for Java8 Lambda functions
There is a "Rich" variant for each function.
• RichFlatMapFunction, …
• Additional methods
  • open(Configuration c)
  • close()
  • getRuntimeContext()
![Page 27: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/27.jpg)
RichFunctions & RuntimeContext
RuntimeContext has useful methods:
• getIndexOfThisSubtask()
• getNumberOfParallelSubtasks()
• getExecutionConfig()
Gives access to partitioned state
flink.apache.org
![Page 28: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/28.jpg)
Stateful Computations
All DataStream transformations can be stateful
• State is mutable and lives as long as the streaming job is running
• State is recovered with exactly-once semantics by Flink after a failure
You can define two kinds of state
• Local state: each parallel task can register some local variables to take part in Flink’s checkpointing
• Partitioned by key state: an operator on a partitioned by key stream can access and update state corresponding to its key
• Partitioned state will be available in Flink 0.10
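The recovery guarantee can be illustrated with a small simulation. The sketch below is plain Java with no Flink classes: `LocalStateSketch`, `LengthCounter`, and `runWithFailure` are hypothetical names, and the failure, the restore, and the replay from the source are all performed by hand under the assumption that the source can re-deliver every element that arrived after the last checkpoint.

```java
/** Plain-Java simulation (no Flink classes) of snapshotting and restoring local state. */
final class LocalStateSketch {

    /** Hypothetical operator that keeps a running total as local state. */
    static final class LengthCounter {
        private long totalLength = 0;

        long map(String value) {
            totalLength += value.length();
            return value.length();
        }

        /** Returns a copy of the state, as a checkpoint would store it. */
        long snapshotState() {
            return totalLength;
        }

        /** Resets the state to a previously taken snapshot. */
        void restoreState(long snapshot) {
            totalLength = snapshot;
        }

        long total() {
            return totalLength;
        }
    }

    /** Processes three elements with a simulated failure after the checkpoint. */
    static long runWithFailure() {
        LengthCounter counter = new LengthCounter();
        counter.map("apache");                      // total 6
        counter.map("flink");                       // total 11
        long checkpoint = counter.snapshotState();  // checkpoint holds 11
        counter.map("lost");                        // processed, then the task fails

        // Recovery: a fresh instance restores the checkpoint, and the
        // element that arrived after the checkpoint is replayed.
        LengthCounter recovered = new LengthCounter();
        recovered.restoreState(checkpoint);
        recovered.map("lost");
        return recovered.total();
    }

    public static void main(String[] args) {
        System.out.println(runWithFailure()); // prints 15
    }
}
```

The element "lost" is processed twice overall, but it contributes to the recovered state only once, which is what exactly-once semantics for state means here.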
![Page 29: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/29.jpg)
Defining Local State
    DataStream<String> aStream;
    DataStream<Long> lengths = aStream.map(new MapWithCounter());

    public static class MapWithCounter
            implements MapFunction<String, Long>, Checkpointed<Long> {

        private long totalLength = 0;

        @Override
        public Long map(String value) {
            totalLength += value.length();
            return (long) value.length();
        }

        // called when a checkpoint is taken
        @Override
        public Long snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
            return totalLength;
        }

        // called when the state is restored after a failure
        @Override
        public void restoreState(Long state) throws Exception {
            totalLength = state;
        }
    }
![Page 30: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/30.jpg)
Defining Partitioned State
    DataStream<Tuple2<String, String>> aStream;
    DataStream<Long> lengths = aStream.groupBy(0).map(new MapWithCounter());

    public static class MapWithCounter
            extends RichMapFunction<Tuple2<String, String>, Long> {

        private OperatorState<Long> totalLengthByKey;

        @Override
        public Long map(Tuple2<String, String> value) throws Exception {
            // add the length of the value to the running total of the current key
            totalLengthByKey.update(totalLengthByKey.value() + value.f1.length());
            return (long) value.f1.length();
        }

        @Override
        public void open(Configuration conf) throws Exception {
            totalLengthByKey = getRuntimeContext().getOperatorState("totalLengthByKey", 0L, true);
        }
    }
Note: Will be available in Flink 0.10
![Page 31: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/31.jpg)
Connecting to Apache Kafka
![Page 32: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/32.jpg)
Kafka and Flink
“Apache Kafka is a distributed, partitioned, replicated commit log service”
Kafka uses Apache Zookeeper for coordination
Kafka maintains feeds of messages in categories called topics
A Kafka topic can be read by Flink to produce a DataStream, and a DataStream can be written to a Kafka topic
Flink coordinates with Kafka to provide recovery in the case of failures
![Page 33: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/33.jpg)
Reading Data from Kafka
Enable checkpointing
    // e.g., checkpoint every 5000 ms
    env.enableCheckpointing(5000);
Add a DataStream source from a Kafka topic
    Properties props = new Properties();
    props.setProperty("zookeeper.connect", "localhost:2181");
    props.setProperty("bootstrap.servers", "localhost:9092");
    props.setProperty("group.id", "myGroup");

    // create a data source
    DataStream<TaxiRide> rides = env.addSource(
        new FlinkKafkaConsumer082<TaxiRide>("myTopic", new TaxiRideSchema(), props));
![Page 34: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/34.jpg)
Writing Data to Kafka
Add a Kafka sink to a DataStream by providing
• The broker address
• The topic name
• A serialization schema
    DataStream<String> aStream = …
    aStream.addSink(new KafkaSink<String>(
        "localhost:9092",  // default local broker
        "myTopic",
        new SimpleStringSchema()));
![Page 35: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/35.jpg)
More API Features
![Page 36: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/36.jpg)
Not Covered Here
Iterations (feedback edges)
• Very useful for Machine Learning
More transformations
• union, join, ...
![Page 37: Apache Flink Training: DataStream API Part 2 Advanced](https://reader035.vdocuments.us/reader035/viewer/2022062316/5871384b1a28abf0568b631b/html5/thumbnails/37.jpg)