kafka audit - kafka meetup - january 27th, 2015
KAFKA AUDIT - January 27th, 2015 - LinkedIn Meetup
[Diagram build-up, one tier added per slide:
1. A producer sends plain Kafka messages (Mp) to a local Kafka cluster.
2. The local cluster is mirrored into an aggregate Kafka cluster.
3. Datacenter A and Datacenter B each run a local Kafka cluster and an aggregate Kafka cluster.
4. Each aggregate cluster receives Mp mirrored from the local clusters in both datacenters.
5. Offline processing consumes Mp from an aggregate cluster.
Mp = Plain old Kafka message]
[Diagram: the producer wraps each plain message with audit data (Ma) before sending it to the Kafka cluster, and periodically emits a monitoring message (Mm).]

Ma = { Plain old Kafka message,
       Producer creation timestamp,
       Producer identification string }

Mm = { Count of messages,
       The topic this count is for,
       Tier identification string,
       Time bucket interval }
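As a sketch of the two message shapes above, assuming hypothetical field names (the slides give only the conceptual contents):

```python
import time
from dataclasses import dataclass

@dataclass
class Ma:
    """Message with audit data, as described on the slide."""
    payload: bytes              # plain old Kafka message
    producer_timestamp: float   # producer creation timestamp (epoch seconds)
    producer_id: str            # producer identification string

@dataclass
class Mm:
    """Monitoring message: a count for one topic, tier, and time bucket."""
    count: int          # count of messages
    topic: str          # the topic this count is for
    tier: str           # tier identification string, e.g. "local"
    bucket_start: int   # start of the time bucket interval (epoch seconds)

ma = Ma(b"click-event", time.time(), "frontend-producer-7")
mm = Mm(123, "clicks", "local", 36600)
```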
[Diagram build-up, one piece added per slide:
1. Ma flows from the producer through the local and aggregate Kafka clusters to offline processing; the producer emits Mm alongside it.
2. An audit consumer on the local cluster counts the Ma it sees and emits its own Mm.
3. Each tier (local, aggregate, aggregate) gets its own audit consumer, each emitting Mm.
4. An audit app consumes the Mm stream from every tier.
Ma = Message with audit data
Mm = Monitoring message]
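A minimal sketch of what one tier's audit consumer does, assuming 10-minute buckets (the interval used in the late-message tables later) and dict-shaped Mm output; the real consumers produce Kafka messages, not dicts:

```python
from collections import defaultdict

BUCKET_SECONDS = 600  # assumed 10-minute time buckets

def bucket_start(producer_ts: float) -> int:
    # The producer timestamp, not arrival time, picks the bucket.
    return int(producer_ts // BUCKET_SECONDS) * BUCKET_SECONDS

class AuditConsumer:
    """Counts the Ma messages one tier sees, keyed by (topic, bucket)."""

    def __init__(self, tier: str):
        self.tier = tier
        self.counts = defaultdict(int)

    def on_message(self, topic: str, producer_ts: float) -> None:
        self.counts[(topic, bucket_start(producer_ts))] += 1

    def flush(self):
        # Emit one Mm per (topic, bucket); here we just return dicts.
        return [
            {"count": c, "topic": t, "tier": self.tier, "bucket": b}
            for (t, b), c in self.counts.items()
        ]

local = AuditConsumer("local")
for ts in (100, 200, 700):
    local.on_message("pageviews", ts)
# Two messages fall in bucket 0, one in bucket 600.
```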
[Diagram: full pipeline. Audit consumers on each tier emit Mm into an aggregate Kafka cluster; the audit app consumes Mm, stores the counts in an Audit MySQL database, and serves them through a REST API and an Audit UI.
Ma = Message with audit data
Mm = Monitoring message]
AUDIT UI

Tier       Count
Producer   123
Local      123
Aggregate  123
Aggregate  123
Offline    123
(for each topic and time window)
AUDIT UI

Tier       Count
Producer   123
Local      123
Aggregate  119
Aggregate  119
Offline    119
We lost 4 messages between local and aggregate!
(for each topic and time window)
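The UI comparison above can be sketched as a simple check against the producer's count; tier names and the dict shape are assumptions for illustration:

```python
def audit_rows(counts_by_tier: dict):
    """counts_by_tier maps tier name -> count for one topic and time window.
    Flags any tier that saw fewer messages than the producer sent."""
    expected = counts_by_tier["Producer"]
    rows = []
    for tier, count in counts_by_tier.items():
        status = "ok" if count == expected else f"lost {expected - count}"
        rows.append((tier, count, status))
    return rows

rows = audit_rows({"Producer": 123, "Local": 123, "Aggregate": 119})
# The Aggregate row is flagged as having lost 4 messages.
```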
CAVEATS
• Audit consumers need to consume
everything.
• Intermediate tiers are tough to drill down into.
QUESTIONS?
users@kafka.apache.org
https://kafka.apache.org/
irc://irc.freenode.net/#apache-kafka
Many folks on the mailing list know the details
of how Kafka Audit works.
LATE MESSAGE RESOLUTION
Tier        10:00  10:10  10:20  10:30  10:40
Producer     341    352    337    326     -
Local        341    299    337    326     -
Aggregate    341    299    337    326     -
Aggregate    341    299    337    326     -
Hadoop       341    299    337    326     -
(10:40 is the current, still-open time bucket)

From the 10:10 to 10:20 time bucket, 53 messages were lost from the producer to the Kafka local cluster. Unhealthy!
LATE MESSAGE RESOLUTION

Tier        10:00   10:10   10:20  10:30  10:40
Producer     341     352     337    326     -
Local        341   299+53    337    326     -
Aggregate    341     299     337    326     -
Aggregate    341     299     337    326     -
Hadoop       341     299     337    326     -
(10:40 is the current, still-open time bucket)

Another message Mm arrives later with the missing count of 53!
LATE MESSAGE RESOLUTION

Tier        10:00  10:10  10:20  10:30  10:40
Producer     341    352    337    326     -
Local        341    352    337    326     -
Aggregate    341    352    337    326     -
Aggregate    341    352    337    326     -
Hadoop       341    352    337    326     -
(10:40 is the current, still-open time bucket)

All time periods match after arrival of the late Mm message. Healthy state now.
The producer timestamp determines the time bucket the message is placed into, so bucket assignment is deterministic.
Mm = { Count of messages,
       The topic this count is for,
       Tier identification string,
       Time bucket interval }
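Because bucketing is deterministic, a late Mm needs no special handling: it simply adds into the bucket its producer timestamps put it in. A sketch, with the bucket encoded as seconds since midnight (a hypothetical encoding):

```python
# Running totals keyed the way Mm is keyed, so a late Mm lands in its
# original bucket no matter when it arrives.
totals = {}

def apply_mm(mm: dict) -> None:
    key = (mm["tier"], mm["topic"], mm["bucket"])
    totals[key] = totals.get(key, 0) + mm["count"]

BUCKET_1010 = 10 * 3600 + 10 * 60  # the 10:10 bucket from the table

apply_mm({"tier": "Local", "topic": "clicks", "bucket": BUCKET_1010, "count": 299})
# The late Mm carrying the missing 53 adds into the same bucket:
apply_mm({"tier": "Local", "topic": "clicks", "bucket": BUCKET_1010, "count": 53})
# The Local tier's total for 10:10 now matches the Producer tier's 352.
```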
TRANSPORT TIME

[Diagram: audit consumers on the local and both aggregate clusters record when each Ma is seen and emit Tt to a metrics system (e.g. RRDs).]

Tt = { Time Ma seen by audit consumer,
       Topic name }

Tt can be sampled; there is no need to emit it for all messages.

Tt[time] = <Audit Consumer NTPd Time> - Ma[time]
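The formula above, with sampling, can be sketched as follows; the function names and the 1% sample rate are assumptions for illustration:

```python
import random
import time

def transport_time(producer_ts: float, consumer_ts: float) -> float:
    # Tt[time] = <Audit Consumer NTPd Time> - Ma[time]
    return consumer_ts - producer_ts

def maybe_emit_tt(topic: str, producer_ts: float, sample_rate: float = 0.01):
    # Tt can be sampled; no need to emit it for every Ma.
    if random.random() < sample_rate:
        return {"topic": topic, "tt": transport_time(producer_ts, time.time())}
    return None
```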
CAVEATS
• Depends on the Audit Consumer lag.
• Producer batching can skew timestamps.
SCHEMA RESOLUTION

WHAT IS A SCHEMA?

{
  "type": "record",
  "name": "User",
  "fields": [
    { "name": "name", "type": "string" },
    { "name": "favorite_number", "type": ["int", "null"] }
  ]
}
Every message should be formatted to a schema!
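As a toy illustration of what "formatted to a schema" means, here is a check that a record carries exactly the fields the Avro-style "User" schema above declares; a real deployment would use an Avro library, and this sketch checks field names only, not types:

```python
USER_SCHEMA = {
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "favorite_number", "type": ["int", "null"]},
    ],
}

def conforms(record: dict, schema: dict) -> bool:
    # True when the record has exactly the declared field names.
    declared = {f["name"] for f in schema["fields"]}
    return set(record) == declared

ok = conforms({"name": "jay", "favorite_number": 7}, USER_SCHEMA)
```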
SCHEMA REGISTRY

A REST API to go from schema to ID, and ID to schema.
Schema ID = hash(Raw Schema)

Schema Registry Database columns: Schema ID | Raw Schema | Registration Timestamp
History of registrations is maintained.
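A toy in-memory version of the registry, assuming MD5 as the hash (the deck says only "hash"); the real registry is a REST service backed by the database above:

```python
import hashlib
import time

class SchemaRegistry:
    """Toy registry: schema -> ID and ID -> schema, with a registration history."""

    def __init__(self):
        self.by_id = {}
        self.history = []  # (registration timestamp, schema id, raw schema)

    def register(self, raw_schema: str) -> str:
        # Schema ID = hash(Raw Schema), so re-registration yields the same ID.
        schema_id = hashlib.md5(raw_schema.encode()).hexdigest()
        self.by_id[schema_id] = raw_schema
        self.history.append((time.time(), schema_id, raw_schema))
        return schema_id

    def lookup(self, schema_id: str) -> str:
        return self.by_id[schema_id]

reg = SchemaRegistry()
sid = reg.register('{"type":"string"}')
```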
[Diagram: Producer, Schema Registry, and Kafka, with the three numbered steps below.]
1. Producer registers schema.
2. Registry returns schema ID (hash of schema).
3. Schema ID prepended to all Kafka messages.
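Step 3 can be sketched as simple byte framing; the 16-byte MD5 digest as the on-wire ID is an assumption, not LinkedIn's documented format:

```python
import hashlib

ID_BYTES = 16  # length of an MD5 digest; assumed wire format

def frame(schema_id: bytes, payload: bytes) -> bytes:
    # Ms = schema ID prepended to the plain message.
    assert len(schema_id) == ID_BYTES
    return schema_id + payload

def unframe(ms: bytes):
    # Split a framed message back into (schema ID, payload).
    return ms[:ID_BYTES], ms[ID_BYTES:]

sid = hashlib.md5(b'{"type":"record"}').digest()
ms = frame(sid, b"payload")
```

A consumer reverses the framing, then asks the registry for the schema behind the ID before decoding the payload.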
Ms = { <Schema ID> + M } for all messages M