Data Loss and Duplication in Kafka
TRANSCRIPT
© 2015, Conversant, Inc. All rights reserved.
PRESENTED BY
May 2, 2023
Data Loss and Data Duplication
in Kafka
Jayesh Thakrar
Kafka is a distributed, partitioned, replicated, durable commit log service. It provides the functionality of a messaging system, but with a unique design.
Exactly once - each message is delivered once and only once
AGENDA
• Kafka Overview
• Data Loss
• Data Duplication
• Data Loss and Duplicate Prevention
• Monitoring
Kafka Overview
Kafka As A Log Abstraction
Client: Producer
Client: Consumer A
Client: Consumer B
Kafka Server = Kafka Broker
Topic: app_events
Source: https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
Topic Partitioning . . .
Kafka Broker
Client: Producer or Consumer
Source: https://engineering.linkedin.com/distributed-systems/log-what-every-software-engineer-should-know-about-real-time-datas-unifying
Topic: app_events
Topic Partitioning – Scalability
Clients: Producer, Consumer
[Diagram: three partitions spread across Kafka Broker 0, 1, and 2; each broker is the leader for one partition and holds replicas of the others.]
Topic Partitioning – Redundancy
Clients: Producer, Consumer
[Same diagram: each partition's leader sits on one broker, with replicas on the other two (Kafka Broker 0, 1, 2).]
Topic Partitioning – Redundancy/Durability
[Same diagram as above.]
Pull-based inter-broker replication
Topic Partitioning – Summary
• Log sharded into partitions
• Messages assigned to partitions by API or custom partitioner
• Partitions assigned to brokers (manual or automatic)
• Partitions replicated (as needed)
• Messages ordered within each partition
• Message offset = absolute position in partition
• Partitions stored on the filesystem as an ordered sequence of log segments (files)
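Key-based assignment of messages to partitions can be sketched as below. This is an illustrative simplification only: the real Java client hashes the key bytes with murmur2, while this sketch uses MD5 just to get a stable hash.

```python
# Simplified sketch of key-based partition assignment (illustrative;
# the real Kafka Java client uses murmur2 on the key bytes).
import hashlib

def assign_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition deterministically."""
    digest = hashlib.md5(key).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

# Messages with the same key always land in the same partition,
# which is what gives Kafka per-key ordering.
p1 = assign_partition(b"user-42", 6)
p2 = assign_partition(b"user-42", 6)
```

Because the mapping depends only on the key and the partition count, repartitioning a topic changes where keys land; that is why partition counts are usually fixed up front for keyed topics.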
© 2015, Conversant, Inc. All rights reserved.11
Other Key Concepts
• Cluster = collection of brokers
• Broker-id = a unique id (integer) assigned to each broker
• Controller = functionality within each broker responsible for leader assignment and management, with one being the active controller
• Replica = partition copy, represented (identified) by the broker-id
• Assigned replicas = set of all replicas (broker-ids) for a partition
• ISR = In-Sync Replicas = subset of assigned replicas (brokers) that are “in-sync/caught-up”* with the leader (ISR always includes the leader)
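The ISR definition above can be sketched as a simple lag check. This is a toy model: real brokers decide membership by how recently each replica fetched (replica.lag.time.max.ms), not by a raw offset gap.

```python
def in_sync_replicas(leader_id, log_end_offsets, max_lag):
    """Return the subset of assigned replicas that are 'caught up'
    with the leader; the leader is always a member."""
    leader_leo = log_end_offsets[leader_id]
    return {
        broker_id
        for broker_id, leo in log_end_offsets.items()
        if broker_id == leader_id or leader_leo - leo <= max_lag
    }

offsets = {0: 1000, 1: 998, 2: 750}   # broker_id -> log end offset
isr = in_sync_replicas(leader_id=0, log_end_offsets=offsets, max_lag=10)
# Broker 2 is 250 messages behind, so it drops out of the ISR.
```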
Data Loss
Data Loss: Inevitable
Up to 0.01% data loss. For 700 billion messages/day, that's up to 70 million messages/day.
Data Loss at the Producer
Kafka Producer API call tree:
kafkaProducer.send() …. accumulator.append() // buffer …. sender.send() // network I/O
• Messages accumulate in the buffer in batches
• Batched by partition; retries happen at the batch level
• Expired batches are dropped after retries
• Error counts and other metrics are available via JMX
Data loss at the producer:
• Failure to close/flush the producer on termination
• Dropped batches due to communication or other errors when acks = 0, or on retry exhaustion
• Data produced faster than it can be delivered, causing BufferExhaustedException (deprecated in 0.10+)
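The "expired batches dropped after retries" loss mode can be simulated in miniature. This is a toy model of the async send path, not the real client: batches that exhaust their retries are silently dropped, exactly the behavior that loses data when errors outlast the retry budget.

```python
def drain(batches, max_retries, send):
    """Try to send each batch; drop it after max_retries failed retries.
    Returns (delivered, dropped) message counts."""
    delivered = dropped = 0
    for batch in batches:
        for attempt in range(max_retries + 1):
            if send(batch, attempt):
                delivered += len(batch)
                break
        else:
            dropped += len(batch)   # expired batch: silent data loss
    return delivered, dropped

# A flaky transport that only succeeds from the second attempt onward.
flaky = lambda batch, attempt: attempt >= 1

ok, lost = drain([["m1", "m2"], ["m3"]], max_retries=1, send=flaky)
ok2, lost2 = drain([["m1", "m2"], ["m3"]], max_retries=0, send=flaky)
# With retries=1 everything is delivered; with retries=0 every batch is dropped.
```

This is why the durability configs later in the deck pair acks=all with a non-zero retries setting: the retry budget has to outlast transient broker unavailability.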
Data Loss at the Cluster (by Brokers)
[Decision flowchart, reconstructed from the slide:]
A broker crashes; the Controller detects the crash via ZooKeeper.
1. Was it a leader? If not: if it was in the ISR, the ISR shrinks; as long as ISR >= min.insync.replicas, everything will be fine, otherwise the partition is unavailable to acks=all producers.
2. If it was a leader: are other replicas in the ISR? If yes, elect another leader; everything will be fine.
3. If no other replica is in the ISR: are other replicas available at all? If not, the partition is unavailable.
4. If only out-of-sync replicas are available: is unclean leader election allowed? If yes, elect one of them as leader (potential data loss); if not, the partition is unavailable.
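The decision tree in this flowchart can be written out as a function. This is a sketch of the logic as drawn on the slide, not the broker's actual implementation:

```python
def on_broker_crash(was_leader, was_in_isr, isr_size, min_insync,
                    others_in_isr, others_available, allow_unclean):
    """Outcome of the slide's crash-handling decision tree
    (reconstructed sketch; not real broker code)."""
    # The Controller detects the crash via ZooKeeper first.
    if not was_leader:
        if not was_in_isr:
            return "fine"                    # an out-of-sync follower died
        if isr_size >= min_insync:
            return "fine"                    # ISR shrank but is still big enough
        return "partition unavailable"       # at least for acks=all producers
    if others_in_isr:
        return "elect another leader"        # clean failover, no data loss
    if not others_available:
        return "partition unavailable"
    if allow_unclean:
        return "unclean election: potential data loss"
    return "partition unavailable"
```

Walking the three crash scenarios from the next slides through this function shows why only the leader-crash-with-empty-ISR path risks losing data.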
Non-leader broker crash
[Same decision flowchart as above; this slide walks the non-leader branch: the ISR shrinks and everything is fine while ISR >= min.insync.replicas.]
Leader broker crash: Scenario 1
[Same decision flowchart as above; this slide walks one leader-crash path.]
Leader broker crash: Scenario 2
[Same decision flowchart as above; this slide walks another leader-crash path.]
Data Loss at the Cluster (by Brokers)
[Same decision flowchart as above.]
Potential data loss depending upon the acks config at the producer. See KAFKA-3919 and KAFKA-4215.
From KAFKA-3919
[Screenshot not reproduced in the transcript.]
From KAFKA-4215
[Screenshot not reproduced in the transcript.]
Config for Data Durability and Consistency
Producer config:
• acks = -1 (or all)
• max.block.ms (blocking on buffer full, default = 60000) and retries
• request.timeout.ms (default = 30000) – it triggers retries
Topic config:
• min.insync.replicas = 2 (or higher)
Broker config:
• unclean.leader.election.enable = false
• timeout.ms (default = 30000) – inter-broker timeout for acks
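The durability settings above can be collected into config maps like the following. Values come from the slide; the retries value is an example, not a recommendation from the deck:

```python
# Durability-oriented settings from the slide (illustrative dictionaries;
# pass the producer map to your Kafka producer client as properties).
durable_producer = {
    "acks": "all",                # same as -1: wait for the full ISR to ack
    "retries": 3,                 # example value: retry transient send failures
    "max.block.ms": 60000,        # how long send() may block when the buffer is full
    "request.timeout.ms": 30000,  # per-request timeout; expiry triggers a retry
}
durable_topic = {"min.insync.replicas": 2}    # refuse acks=all writes below 2 ISR members
durable_broker = {"unclean.leader.election.enable": False}
```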
Config for Availability and Throughput
Producer config:
• acks = 0 (or 1)
• buffer.memory, batch.size, linger.ms (default = 100)
• request.timeout.ms, max.block.ms (default = 60000), retries
• max.in.flight.requests.per.connection
Topic config:
• min.insync.replicas = 1 (default)
Broker config:
• unclean.leader.election.enable = true
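For contrast, the throughput-oriented profile looks like this. The numeric values here are examples chosen for illustration, not values stated on the slide:

```python
# Throughput/availability-oriented settings from the slide (illustrative;
# numeric values are examples, not recommendations from the deck).
fast_producer = {
    "acks": 1,                    # or 0: don't wait for the full ISR
    "buffer.memory": 33554432,    # example: total bytes available for batching
    "batch.size": 16384,          # example: target bytes per partition batch
    "linger.ms": 100,             # wait up to 100 ms to fill a batch before sending
    "max.in.flight.requests.per.connection": 5,  # example: pipeline depth
}
fast_topic = {"min.insync.replicas": 1}    # the default
fast_broker = {"unclean.leader.election.enable": True}
```

Note the trade-off: every line that raises throughput or availability here (acks=1, unclean election enabled, min ISR of 1) is precisely what the durability profile tightens.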
Data Duplication
Data Duplication: How It Occurs
Client: Producer
Client: Consumer A
Client: Consumer B
Kafka Broker
Topic: app_events
• Producer (API) retries: messages are re-sent after a timeout when retries >= 1
• A consumer consumes messages more than once after restarting from an unclean shutdown/crash
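Both paths are at-least-once effects, and the consumer-side one is easy to simulate: a consumer that crashes after processing messages but before committing its offset will reprocess them on restart. A toy model:

```python
def consume(messages, start, commit_every, crash_after=None):
    """Process messages from offset `start`, committing the offset every
    `commit_every` messages; optionally 'crash' before any commit happens.
    Returns (processed_offsets, last_committed_offset)."""
    processed, committed = [], start
    for offset in range(start, len(messages)):
        if crash_after is not None and len(processed) == crash_after:
            return processed, committed        # crashed before committing
        processed.append(offset)
        if (offset + 1 - start) % commit_every == 0:
            committed = offset + 1
    return processed, committed

msgs = list("abcdef")
first, committed = consume(msgs, start=0, commit_every=4, crash_after=3)
# Crashed after processing offsets 0-2, but nothing was committed...
second, _ = consume(msgs, start=committed, commit_every=4)
duplicates = set(first) & set(second)   # ...so 0-2 are processed twice.
```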
Data Loss & Duplication Detection
How to Detect Data Loss & Duplication – 1
Store: Memcache / HBase / Cassandra / other
KEY = topic, partition, offset; VALUE = msg key or hash
1) Msg from producer to Kafka
2) Ack from Kafka with details
3) Producer inserts into store
4) Consumer reads msg
5) Consumer validates msg:
   • If it exists: not a duplicate; consume the msg and delete it from the store
   • If it is missing: duplicate msg
Audit: remaining msgs in the store are "lost" or "unconsumed" msgs
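Scheme 1 can be sketched with a plain dict standing in for the Memcache/HBase/Cassandra store. The function names are mine; the keying and five-step flow follow the slide:

```python
# Tracking store: (topic, partition, offset) -> message hash.
# A dict stands in for Memcache/HBase/Cassandra here.
store = {}

def on_ack(topic, partition, offset, msg):
    """Step 3: producer records every acknowledged message."""
    store[(topic, partition, offset)] = hash(msg)

def on_consume(topic, partition, offset):
    """Step 5: consumer validates. Returns 'ok' for a first delivery,
    'duplicate' if the key has already been consumed and deleted."""
    if (topic, partition, offset) in store:
        del store[(topic, partition, offset)]
        return "ok"
    return "duplicate"

on_ack("app_events", 0, 41, "payload-a")
on_ack("app_events", 0, 42, "payload-b")
r1 = on_consume("app_events", 0, 41)   # first delivery
r2 = on_consume("app_events", 0, 41)   # redelivery of the same offset
# Audit: whatever is left in `store` was lost or never consumed.
leftover = list(store)
```

The cost of this scheme is one store write and one read-plus-delete per message, which is why the next slide trades per-message keys for per-window aggregates.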
How to Detect Data Loss & Duplication – 2
Store: Memcache / HBase / Cassandra / other
KEY = source, time-window; VALUE = msg count or some other checksum (e.g. totals, etc.)
1) Msg from producer to Kafka
2) Ack from Kafka with details
3) Producer maintains window stats
4) Consumer reads msg
5) Consumer validates window stats at the end of the interval
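Scheme 2 can be sketched the same way; here the "checksum" is just a count per (source, time-window), and Counters stand in for the external store:

```python
from collections import Counter

produced = Counter()   # producer-side window stats (step 3)
consumed = Counter()   # consumer-side window stats (step 5)

def window(source, ts, width=60):
    """Key messages by source and time window (width in seconds)."""
    return (source, ts // width)

# Producer side: count acknowledged messages per window.
for ts in (0, 10, 50, 70):
    produced[window("web", ts)] += 1

# Consumer side: count what actually arrived
# (one msg lost in window 0, one duplicated in window 1).
for ts in (0, 10, 70, 70):
    consumed[window("web", ts)] += 1

loss = {k: produced[k] - consumed[k] for k in produced if produced[k] > consumed[k]}
dups = {k: consumed[k] - produced[k] for k in consumed if consumed[k] > produced[k]}
```

Unlike scheme 1, this detects that a window lost or duplicated messages but not which ones, which is the trade-off for the much smaller store footprint.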
Data Duplication: How to Minimize at the Consumer
Client: Producer
Client: Consumer A
Client: Consumer B
Kafka Broker
Topic: app_events
If possible, look up the last processed offset in the destination at startup.
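The idea on this slide, storing the offset alongside the output and resuming from it at startup, can be sketched like this. A dict stands in for the destination store, and a real consumer would call seek() with the recovered offset:

```python
# A sink that records the last processed offset alongside its data,
# so a restarted consumer can resume exactly where it left off.
sink = {"rows": [], "last_offset": -1}

def process(messages, start_offset):
    """Consume from start_offset onward, writing each row and the
    offset together (atomically, in a real destination store)."""
    for offset in range(start_offset, len(messages)):
        sink["rows"].append(messages[offset])
        sink["last_offset"] = offset

msgs = ["a", "b", "c", "d"]
process(msgs, 0)                          # first run processes everything
process(msgs, sink["last_offset"] + 1)    # restart resumes after 'd': no duplicates
```

Keeping the offset in the same store (and same transaction) as the output is what makes this safe; committing the offset back to Kafka separately reopens the crash window simulated earlier.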
Monitoring
Monitoring and Operations: JMX Metrics
Producer JMX and Consumer JMX
[Metric screenshots not reproduced in the transcript.]
Questions?
Jayesh Thakrar, [email protected]