Kafka blr-meetup-presentation - Kafka internals


Posted on 26-Jan-2017






Kafka - LinkedIn's messaging backbone

Kafka Internals
Ayyappadas Ravindran, LinkedIn Bangalore SRE Team

Introduction
Who am I?
- Ayyappadas Ravindran
- Staff SRE at LinkedIn
- Responsible for the Data Infra Streaming team
What is this talk about?
- Kafka building blocks in detail
- Operating Kafka
- Data assurance with Kafka
- Kafka 0.9

Agenda
- Kafka reminder
- Zookeeper
- Kafka cluster - brokers
- Kafka message
- Producers
- Schema Registry
- Consumers
- Data assurance
- What is new in Kafka (Kafka 0.9)
- Q & A


Kafka Pub/Sub Basics - Reminder!
[Figure: producer, consumer, brokers and Zookeeper, with partitions 0 and 1 of topic A (AP0, AP1) placed on the brokers]

Kafka is a publish-subscribe messaging system with four components:
- Broker (what we call the Kafka server)
- Zookeeper (serves as a data store for information about the cluster and consumers)
- Producer (sends data into the system)
- Consumer (reads data out of the system)

Data is organized into topics (here we show a topic named A), and topics are split into partitions (we have partitions 0 and 1 here). A message is a discrete unit of data within Kafka. Producers create messages and send them into the system. The broker stores them, and any number of consumers can then read those messages.

To provide scalability, we have multiple brokers. By spreading out the partitions, we can handle more messages in any topic. This also provides redundancy, because we can now replicate partitions on separate brokers. When we do this, one broker is the designated leader for each partition, and it is the only broker that producers and consumers connect to for that partition. The brokers that hold the replicas are designated followers, and all they do with the partition is keep it in sync with the leader.

When a broker fails, one of the brokers holding an in-sync replica takes over as the leader for the partition. The producer and consumer clients have built-in logic to automatically rebalance and find the new leader when the cluster changes like this. When the original broker comes back online, it brings its replicas back in sync and then functions as a follower. It does not become the leader again until something else happens to the cluster (such as a manual change of leaders, or another broker going offline).
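The failover behaviour described above can be sketched as a toy state model. This is a minimal illustration under stated assumptions. `PartitionState`, `broker_failed` and `broker_recovered` are hypothetical names, and the new leader is simply the first surviving in-sync replica in replica order. The real controller logic in Kafka is considerably more involved.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class PartitionState:
    """Toy model of one partition's replica roles (hypothetical, not Kafka's classes)."""
    replicas: list[int]                         # broker ids holding a copy
    leader: int                                 # broker serving producers and consumers
    isr: set[int] = field(default_factory=set)  # in-sync replicas, leader included

    def broker_failed(self, broker: int) -> None:
        self.isr.discard(broker)
        if broker == self.leader:
            # Promote a surviving in-sync replica (here: first in replica order).
            candidates = [b for b in self.replicas if b in self.isr]
            if not candidates:
                raise RuntimeError("no in-sync replica available")
            self.leader = candidates[0]

    def broker_recovered(self, broker: int) -> None:
        # The returning broker catches up and rejoins the ISR as a follower;
        # leadership is NOT moved back automatically.
        if broker in self.replicas:
            self.isr.add(broker)
```

For example, with replicas `[1, 2, 3]` and leader 1, a failure of broker 1 makes broker 2 the leader. Broker 1 later rejoins the ISR but stays a follower, which matches the "does not become the leader again" behaviour in the notes.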


Zookeeper
- Distributed coordination service
- Also used for maintaining configuration
- Guarantees: order, atomicity, reliability
- Simple API
- Hierarchical namespace
- Ephemeral nodes
- Watches

In the previous slide we saw that Zookeeper is an integral part of the Kafka ecosystem, so let's look at what Zookeeper is. Zookeeper is a distributed coordination service for distributed applications, and it is also used for configuration maintenance. It exposes a simple API that applications can use to build higher-level coordination services. Zookeeper guarantees ordering, atomicity and reliability.

Zookeeper is organized as a shared hierarchical namespace, modeled on a file system. Every node in Zookeeper is called a znode. Like a file, a znode stores data, and it has an ACL and stat information. Two important concepts in Zookeeper are ephemeral nodes and watches. An ephemeral node exists only as long as the session that created it exists. A client can set a watch on a znode and is notified when that znode changes.

Now let's quickly see the coordination service in action with leader election. Suppose you have multiple clients competing to become the leader, and the challenge is how to elect one. Zookeeper can be used for this. Each client creates a znode with the SEQUENCE and EPHEMERAL flags set. The SEQUENCE flag makes Zookeeper append a monotonically increasing number to the end of the path. The client that creates the znode with the lowest sequence number is elected leader. Because the znode is ephemeral, it exists only as long as the leader's session does. When the leader goes away, its znode disappears and the client with the next-lowest sequence number takes over.
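The leader-election recipe above can be simulated in memory to show why "lowest sequence number wins" plus "ephemeral" yields automatic failover. This is a sketch only: `FakeZooKeeper` and its methods are hypothetical stand-ins, not a real ZooKeeper client API. With a real client such as kazoo, the equivalent step is creating the znode with `ephemeral=True, sequence=True`.

```python
from collections import defaultdict


class FakeZooKeeper:
    """In-memory stand-in for a ZooKeeper ensemble (hypothetical, not a real client)."""

    def __init__(self):
        self._owners = {}             # znode path -> session that created it
        self._seq = defaultdict(int)  # per-parent sequence counter

    def create_ephemeral_sequential(self, prefix, session):
        # Mimics the SEQUENCE flag: append a zero-padded, monotonically
        # increasing 10-digit counter maintained per parent znode.
        parent = prefix.rsplit("/", 1)[0]
        path = f"{prefix}{self._seq[parent]:010d}"
        self._seq[parent] += 1
        self._owners[path] = session
        return path

    def children(self, parent):
        # Sorted by the sequence suffix, so the lowest number comes first.
        kids = [p for p in self._owners if p.rsplit("/", 1)[0] == parent]
        return sorted(kids, key=lambda p: p[-10:])

    def owner(self, path):
        return self._owners[path]

    def close_session(self, session):
        # Mimics the EPHEMERAL flag: znodes vanish when their creator's session ends.
        self._owners = {p: s for p, s in self._owners.items() if s != session}


def current_leader(zk, election_path):
    """The session owning the lowest-numbered child znode is the leader."""
    nodes = zk.children(election_path)
    return zk.owner(nodes[0]) if nodes else None
```

If sessions `"a"`, `"b"` and `"c"` each create `/election/n_`, then `"a"` leads. After `zk.close_session("a")`, `"b"` leads without any extra coordination. The full ZooKeeper recipe also has each contender watch only the znode just ahead of it, so that a leader change wakes one client instead of all of them.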


Zookeeper in Kafka ecosystem
- Used to store metadata information
  - About brokers
  - About topics & partitions
  - Consumers / consumer groups
- Service coordination
  - Controller election
  - For administrative tasks

Ref : https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

Now let's see how Zookeeper is used in a Kafka environment. Kafka uses Zookeeper both for storing configuration information and as a coordination service (leader election and executing administrative tasks).

Zookeeper stores metadata about brokers, topics and consumers. When a broker comes up, it registers itself with ZK by creating a znode that stores the broker ID, hostname and endpoint details. Two types of topic-related information are stored in ZK. The first is broker-related topic information: which brokers host which topic partitions, the replication information, and which replicas are leaders and which are followers. The second is topic configuration, which holds per-topic settings such as retention and cleanup policies. Zookeeper also stores consumer information. This covers which consumers are consuming from which partitions and how far into the log each consumer has read, i.e. its offset information.

Coming to the coordination service: one of the brokers in the Kafka cluster is assigned the role of controller. The controller is responsible for managing the state of brokers, partitions and replicas, and it also performs administrative tasks. The controller is elected using Zookeeper.
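The kinds of metadata listed above map onto a fixed znode tree. The helpers below sketch that tree. The layout is assumed to follow the Kafka wiki page referenced above for 0.8/0.9-era clusters, and the function names are hypothetical. The `/consumers` paths apply only to the older ZooKeeper-based consumer.

```python
def broker_path(broker_id: int) -> str:
    # Ephemeral znode a broker creates on startup (id, host, endpoints).
    return f"/brokers/ids/{broker_id}"


def topic_assignment_path(topic: str) -> str:
    # Which brokers hold replicas of each partition of the topic.
    return f"/brokers/topics/{topic}"


def partition_state_path(topic: str, partition: int) -> str:
    # Current leader and in-sync replica set for one partition.
    return f"/brokers/topics/{topic}/partitions/{partition}/state"


def topic_config_path(topic: str) -> str:
    # Per-topic overrides such as retention and cleanup policy.
    return f"/config/topics/{topic}"


def consumer_offset_path(group: str, topic: str, partition: int) -> str:
    # Last committed offset of a ZooKeeper-based consumer group.
    return f"/consumers/{group}/offsets/{topic}/{partition}"


# Ephemeral znode naming the broker that currently acts as controller;
# the first broker to create it wins the controller election.
CONTROLLER_PATH = "/controller"
```

Because both `/brokers/ids/<id>` and `/controller` are ephemeral, a broker crash removes its registration. If the crashed broker was the controller, the crash also triggers a new controller election.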

Zookeeper at LinkedIn
- We are running Zookeeper 3.4
- Cluster of 5 (participants) + 1 (observer)
- Network and power redundancy
- Transaction logs on SSD
- Lesson learned: do not over-build your cluster

We run Zookeeper 3.4. It is used not just for Kafka but also for other critical applications at LinkedIn. Our cluster has 5 + 1 nodes: 5 voting members and 1 non-voting member called an observer. The observer's primary role is disaster recovery, and it also helps with read scalability. We make sure the nodes are in different racks for power redundancy. For network redundancy we use bond0 (balance-rr bonding), which provides load balancing and fault tolerance. If your workload is write-heavy, disk performance matters, so we keep the transaction logs on SSD. At a minimum, put the transaction logs on a different drive from the one used for application logs and snapshots. Do not over-build your cluster. As the cluster size increases, the latency of ZK write transactions increases, because each write must be acknowledged by a majority of the voting members.
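The "5 + 1" layout and the "do not over-build" advice both come down to majority-quorum arithmetic, sketched below. The function names are hypothetical. The only assumption is ZooKeeper's rule that a write commits once a majority of voting members acknowledge it, with observers excluded from the count.

```python
def quorum_size(voters: int) -> int:
    """Smallest majority of voting members; observers are not counted."""
    return voters // 2 + 1


def tolerated_failures(voters: int) -> int:
    """Voting members that can fail while a majority is still reachable."""
    return voters - quorum_size(voters)


for voters in (3, 5, 7):
    print(voters, quorum_size(voters), tolerated_failures(voters))
# 3 2 1
# 5 3 2
# 7 4 3
```

With 5 voters, every write waits for 3 acknowledgements and the ensemble survives 2 voter failures. Growing to 7 voters buys one more tolerated failure, but each write then waits for 4 acknowledgements. An even count such as 6 needs 4 acknowledgements and still tolerates only 2 failures. Adding an observer instead increases read capacity without enlarging the write quorum.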

Kafka Cluster - Brokers

Brokers
- Run Kafka
- Store commit logs
- Why a cluster?
  - Redundancy and fault tolerance
  - Horizontal scalability
  - Improves reads and writes
  - Better network usage & disk IO
- Controller: a special broker

Alright, we have seen Zookeeper, so now let's talk about brokers. Brokers are the nodes that run the Kafka process. They store the commit logs for topic partitions and register themselves with Zookeeper when they start. Multiple brokers form a Kafka cluster. A cluster helps with redundancy (replicas) and fault tolerance. Our default replication factor is 2, so we can afford one broker failure. A cluster can also be scaled horizontally: whenever you want to expand it, you add more brokers. Multiple machines also give you better network usage and disk IO.

The controller is a broker with additional responsibilities. It is the brain of the cluster and acts as a state machine. The cluster state is kept in Zookeeper, and when that state changes, the controller acts on it. The controller manages the brokers, takes care of partitions and replicas, and performs administrative tasks.
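The claim that a replication factor of 2 survives one broker failure can be checked with a simplified round-robin placement. In this placement, partition p's first replica goes to broker p mod n and the remaining replicas go to the next brokers in the ring. This is an illustration only. `assign_replicas` is a hypothetical helper, and Kafka's own assignment also randomizes the starting broker and the follower shift.

```python
def assign_replicas(num_partitions, broker_ids, replication_factor):
    """Simplified round-robin replica placement (not Kafka's exact algorithm).

    Returns {partition: [first replica, follower, ...]}.
    """
    n = len(broker_ids)
    if replication_factor > n:
        raise ValueError("replication factor cannot exceed the number of brokers")
    return {
        p: [broker_ids[(p + r) % n] for r in range(replication_factor)]
        for p in range(num_partitions)
    }


assignment = assign_replicas(3, [1, 2, 3], 2)
print(assignment)  # {0: [1, 2], 1: [2, 3], 2: [3, 1]}
```

Because the replicas of each partition always sit on distinct brokers, removing any single broker still leaves at least one copy of every partition. Adding brokers spreads partitions, and therefore network and disk load, across more machines.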

Kafka Message
- Distributed, partitioned, replicated commit log
- Messages
  - Fixed-size header
  - Variable-length payload (byte array)
  - Payload can hold any serialized data; LinkedIn uses Avro
- Commit logs
  - Stored in sequence files under folders named after the topic
  - Contain a sequence of log entries

As said earlier, in Kafka a message is a discrete unit of data. Messages are stored in commit logs, which are distributed, partitioned and replicated. A message contains a header and a payload. The header holds information such as the size of the payload, a CRC32 checksum, and the compression codec used (Snappy or gzip). Leaving the payload as a byte array gives a lot of flexibility; at LinkedIn our messages are Avro-formatted. Commit logs are stored in sequence files under a folder named after the topic-partition, and each sequence file contains log entries.
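The header fields above can be made concrete with a small encoder and decoder. This sketch assumes the pre-0.10 ("magic 0") message layout from the Kafka protocol guide: a CRC32, a magic byte, an attributes byte whose low bits carry the compression codec, then a length-prefixed key and value. The function names are hypothetical, and the field details are an assumption to verify against the protocol guide. In this layout the message size itself is stored in the enclosing log entry.

```python
import struct
import zlib
from typing import Optional, Tuple

MAGIC_V0 = 0
# Compression codec ids carried in the low bits of the attributes byte.
CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY = 0, 1, 2


def _encode_bytes(data: Optional[bytes]) -> bytes:
    # Length-prefixed field: int32 length (-1 means null), then the raw bytes.
    if data is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(data)) + data


def encode_message(value: bytes, key: Optional[bytes] = None,
                   codec: int = CODEC_NONE) -> bytes:
    body = struct.pack(">bb", MAGIC_V0, codec) + _encode_bytes(key) + _encode_bytes(value)
    # The CRC32 covers every byte after the CRC field itself.
    return struct.pack(">I", zlib.crc32(body)) + body


def decode_message(buf: bytes) -> Tuple[int, Optional[bytes], Optional[bytes]]:
    """Return (codec, key, value); raise ValueError if the checksum fails."""
    (stored_crc,) = struct.unpack_from(">I", buf, 0)
    body = buf[4:]
    if zlib.crc32(body) != stored_crc:
        raise ValueError("CRC mismatch: message is corrupted")
    _magic, attributes = struct.unpack_from(">bb", body, 0)
    pos = 2
    fields = []
    for _ in range(2):  # key first, then value
        (length,) = struct.unpack_from(">i", body, pos)
        pos += 4
        if length < 0:
            fields.append(None)
        else:
            fields.append(body[pos:pos + length])
            pos += length
    key, value = fields
    return attributes & 0x07, key, value
```

Keeping the payload as opaque bytes is what lets LinkedIn put Avro-encoded records in `value`. The broker only needs the header fields, for example to verify the CRC, and never has to understand the payload.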

Kafka Message - continued
- Logs
  - Each log entry (message) has a 4-byte header followed by N bytes of message
  - The offset is a 64-bit integer
  - The offset gives the position of the message from the start of the stream
  - On disk, log files are saved as segment files
  - Segment files are named with the offset of the first message in that file, e.g. 00000000000.kafka

The header is 4 bytes, and the payload is variable in size; at LinkedIn we cap it at 1 MB. Messages in the commit log are identified by an offset number. An offset is a 64-bit number that represents the position of the message from the start of the stream, i.e. the start of that topic partition. Segments are named after the first offset in the segment.
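Two mechanisms described above can be sketched together. The first is log-entry framing: each entry is a 4-byte big-endian length N followed by N bytes. The second is segment naming and lookup: a reader finds the segment for an offset by picking the largest base offset that is not greater than it. The helper names are hypothetical. The 20-digit `.log` file name is what 0.8-and-later brokers use, whereas the slide's `00000000000.kafka` example reflects the older naming.

```python
import bisect
import struct


def append_entry(log: bytearray, message: bytes) -> int:
    """Append one entry (4-byte big-endian length N, then N bytes); return its start position."""
    pos = len(log)
    log.extend(struct.pack(">I", len(message)) + message)
    return pos


def read_entries(log: bytes):
    """Yield (position, message) by walking the length prefixes from the start."""
    pos = 0
    while pos + 4 <= len(log):
        (n,) = struct.unpack_from(">I", log, pos)
        if pos + 4 + n > len(log):
            break  # incomplete entry at the tail; stop rather than return a partial message
        yield pos, bytes(log[pos + 4:pos + 4 + n])
        pos += 4 + n


def segment_name(base_offset: int) -> str:
    """File name of a segment whose first message has offset `base_offset`."""
    return f"{base_offset:020d}.log"


def segment_for(offset: int, base_offsets: list) -> int:
    """Return the base offset of the segment that holds `offset`.

    `base_offsets` must be sorted ascending, as the segment file names are.
    """
    i = bisect.bisect_right(base_offsets, offset) - 1
    if i < 0:
        raise KeyError(offset)
    return base_offsets[i]
```

Naming segments by their first offset means a lookup needs only a binary search over the file names, with no scan of the segment contents. Appending only to the newest segment keeps writes sequential on disk.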

Kafka Message - continued
- Write to logs
  - Appends to the latest segment file
  - OS flushes the messages to disk based on either the number of messages or time
- Reads from logs
  - Consumer provides an offset & a chunk size
  - Returns an iterator to iterate over the message set
  - On failure, consumers can start consuming from either the start of the stream or the latest offset
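The read path on this slide can be sketched with a toy in-memory partition. All of the following are hypothetical assumptions: `log` is a Python list whose index is the logical offset, retention is ignored so the log always starts at offset 0, and `chunk_size` is a byte budget for one fetch. The reset values mirror the 0.9 consumer's `auto.offset.reset` options `earliest` and `latest`. The older consumer called them `smallest` and `largest`.

```python
from typing import Iterator, List, Optional, Tuple


def fetch(log: List[bytes], offset: int, chunk_size: int) -> Iterator[Tuple[int, bytes]]:
    """Yield (offset, message) from `offset` onward until the byte budget is used up."""
    used = 0
    for off in range(offset, len(log)):
        msg = log[off]
        if used + len(msg) > chunk_size:
            break  # the next message does not fit in this chunk
        used += len(msg)
        yield off, msg


def resolve_start(log: List[bytes], committed: Optional[int], reset: str = "earliest") -> int:
    """Use the committed offset if valid, else fall back to the start or end of the log."""
    if committed is not None and 0 <= committed <= len(log):
        return committed
    return 0 if reset == "earliest" else len(log)
```

If the first message is larger than `chunk_size`, this sketch returns nothing. For that reason the chunk size should comfortably exceed the largest message.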

Writes happen at the tail end of the latest segment. Messages are written to the OS page cache and are flushed to disk based on either the number of messages or a period of time. When reading from the log, consumers provide the offset number and a chunk size, and Kafka returns an iterator over a message set. Ideally the chunk size will have multipl