Connecting Apache Flink® to the World: Reviewing the streaming connectors
Robert Metzger (@rmetzger_), Aljoscha Krettek
What to expect from this talk
• Overview of all available connectors
• Kafka connector internals
• End-to-end exactly-once
• Apache Bahir and the future of connectors
• [Bonus] Message Queues and the Message Acknowledging Source
Connectors in Apache Flink®: “Hello World, let’s connect”
Connectors in Flink 1.1
• Streaming files: both source and sink are exactly-once
• Apache Kafka: consumers (sources) are exactly-once
• Amazon Kinesis: consumers (sources) are exactly-once
• RabbitMQ / AMQP: consumers (sources) are exactly-once
• Elasticsearch: no guarantees
• Apache Cassandra: exactly-once with idempotent updates
• Apache Nifi: no guarantees
• Redis: no guarantees
There is also a Twitter source and an ActiveMQ connector in Apache Bahir.
Streaming connectors by activity
Streaming connectors ordered by number of threads/mentions on the user@flink list:
• Apache Kafka (250+) (since 0.7)
• Apache Cassandra (38) (since 1.1)
• Elasticsearch (34) (since 0.10)
• File sources (~30) (since 0.10)
• Redis (27) (since 1.0)
• RabbitMQ (11) (since 0.7)
• Kinesis (10) (since 1.1)
• Apache Nifi (5) (since 0.10)
Date of evaluation: 5.9.2016
The Apache Kafka Connector
Apache Kafka connector: Intro
“Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.”
This page contains material copied from http://kafka.apache.org/documentation.html#introduction
Apache Kafka connector: Consumer
Flink has two main Kafka consumer implementations:
• For Kafka 0.8, an implementation against Kafka’s “SimpleConsumer” API
• For Kafka 0.9+, the new Kafka consumer (KAFKA-1326)
The producers are basically the same for both versions.
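A minimal, hedged sketch of wiring up a Kafka source with the 0.9 consumer in the Flink 1.1-era Java API; the broker address, group id and topic are made-up example values, and for Kafka 0.8 the class would be FlinkKafkaConsumer08 (plus a "zookeeper.connect" property):

    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    public class KafkaSourceExample {
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092"); // example broker
        props.setProperty("group.id", "my-flink-group");        // example group id

        // The consumer reads "my-topic" and deserializes each record as a String.
        DataStream<String> stream = env.addSource(
            new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props));

        stream.print();
        env.execute("Kafka consumer example");
      }
    }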
Kafka 0.8 Consumer
[Diagram: a Kafka cluster with four brokers holding partitions of topicA and topicB, and a Flink cluster with two TaskManagers reading them]
Each TaskManager has one Consumer Thread, coordinating Fetcher Threads for each Kafka broker.
Kafka 0.8 Broker rebalance
[Diagram sequence: one Kafka broker fails; its partitions (topicB:4, topicB:2, topicA:1) are redistributed to the remaining brokers and Fetcher Threads]
The consumer is able to handle broker failures. On a failure, the Consumer Thread re-assigns partitions and spawns new threads as needed:
1. A broker fails
2. The Fetcher Thread returns its partitions to the Consumer Thread
3. Kafka reassigns the partitions to other brokers
4. Flink assigns the partitions to existing or new Fetcher Threads
Kafka 0.9+ Consumer
[Diagram: the same Kafka cluster; each Flink TaskManager runs a single Consumer Thread using the new Kafka consumer (“new Kafka consumer magic”)]
Since Kafka 0.9, the new consumer API handles broker failures/rebalancing, offset committing, topic querying, …
Exactly-once for Kafka consumers
• The mechanism is the same for all connector versions
• Offsets are committed to Zookeeper / the broker for group.id restarts and external tools (at-least-once)
• Offsets are checkpointed with Flink state for exactly-once
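To actually get exactly-once within the job, checkpointing has to be enabled; a minimal sketch, with the 5-second interval as an example value:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Snapshot operator state, including the Kafka consumer's partition offsets, every 5 s.
    env.enableCheckpointing(5000);
    // Offsets are still committed to Zookeeper / the broker once a checkpoint completes,
    // but only for group.id-based restarts and external tools; recovery inside Flink
    // uses the checkpointed offsets, as the walkthrough below shows.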
[Diagram sequence: two Flink Kafka Consumers read the partitions “a b c d e”, feeding a Flink map operator that holds a counter; the Zookeeper/broker offsets and the Flink checkpoint coordinator are shown alongside]

1. This toy example reads from a Kafka topic with two partitions, each containing “a”, “b”, “c”, … as messages. The offset is set to 0 for both partitions, and a counter is initialized to 0.
2. The Kafka consumer starts reading messages from partition 0. Message “a” is in flight, and the offset for the first consumer has been set to 1.
3. Message “a” arrives at the counter, which is set to 1. Both consumers read their next records (“b” and “a”), and the offsets are set accordingly. In parallel, the checkpoint coordinator decides to trigger a checkpoint at the sources …
4. The sources create a snapshot of their state (“offsets = 2, 1”), which is now stored in the checkpoint coordinator. The sources emit a checkpoint barrier after messages “a” and “b”.
5. The map operator receives checkpoint barriers from both sources and checkpoints its state (counter = 3) in the coordinator. At the same time, the consumers keep reading more data from the Kafka partitions.
6. The checkpoint coordinator informs the Kafka consumers that the checkpoint has been completed, and they commit the checkpoint’s offsets into Zookeeper. Note that Flink does not rely on the Kafka offsets in Zookeeper for restoring from failures.
7. The offsets are now persisted in Zookeeper. External tools such as the Kafka Offset Checker can see the lag of the consumer group.
8. Processing advances further.
9. A failure happens (such as a worker failure).
10. The checkpoint coordinator resets all operators participating in the checkpointing to the last completed checkpoint: the Kafka sources start from offsets 2 and 1, and the counter’s value is 3.
11. The system continues processing; the counter’s value is consistent across the worker failure.
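The counter in this toy example is ordinary operator state that Flink snapshots together with the source offsets. A minimal, hedged sketch of such a counting function using the Flink 1.1-era Checkpointed interface (class and field names are made up for illustration):

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.checkpoint.Checkpointed;

    // Counts every record it sees; the counter is included in each checkpoint and is
    // reset to the last completed checkpoint after a failure.
    public class CountingMap implements MapFunction<String, String>, Checkpointed<Long> {

      private long counter = 0;

      @Override
      public String map(String value) {
        counter++;
        return value;
      }

      @Override
      public Long snapshotState(long checkpointId, long checkpointTimestamp) {
        return counter; // stored by the checkpoint coordinator
      }

      @Override
      public void restoreState(Long state) {
        counter = state; // restored on recovery
      }
    }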
End-to-end exactly-once

Consistently move and process data (process, transform, analyze)
Flink allows moving data between systems while keeping consistency:
• Exactly-once sources: Apache Kafka, Kinesis, RabbitMQ / ActiveMQ, file monitoring
• Exactly-once sinks: rolling file sink; with idempotent updates: Apache Cassandra, Elasticsearch, Redis
• At-least-once sinks (duplicates): Apache Kafka
Continuous File Monitoring
[Diagram: a monitoring task periodically queries a file system and hands (file path, offset) splits to parallel file readers, which emit records]
The monitoring task checkpoints the last “modification time”. The file readers checkpoint the current file + offset and the list of pending files to read.
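A hedged sketch of creating such a continuously monitored file source with the DataStream API; the directory path and scan interval are example values, and the exact readFile overload differs slightly between Flink versions:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000);

    String dir = "hdfs:///logs/incoming"; // example directory
    // The monitoring task (parallelism 1) re-scans the directory every 10 s and hands
    // (file, offset) splits to the parallel readers, which checkpoint their position.
    DataStream<String> lines = env.readFile(
        new TextInputFormat(new Path(dir)),
        dir,
        FileProcessingMode.PROCESS_CONTINUOUSLY,
        10000L);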
Rolling / Bucketing File Sink
[Diagram: one bucketing operator writes buckets by system time (8:00, 9:00, 10:00, 11:00); another buckets records by their values (0-4, 5-9)]
• System-time bucketing
• Bucketing based on record data
Bucketing File Sink exactly-once
• On Hadoop 2.7+, we call truncate() to remove invalid data on restore
• On earlier versions, we write a metadata file with valid offsets
• Downstream consumers must take the valid-offset metadata into account
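A hedged sketch using the Flink 1.1-era RollingSink from flink-connector-filesystem (later versions offer a similar BucketingSink); the base path, bucket format and batch size are example values, and lines stands for any DataStream<String>, e.g. the file source above:

    import org.apache.flink.streaming.connectors.fs.DateTimeBucketer;
    import org.apache.flink.streaming.connectors.fs.RollingSink;

    // Writes records into time-bucketed part files; on restore, invalid trailing data is
    // truncated (Hadoop 2.7+) or marked via a metadata file with valid offsets.
    RollingSink<String> sink = new RollingSink<String>("hdfs:///flink/output") // example path
        .setBucketer(new DateTimeBucketer("yyyy-MM-dd--HH")) // one bucket per hour of system time
        .setBatchSize(1024 * 1024 * 128);                    // roll part files at 128 MB

    lines.addSink(sink);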
Kafka Producer: Avoid data loss
Apache Kafka does not currently provide the infrastructure to produce in an exactly-once fashion. By avoiding data loss, we can guarantee at-least-once.
[Diagram: the Flink Kafka Producer tracks unacknowledged records (unacknowledged = 7) written to a Kafka broker partition and receives ACKs]
On a checkpoint, Flink calls flush() and waits until the number of unacknowledged records reaches 0, guaranteeing that the data has been written to Kafka.
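A hedged sketch of configuring the producer for this at-least-once behaviour; the broker list and topic are example values, stream stands for any DataStream<String>, and setFlushOnCheckpoint is, to the best of my knowledge, the Flink 1.1-era switch for the flush-and-wait mechanism described above:

    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    FlinkKafkaProducer09<String> producer = new FlinkKafkaProducer09<>(
        "broker1:9092",   // example broker list
        "output-topic",   // example topic
        new SimpleStringSchema());

    // On each checkpoint, flush and wait until Kafka has acknowledged all pending records.
    producer.setFlushOnCheckpoint(true);
    // Fail the job on a write error instead of only logging it.
    producer.setLogFailuresOnly(false);

    stream.addSink(producer);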
Apache Bahir and the future of connectors: What’s next
Future of Connectors in Flink
• Kafka 0.10 support, with timestamps
• Dynamic scaling support for Kafka and other connectors
• Refactor the Kafka connector API
Apache Bahir™
• Bahir is a community specialized in connectors, allowing faster releases independent of engine releases.
• Apache Bahir™ was created to give community-contributed connectors a platform, following Apache governance.
• The Flink community decided to move some of our connectors there. Kafka, Kinesis, streaming files, … will stay in Flink!
• Flink connectors in Bahir: ActiveMQ, Redis, Flume sink, RethinkDB (incoming), streaming HBase (incoming).
• New connector contributions are welcome!
Disclaimer: The description of the Bahir community is my personal view. I am not a representative of the project.
Time for questions…
Connectors in Apache Flink: ask me now!
• Follow me on Twitter: @rmetzger_
• Ask the Flink community on [email protected]
• Ask me privately on …
[Bonus] Exactly-once for Message Queues
Message Queues supported by Flink
Traditional message queues have different semantics than Kafka, Kinesis, etc.
• RabbitMQ: Advanced Message Queuing Protocol (AMQP), available in Apache Flink
• ActiveMQ: Java Message Service (JMS), available in Apache Bahir (no release yet)
Image source: http://www.instructables.com/id/Spark-Core-Photon-and-CloudMQTT/step1/What-is-Message-Queuing/
Message Queue Semantics
[Diagram: the Flink Kafka Consumer keeps an offset into a retained log, while the Flink RabbitMQ source consumes from a queue]
In MQs, messages are removed once they are consumed; replay is not possible.
Message Acknowledging
Once a checkpoint has been completed by all operators, the messages in the queue are acknowledged, leading to their removal from the queue.
[Diagram: the Flink RabbitMQ source tracks message ids per pending checkpoint (checkpoint 1: id=1-3, checkpoint 2: id=4-6); when checkpoint 1 completes, ids 1-3 are ACKed to the message queue and removed]
In case of a failure, all the unacknowledged messages are consumed again.
[Diagram: on a system failure, the messages of the unconfirmed checkpoints (id=1-8) are still in the queue and are re-delivered after recovery]
Messages are not lost and are sent again after recovery.
What happens if the system fails after a checkpoint is completed, but before all messages have been acknowledged?
[Diagram: checkpoint 1 has completed and the ACKs for id=1-3 are in flight when the failure occurs]
Flink stores a correlation ID for each un-acked message to de-duplicate on restore.
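To round off the bonus section, a hedged sketch of the RabbitMQ source with correlation-id de-duplication; the connection values and queue name are examples, and exactly-once additionally requires checkpointing and, to the best of my knowledge, a non-parallel source:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.rabbitmq.RMQSource;
    import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(5000); // acknowledgements are tied to completed checkpoints

    RMQConnectionConfig connectionConfig = new RMQConnectionConfig.Builder()
        .setHost("rabbitmq-host") // example connection values
        .setPort(5672)
        .setUserName("guest")
        .setPassword("guest")
        .setVirtualHost("/")
        .build();

    DataStream<String> messages = env
        .addSource(new RMQSource<>(
            connectionConfig,
            "my-queue",              // example queue name
            true,                    // usesCorrelationId: de-duplicate replayed messages on restore
            new SimpleStringSchema()))
        .setParallelism(1);          // a non-parallel source is needed for exactly-once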