TRANSCRIPT
KAFKA: A DISTRIBUTED MESSAGING SYSTEM FOR LOG PROCESSING
By J. Kreps, N. Narkhede, and J. Rao
Presented at the NetDB Workshop, 2011
VOLUME
• 20B events/day
• 3 terabytes/day
• 150K events/sec
LOGGING OVERVIEW
• Many types of events
  • User activity: impressions, searches, ads, etc.
  • Operational metrics: service metrics
• High volume: billions of events per day
• Both online & offline use
  • Reporting, batch analysis
  • Security, newsfeeds, dashboards
Message queues (ActiveMQ, TIBCO)
• Low throughput
• Secondary indexes
• Tuned for low latency

Log aggregators (Flume, Scribe)
• Focus on HDFS
• Push model
• No rewindable consumption
KAFKA
PUBLISH – SUBSCRIBE SYSTEM
[Diagram: several Producers call publish(topic, msg); messages flow into Topic 1, Topic 2, and Topic 3; Consumers subscribe to topics and receive the messages]
• Producers – processes that generate events and publish them to a topic
• Consumers – subscribe to topics in Kafka and pull data from brokers
• Topics – logical queues; each topic is a collection of partitions spread across several brokers
WHAT KAFKA OFFERS
• Very high performance
• Elastically scalable
• Low operational overhead
• Durable, highly available
KAFKA - CONCEPTS
[Diagram: a producer appends messages to the end of a partition log; Consumer 1 reads at offset 7 while Consumer 2 reads at offset 10, each tracking its own position independently]
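The picture above can be sketched as a tiny in-memory log (a hypothetical stand-in for a Kafka partition, not the real client API): the producer appends, and each consumer polls from the offset it has reached.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal sketch of a partition log: an append-only list addressed by offset.
class PartitionLog {
    private final List<String> messages = new ArrayList<>();

    // Producer side: append a message and return its offset.
    long append(String msg) {
        messages.add(msg);
        return messages.size() - 1;
    }

    // Consumer side: read up to max messages starting at the given offset.
    List<String> poll(long offset, int max) {
        int from = (int) Math.min(offset, messages.size());
        int to = Math.min(from + max, messages.size());
        return new ArrayList<>(messages.subList(from, to));
    }
}

public class OffsetDemo {
    public static void main(String[] args) {
        PartitionLog log = new PartitionLog();
        for (int i = 0; i < 12; i++) log.append("msg-" + i);

        // Two independent consumers, each holding its own offset.
        System.out.println(log.poll(7, 2));  // [msg-7, msg-8]
        System.out.println(log.poll(10, 2)); // [msg-10, msg-11]
    }
}
```

Note that the log itself knows nothing about the consumers; each consumer's only state is the offset it passes to `poll`, which is exactly what makes the broker stateless later in the talk.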
KAFKA - STREAM
• Records are key-value pairs
• A stream is an ordered sequence of these records
STREAM & TABLE
• A stream is a changelog of a table
  • KStream = interprets data as a record stream
  • "append-only"
• A table is a materialized view of a stream at a point in time
  • KTable = interprets data as a changelog stream
  • a continuously updated materialized view
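The stream-table duality above can be illustrated in plain Java (a sketch, not the Kafka Streams API): a table is what you get by replaying a changelog stream and keeping the latest value per key.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Stream-table duality in miniature: replaying a changelog stream of
// key-value records yields a table (the latest value per key).
public class StreamTableDemo {

    static Map<String, String> materialize(String[][] changelog) {
        Map<String, String> table = new LinkedHashMap<>();
        for (String[] record : changelog) {
            table.put(record[0], record[1]); // later records overwrite earlier ones
        }
        return table;
    }

    public static void main(String[] args) {
        // The stream: an append-only changelog of (user, count) updates.
        String[][] changelog = {
            {"alice", "1"}, {"bob", "1"}, {"alice", "2"}, {"alice", "3"}
        };
        // The table: a continuously updated materialized view of that stream.
        System.out.println(materialize(changelog)); // {alice=3, bob=1}
    }
}
```

Running the same changelog again always rebuilds the same table, which is why the changelog is a sufficient backup of the table's state.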
TOPOLOGY PROCESSING
• Arrows are streams; nodes are stream processors
• The initial nodes are source processors and the final node is a sink processor
KStream<..> stream1 = builder.stream("topic1");
KStream<..> stream2 = builder.stream("topic2");
KStream<..> joined = stream1.leftJoin(stream2, ...);
KTable<..> aggregated = joined.aggregateByKey(...);
aggregated.to("topic3");
KSTREAM & KTABLE
• Kafka is a multi-subscriber system: for the same topic, several independent applications can consume the same data while remaining fully decoupled.
• Each message has a unique identifier (an offset, sequential within a topic partition), and consumers request messages by this identifier.
• Each consumer has to keep track of the last message offset it consumed.
[Diagram: Producers publish to a cluster of Brokers; Consumers pull from the Brokers; ZooKeeper (ZK) coordinates the brokers and consumers]
Zookeeper performs the following tasks:
• Tracks the addition and removal of brokers and consumers
• Keeps track of which messages were consumed in which topics/partitions
• Maintains a broker registry and a consumer registry; each consumer group is additionally associated with an ownership registry and an offset registry in Zookeeper
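A sketch of how these four registries might be laid out as Zookeeper paths (illustrative, modeled on the classic pre-0.9 Zookeeper-based consumer; path names are not verbatim):

```
/brokers/ids/[broker-id]                        -> broker host:port and its topics/partitions
/consumers/[group]/ids/[consumer-id]            -> topics this consumer subscribes to
/consumers/[group]/owners/[topic]/[partition]   -> id of the consumer owning this partition
/consumers/[group]/offsets/[topic]/[partition]  -> last consumed offset for this partition
```

The broker and consumer registries are ephemeral (they disappear when the process dies), which is what lets consumers detect membership changes and trigger a rebalance.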
ALGORITHM FOR REBALANCE PROCESS

Algorithm 1: rebalance process for consumer Ci in group G
For each topic T that Ci subscribes to {
   remove partitions owned by Ci from the ownership registry
   read the broker and the consumer registries from Zookeeper
   compute PT = partitions available in all brokers under topic T
   compute CT = all consumers in G that subscribe to topic T
   sort PT and CT
   let j be the index position of Ci in CT and let N = |PT|/|CT|
   assign partitions from j*N to (j+1)*N - 1 in PT to consumer Ci
   for each assigned partition p {
      set the owner of p to Ci in the ownership registry
      let Op = the offset of partition p stored in the offset registry
      invoke a thread to pull data in partition p from offset Op
   }
}
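The partition-assignment step of Algorithm 1 can be sketched in plain Java (the method name `rangeAssign` is mine; it is not the Kafka API):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch of the assignment step of the rebalance algorithm: sort partitions
// (PT) and consumers (CT), then give the consumer at index j the slice
// [j*N, (j+1)*N - 1], where N = |PT| / |CT|.
public class RangeAssignDemo {

    static List<String> rangeAssign(List<String> partitions,
                                    List<String> consumers,
                                    String me) {
        List<String> pt = new ArrayList<>(partitions);
        List<String> ct = new ArrayList<>(consumers);
        Collections.sort(pt);
        Collections.sort(ct);

        int j = ct.indexOf(me);          // my index position in CT
        int n = pt.size() / ct.size();   // N = |PT| / |CT|

        List<String> assigned = new ArrayList<>();
        for (int k = j * n; k <= (j + 1) * n - 1 && k < pt.size(); k++) {
            assigned.add(pt.get(k));
        }
        return assigned;
    }

    public static void main(String[] args) {
        List<String> pt = List.of("p0", "p1", "p2", "p3", "p4", "p5");
        List<String> ct = List.of("c0", "c1", "c2");
        System.out.println(rangeAssign(pt, ct, "c1")); // [p2, p3]
    }
}
```

Because every consumer sorts the same registries and computes the same slices, no coordination beyond Zookeeper reads is needed: each consumer independently arrives at a consistent, disjoint assignment. (When |PT| is not a multiple of |CT|, the formula as stated on the slide leaves trailing partitions unassigned; a real assignor would hand the remainder to the first consumers.)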
AUTOMATIC LOAD BALANCING

[Diagram: multiple Producers and Consumers spread automatically across two Brokers]
• Brokers and Consumers register in Zookeeper
• Consumers listen for Broker and Consumer changes
• Each change triggers Consumer rebalancing
EFFICIENCIES
1.) Simple Storage
• Each topic has an ever-growing log
• A log == a list of segment files
• A message is addressed by its log offset
2.) Easy Transfer
• Batch send and receive
• No message caching in the JVM
• Rely on file-system buffering (the page cache)
3.) Stateless Broker
• Each consumer maintains its own state
• Message deletion is driven by a retention policy, not by tracking consumption
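Point 1 can be made concrete with a small sketch (my own naming, not Kafka source): if each segment file is named by the offset of its first message, resolving a message offset to a file is a binary search for the greatest base offset not exceeding it.

```java
import java.util.List;

// Sketch: a log is a list of segment files, each identified by the offset of
// its first message. To locate a message, find the segment with the greatest
// base offset <= the requested offset via binary search.
public class SegmentLookupDemo {

    static long segmentFor(List<Long> baseOffsets, long offset) {
        int lo = 0, hi = baseOffsets.size() - 1, ans = 0;
        while (lo <= hi) {
            int mid = (lo + hi) / 2;
            if (baseOffsets.get(mid) <= offset) { ans = mid; lo = mid + 1; }
            else { hi = mid - 1; }
        }
        return baseOffsets.get(ans);
    }

    public static void main(String[] args) {
        // Three segments starting at offsets 0, 1000, and 2000.
        List<Long> segments = List.of(0L, 1000L, 2000L);
        System.out.println(segmentFor(segments, 1500)); // 1000
        System.out.println(segmentFor(segments, 2000)); // 2000
    }
}
```

This addressing scheme is also what makes retention-based deletion cheap: dropping old messages is just deleting whole segment files from the front of the list.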
BASIC PERFORMANCE METRICS
• Producer batch size = 40K
• Consumer batch size = 1MB
• 100 topics, broker flush interval = 100K
• Producer throughput = 90 MB/sec
• Consumer throughput = 60 MB/sec
• Consumer latency = 220 ms
LATENCY VS THROUGHPUT

[Figure: consumer latency in ms (y-axis, 0 to 250) vs producer throughput in MB/sec (x-axis, 0 to 100); 100 topics, 1 producer, 1 broker]
SCALABILITY

[Figure: throughput in MB/s vs number of brokers (10 topics, broker flush interval 100K): 1 broker = 101, 2 brokers = 190, 3 brokers = 293, 4 brokers = 381]
THROUGHPUT VS UNCONSUMED DATA

[Figure: throughput in msg/s (y-axis, 0 to 200,000) vs unconsumed data in GB (x-axis, up to about 1,000 GB); 1 topic, broker flush interval 10K]