DataStream API
Connectors
Apache Flink® Training
Flink v1.2.0 – 27.02.2017
Streaming Connectors
▪ Basic data sources
 • Collections
 • Sockets
 • Filesystem
▪ Twitter Stream (source)
▪ Queuing systems (sources and sinks)
 • Apache Kafka
 • Amazon Kinesis
 • RabbitMQ
 • Apache NiFi
▪ Data stores (sinks)
 • Rolling files (HDFS, S3, …)
 • Elasticsearch
 • Cassandra
▪ Custom connectors
Additional connectors in Apache Bahir
▪ Netty (source)
▪ ActiveMQ (source and sink)
▪ Akka (sink)
▪ Flume (sink)
▪ Redis (sink)
Basic Connectors
Basic Data Sources: Collections

```java
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// read from elements
DataStream<String> names =
    env.fromElements("Some", "Example", "Strings");

// read from a Java collection
List<String> list = new ArrayList<String>();
list.add("Some");
list.add("Example");
list.add("Strings");

DataStream<String> namesFromList = env.fromCollection(list);
```
Basic Data Sources: Sockets

```java
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// read a text stream from a socket on port 9999
DataStream<String> socketLines =
    env.socketTextStream("localhost", 9999);
```
Basic Data Sources: Files

```java
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// read a text file line by line
DataStream<String> lines = env.readTextFile("file:///path");

// or read it with a custom input format
DataStream<String> formattedLines = env.readFile(inputFormat, "file:///path");
```
Data Sources: Monitored Files & Directories

```java
StreamExecutionEnvironment env =
    StreamExecutionEnvironment.getExecutionEnvironment();

// monitor a directory, checking for new files every 100 milliseconds
TextInputFormat format = new TextInputFormat(
    new org.apache.flink.core.fs.Path("file:///tmp/dir/"));

DataStream<String> inputStream = env.readFile(
    format,
    "file:///tmp/dir/",
    FileProcessingMode.PROCESS_CONTINUOUSLY,
    100,                                   // polling interval (ms)
    FilePathFilter.createDefaultFilter());
```
Note: if you modify a file (e.g. by appending to it), its entire contents will be reprocessed! This will break exactly-once semantics.
Basic Data Sinks
▪ Print to the standard output
 stream.print()
▪ Write as a text file using toString()
 stream.writeAsText("/path/to/file")
▪ Write as a CSV file
 stream.writeAsCsv("/path/to/file")
▪ Emit to a socket
 stream.writeToSocket(host, port, serializationSchema)
Execution
▪ Keep in mind that programs are lazily executed

```java
DataStream<T> result;

// nothing happens
result.writeToSocket(...);

// nothing happens
result.writeAsText("/path/to/file", "\n", "|");

// execution really starts here
env.execute();
```
Unbundled Connectors
Linking with the Unbundled Connectors
▪ Note that many of the available streaming connectors are not bundled with Flink by default
▪ This prevents dependency clashes with your code
▪ To use these modules, you can either
 • copy the JAR files into the lib folder of each TaskManager, or
 • package them with your code (recommended)
▪ Docs: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/cluster_execution.html#linking-with-modules-not-contained-in-the-binary-distribution
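When packaging a connector with your code, it is added as an ordinary build dependency. A minimal Maven fragment for the Kafka 0.10 connector might look like the sketch below; the artifact name and Scala-version suffix are assumptions here, so check the linked docs for the exact coordinates matching your setup:

```xml
<!-- Illustrative coordinates; verify against the Flink 1.2 docs -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-connector-kafka-0.10_2.10</artifactId>
  <version>1.2.0</version>
</dependency>
```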
Connecting to Apache Kafka
Kafka and Flink
▪“Apache Kafka is a distributed, partitioned, replicated commit log service”
▪Kafka maintains feeds of messages in categories called topics
▪Flink can read a Kafka topic to produce a DataStream and write a DataStream to a Kafka topic
▪Flink coordinates with Kafka to provide recovery in the case of failures
Reading Data from Kafka
▪Add a DataStream source from a Kafka topic
```java
Properties props = new Properties();
props.setProperty("zookeeper.connect", "localhost:2181");
props.setProperty("bootstrap.servers", "localhost:9092");
props.setProperty("group.id", "myGroup");

// create a data source
DataStream<String> data = env.addSource(
    new FlinkKafkaConsumer010<String>(
        "myTopic",                // Kafka topic
        new SimpleStringSchema(), // deserialization schema
        props));                  // consumer config
```
Writing Data to Kafka
▪ Add a Kafka sink to a DataStream by providing
 • the broker address
 • the topic name
 • a serialization schema

```java
DataStream<String> aStream = …

aStream.addSink(
    new FlinkKafkaProducer010<String>(
        "localhost:9092",           // default local broker
        "myTopic",                  // Kafka topic
        new SimpleStringSchema())); // serialization schema
```
When are Kafka offsets committed?
▪If Flink checkpointing is disabled, then the Properties auto.commit.enable and auto.commit.interval.ms control this behavior
▪If checkpointing is enabled, then the autocommit Properties are ignored, and Flink commits the offsets whenever a checkpoint is completed
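The distinction above can be made concrete in the consumer configuration. A sketch of the relevant Properties (class and method names here are ours; the values are placeholders, and the auto-commit settings only take effect when checkpointing is disabled):

```java
import java.util.Properties;

public class KafkaOffsetConfig {
    public static Properties consumerConfig() {
        Properties props = new Properties();
        // Consulted only when Flink checkpointing is disabled:
        props.setProperty("auto.commit.enable", "true");
        props.setProperty("auto.commit.interval.ms", "5000"); // commit every 5s
        // With checkpointing enabled, Flink ignores the two settings above
        // and commits offsets whenever a checkpoint completes.
        props.setProperty("bootstrap.servers", "localhost:9092");
        props.setProperty("group.id", "myGroup");
        return props;
    }
}
```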
Kafka timestamps
▪Since Kafka 0.10, Kafka messages can carry timestamps
▪Flink can use these timestamps; see https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html#using-kafka-timestamps-and-flink-event-time-in-kafka-010 for details
▪You will still need to arrange for watermarks to be emitted
Writing to Elasticsearch
Elasticsearch
▪Distributed search engine, based on Apache Lucene
▪Part of an ecosystem that also includes Kibana for exploration and visualization
▪Often used to store and index JSON documents
▪Has good defaults, but you cannot modify an index mapping (schema) after inserting data
▪Elasticsearch has an HTTP-based REST API
Elasticsearch and Flink
▪Flink has separate Sink connectors for Elasticsearch 1.x and 2.x (and 5.x in Flink 1.3)
▪The Flink connectors use the Transport Client to send data
▪You’ll need to know your
 • cluster’s network address
 • cluster name
 • index name
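The pieces above end up in a plain user-config map that the Flink Elasticsearch sink accepts alongside the address list. A hedged sketch (the key names follow the Flink 1.2 connector docs; the cluster name and flush setting are illustrative placeholders, and the class name is ours):

```java
import java.util.HashMap;
import java.util.Map;

public class EsConfig {
    // Builds the user-config map passed to the Elasticsearch sink.
    public static Map<String, String> userConfig() {
        Map<String, String> config = new HashMap<>();
        // Must match your Elasticsearch cluster's name (placeholder here):
        config.put("cluster.name", "my-cluster");
        // Flush the bulk request after every element (handy while testing):
        config.put("bulk.flush.max.actions", "1");
        return config;
    }
}
```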
Fault Tolerance
Fault Tolerance
▪What happens if a worker thread goes down?
▪Flink supports different guarantee levels for failure recovery:
▪Exactly once
 • Each event affects the declared state of a program exactly once
 • Note: this does not mean that events are processed exactly once!
▪At least once
 • Each event affects the declared state of a program at least once
▪Deactivated / none / at most once
 • All state is lost in case of a failure
Source & Sink Requirements
▪“Exactly once” and “at least once” guarantees require replayable sources
 • Data must be replayed in case of a failure
▪“End-to-end exactly once” guarantees additionally require
 • transactional sinks, or
 • idempotent writes
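The idea behind idempotent writes can be shown without Flink: if the sink keys each write by a stable ID, replaying events after a failure overwrites the same entries rather than duplicating them, so the final state is unchanged. A plain-Java sketch (class and method names are ours):

```java
import java.util.HashMap;
import java.util.Map;

public class IdempotentSinkDemo {
    /** Upserts each (id, value) event into the store, keyed by id. */
    public static Map<String, Integer> applyEvents(String[][] events,
                                                   Map<String, Integer> store) {
        for (String[] e : events) {
            store.put(e[0], Integer.parseInt(e[1])); // same id -> same slot
        }
        return store;
    }

    public static void main(String[] args) {
        String[][] events = { {"a", "1"}, {"b", "2"} };
        Map<String, Integer> once = applyEvents(events, new HashMap<>());
        // Replaying the whole stream (e.g. after recovery) is harmless:
        Map<String, Integer> twice = applyEvents(events, once);
        System.out.println(once.equals(twice)); // prints "true"
    }
}
```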
Guarantees of Data Sources

| Source | Guarantee |
| --- | --- |
| Apache Kafka | Exactly once |
| AWS Kinesis Streams | Exactly once |
| RabbitMQ | None (v 0.10) / Exactly once (v 1.0) |
| Twitter Streaming API | None |
| Collections | Exactly once |
| Files | Exactly once |
| Sockets | None |
Guarantees of Data Sinks

| Sink | Guarantee |
| --- | --- |
| HDFS rolling sink | Exactly once |
| Cassandra | Exactly once for idempotent updates |
| Elasticsearch | Exactly once for idempotent indexing |
| Kafka | At least once |
| AWS Kinesis Streams | At least once |
| File sinks | At least once |
| Socket sinks | At least once |
| Standard output | At least once |
| Redis | At least once |
References
▪Documentation
 • https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/index.html
▪Blog posts
 • http://data-artisans.com/kafka-flink-a-practical-how-to/
 • https://www.elastic.co/blog/building-real-time-dashboard-applications-with-apache-flink-elasticsearch-and-kibana