Kafka blr-meetup-presentation - Kafka internals


Posted on 26-Jan-2017






Kafka - LinkedIn's messaging backbone

Kafka Internals
Ayyappadas Ravindran, LinkedIn Bangalore SRE Team

Introduction
Who am I?
Ayyappadas Ravindran, Staff SRE at LinkedIn, responsible for the Data Infra Streaming team
What is this talk about?
Kafka building blocks in detail
Operating Kafka
Data assurance with Kafka
Kafka 0.9

Agenda
Kafka Reminder!
Zookeeper
Kafka Cluster - Brokers
Kafka Message
Producers
Schema Registry
Consumers
Data Assurance
What is new in Kafka (Kafka 0.9)
Q & A


Kafka Pub/Sub Basics - Reminder!

(Diagram: a producer and a consumer connected to a broker holding topic A, partitions 0 and 1, coordinated by Zookeeper)

Kafka is a publish-subscribe messaging system with four components:
Broker (what we call the Kafka server)
Zookeeper (which serves as a data store for information about the cluster and consumers)
Producer (sends data into the system)
Consumer (reads data out of the system)

Data is organized into topics (here we show a topic named A) and topics are split into partitions (we have partitions 0 and 1 here). A message is a discrete unit of data within Kafka. Producers create messages and send them into the system. The broker stores them, and any number of consumers can then read those messages.

In order to provide scalability, we have multiple brokers. By spreading out the partitions, we can handle more messages in any topic. This also provides redundancy: we can now replicate partitions on separate brokers. When we do this, one broker is the designated leader for each partition. This is the only broker that producers and consumers connect to for that partition. The brokers that hold the replicas are designated followers, and all they do with the partition is keep it in sync with the leader.

When a broker fails, one of the brokers holding an in-sync replica takes over as the leader for the partition. The producer and consumer clients have logic built in to automatically rebalance and find the new leader when the cluster changes like this. When the original broker comes back online, it gets its replicas back in sync, and then it functions as a follower. It does not become the leader again until something else happens to the cluster (such as a manual change of leaders, or another broker going offline).


Zookeeper
Distributed coordination service
Also used for maintaining configuration
Guarantees: order, atomicity, reliability
Simple API
Hierarchical namespace
Ephemeral nodes
Watches

In the previous slide we saw that Zookeeper is an integral part of the Kafka ecosystem, so let's see what Zookeeper is. Zookeeper is a distributed coordination service for distributed applications; it is also used for configuration maintenance. Zookeeper exposes simple APIs, with which an application can build higher-level coordination services, and it guarantees ordering, atomicity and reliability. Zookeeper is implemented as a shared hierarchical namespace, in the model of a shared Linux file system. Every node in Zookeeper is called a znode. A znode is similar to a file: it stores data, and it has ACL and stat information. Two important concepts in the Zookeeper ecosystem are ephemeral nodes and watches. An ephemeral node exists only as long as the session that created it exists. Clients can set watches on a znode, and a client is informed when there is a change in the znode.

Now let's quickly see the coordination service in action: a leader election. Consider multiple clients competing to become the leader; the challenge is how to elect one. Zookeeper can be used for leader election. Znodes are created with the SEQUENCE and EPHEMERAL flags set. With the SEQUENCE flag, each node is created with a monotonically increasing number appended to the end of the path. The client which manages to create the znode with the lowest sequence number is elected leader. Because the znode is ephemeral, it exists only as long as the leader exists.
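The sequential-znode election described above can be sketched as a small in-memory simulation (the `FakeZookeeper` class is purely illustrative; a real client would talk to a Zookeeper ensemble, e.g. via the Kazoo library):

```python
# Minimal sketch of Zookeeper-style leader election with SEQUENCE znodes:
# each candidate creates a node with a monotonically increasing suffix,
# and the candidate holding the lowest sequence number is the leader.

import itertools

class FakeZookeeper:
    """Hands out monotonically increasing sequence numbers, simulating
    znode creation with the SEQUENCE flag under /election/."""
    def __init__(self):
        self._seq = itertools.count()
        self.nodes = {}          # znode path -> owning client name

    def create_sequential(self, client):
        path = "/election/n_%010d" % next(self._seq)
        self.nodes[path] = client
        return path

    def leader(self):
        # The client owning the lowest sequence path is the leader.
        return self.nodes[min(self.nodes)]

zk = FakeZookeeper()
for client in ["broker-1", "broker-2", "broker-3"]:
    zk.create_sequential(client)

print(zk.leader())   # broker-1 registered first, so it leads
```

If the leader's ephemeral znode disappears (session lost), the next-lowest sequence automatically becomes the leader.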


Zookeeper in the Kafka ecosystem
Used to store metadata information:
About brokers
About topics & partitions
About consumers / consumer groups
Service coordination:
Controller election
For administrative tasks

Ref : https://cwiki.apache.org/confluence/display/KAFKA/Kafka+data+structures+in+Zookeeper

Now let's see how Zookeeper is used in a Kafka environment. Kafka uses Zookeeper both for storing configuration information and for coordination (leader election and executing administrative tasks). Zookeeper stores metadata about brokers, topics and consumers. When a broker comes up, it registers itself with ZK: it creates a znode and stores the broker ID, hostname and endpoint details in it. Two types of topic-related information are stored in ZK. The first is broker-related topic information: which broker hosts which topic/partition, replication information, and which replicas are leaders and which are followers. The second is topic configuration: per-topic settings such as retention and clean-up policies. Zookeeper also stores consumer information, such as which consumers are consuming from which partition and up to what point in the log a consumer has consumed, i.e. offset information. Coming to the coordination service: one of the brokers in a Kafka cluster assumes the role of controller. The controller is responsible for managing the state of brokers, partitions and replicas, and also performs the administrative tasks. Controller election is done using Zookeeper.

Zookeeper at LinkedIn
We are running Zookeeper 3.4
Cluster of 5 (participants) + 1 (observer)
Network and power redundancy
Transaction logs on SSD
Lesson learned: do not over build your cluster

We run Zookeeper 3.4. It is used not just for Kafka but also for other critical applications at LinkedIn. We have a cluster size of 5 + 1, where 5 are voting members and 1 is a non-voting member called an observer. The primary role of the observer is disaster recovery; it also helps with read scalability. We make sure the nodes are in different racks to ensure power redundancy, and we use bond0 (balance-rr bonding), which provides load balancing and fault tolerance. If your system is write heavy it is good to have better disk performance: we keep transaction logs on SSD. At minimum, keep transaction logs on a separate drive from the one used for application logs and snapshots. Finally, do not over build your cluster: as the cluster size increases, the latency of ZK write transactions increases.
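The "do not over build" advice follows from quorum arithmetic: an ensemble of N voting members needs a majority to commit each write, so more voters means more acknowledgments per transaction. A quick sketch (observers are excluded because they do not vote):

```python
# Quorum math for a Zookeeper ensemble of N voting members: a write
# commits once a majority (N // 2 + 1) acknowledges, so the ensemble
# tolerates N - majority voter failures.

def quorum(voters):
    return voters // 2 + 1

def tolerated_failures(voters):
    return voters - quorum(voters)

for n in (3, 5, 7):
    print("voters=%d quorum=%d tolerates=%d failures"
          % (n, quorum(n), tolerated_failures(n)))
```

A 5-voter ensemble (as described above) commits with 3 acks and survives 2 voter failures; going to 7 voters only buys one extra failure while adding write latency.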

Kafka Cluster - Brokers

Brokers
Run Kafka
Store commit logs
Why cluster?
Redundancy and fault tolerance
Horizontal scalability
Improves reads and writes; better network usage & disk IO
Controller: a special broker

Alright, we have seen Zookeeper; now let's talk about brokers. Brokers are the nodes which run the Kafka process, and they store the commit logs for topics/partitions. Brokers register themselves with Zookeeper when they start, and multiple brokers form a Kafka cluster. A cluster is good because it provides redundancy (replicas) and fault tolerance; the default replication factor we use is 2, so we can afford one node failure. A cluster can be horizontally scaled: when you want to expand, you add more brokers, and with multiple machines you get better network usage and disk IO. The controller is a broker with additional responsibility. The controller is the brain of the cluster; it is a state machine. We keep the state in Zookeeper, and when there is a state change the controller acts on it. The controller manages the brokers, takes care of partitions and replication, and performs administrative tasks.

Kafka Message
Distributed, partitioned, replicated commit log
Messages:
Fixed-size header
Variable-length payload (byte array)
Payload can hold any serialized data; LinkedIn uses Avro
Commit logs:
Stored in sequence files under folders named with the topic name
Contain a sequence of log entries

As said earlier, in Kafka a message is a discrete unit of data. Messages are stored in commit logs, which are distributed, partitioned and replicated. A message contains a header and a payload. The header contains information such as the size of the payload, a CRC32 checksum and the compression used (snappy or gzip). Leaving the payload as a byte array gives a lot of flexibility; at LinkedIn our messages are Avro-formatted. Commit logs are stored in sequence files, which live under folders named after the topic-partition, and each sequence file contains log entries.

Kafka Message - continued
Logs:
A log entry (message) has a 4-byte header followed by an N-byte message
The offset is a 64-bit integer
The offset gives the position of a message from the start of the stream
On disk, log files are saved as segment files
Segment files are named with the first offset of the messages in that file, e.g. 00000000000.kafka

The header size is 4 bytes; the payload can be of variable size (at LinkedIn we cap it at 1 MB). Messages in the commit log are identified by an offset. An offset is a 64-bit integer that represents the position of the message from the start of the stream, i.e. the start of that topic partition. Segments are named after the first offset in the segment.
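Because each segment file is named after its first offset, locating a message is a binary search over the sorted base offsets. A small sketch (the base offsets and the zero-padded file name are illustrative, following the naming convention on the slide):

```python
# Sketch: find which segment file holds a given offset. Segments are
# named after the first offset they contain, so a binary search over
# the sorted base offsets finds the right file.

import bisect

segment_bases = [0, 368769, 737337]          # first offset in each segment

def segment_for(offset):
    """Return the base offset of the segment holding `offset`."""
    i = bisect.bisect_right(segment_bases, offset) - 1
    return segment_bases[i]

def segment_name(base):
    # Zero-padded file name, as in the slide's 00000000000.kafka example.
    return "%020d.kafka" % base

print(segment_name(segment_for(500000)))     # falls in the second segment
```

Offset 500000 lands in the segment starting at 368769, so only that one file needs to be opened.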

Kafka Message - continued
Writes to logs:
Append to the latest segment file
The OS flushes messages to disk based either on the number of messages or on time
Reads from logs:
The consumer provides an offset & a chunk size
Kafka returns an iterator to iterate over the message set
On failure, consumers can start consuming either from the start of the stream or from the latest offset

Writes happen at the tail end of the latest segment. Messages are written to the OS page cache and flushed to disk based either on the number of messages or on a period of time. When reading from the log, consumers provide an offset and a chunk size, and Kafka returns an iterator over a message set. Ideally the chunk will contain multiple messages, but there is a corner case in which a message is larger than the chunk size provided; in that case the consumer doubles its chunk size and retries. On consumer failure (here failure means the consumer trying to fetch an offset which doesn't exist), the consumer can fail, or it has the option to reset its offset to the start or to the latest offset.
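The chunk-doubling corner case above can be sketched as a fetch loop over an in-memory log (the `fetch` helper and the (size, payload) log entries are illustrative, not a real client API):

```python
# Sketch: a consumer asks for a chunk of bytes; if the next message is
# larger than the chunk, it doubles the chunk size and retries until
# the message fits.

def fetch(log, offset, chunk_size):
    """Return (messages, next_offset): as many whole messages starting
    at `offset` as fit in `chunk_size` bytes. Entries are (size, payload)."""
    while True:
        out, used, pos = [], 0, offset
        while pos < len(log) and used + log[pos][0] <= chunk_size:
            used += log[pos][0]
            out.append(log[pos][1])
            pos += 1
        if out or pos >= len(log):
            return out, pos
        chunk_size *= 2              # message too big for chunk: double, retry

log = [(100, "a"), (100, "b"), (4000, "big"), (100, "c")]
msgs, nxt = fetch(log, 0, 1024)
print(msgs, nxt)                     # ['a', 'b'] — stops before the big one
msgs, nxt = fetch(log, nxt, 1024)
print(msgs, nxt)                     # ['big'] — after doubling 1024 -> 4096
```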

Message Retention
Kafka retains and expires messages via three options:
Time-based (the default, which keeps messages for at least 168 hours)
Size-based (configurable amount of messages per partition)
Key-based (one message is retained for each discrete key)

Time and size retention can work together, but not with key-based retention. With both time and size configured, messages are retained until either the size limit or the time limit is reached, whichever comes first.

Retention can be overridden per-topic Use the kafka-topics.sh CLI to set these configs

Now, the commit logs keep getting data, and at some point this is going to fill your disk, so you need a retention policy to rotate and purge the log. Kafka provides two clean-up policies: you can either rotate the logs or compact them. Rotation can happen based on time or size. Log compaction is interesting: instead of purging an entire segment of the log, we remove entries having the same key and retain only the latest entry. Compaction can only happen with semantic partitioning. You can have a per-topic retention policy; Kafka ships a CLI which can be used to set this value.
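The compaction behavior described above — keep only the latest entry per key — can be sketched in a few lines (an illustrative simulation, not Kafka's actual log cleaner):

```python
# Sketch of log compaction: the log converges to a snapshot of the
# latest value per key, rather than dropping whole segments.

def compact(log):
    """`log` is a list of (key, value) pairs in append order; keep only
    the last value written for each key (keys stay in first-seen order)."""
    latest = {}
    for key, value in log:
        latest[key] = value          # later writes overwrite earlier ones
    return list(latest.items())

log = [("user1", "v1"), ("user2", "v1"), ("user1", "v2"), ("user1", "v3")]
print(compact(log))                  # [('user1', 'v3'), ('user2', 'v1')]
```

This is why compaction needs semantic (keyed) partitioning: all writes for a key must land in the same partition's log for the latest value to be meaningful.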

Kafka Producer
A producer publishes messages to a topic
metadata.broker.list
serializer.class
partitioner.class
request.required.acks (0, 1, -1)
topics
Partition strategy:
DefaultPartitioner - round robin
DefaultPartitioner with keyed messages - hashing

Ref : https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example

An application which writes data into a Kafka topic is called a producer. The producer needs the details of a broker from which it can fetch metadata; the metadata contains information about the brokers and the broker ID where the leader partition for the topic resides. The serialization class is pluggable: you can specify an encoder class, and at LinkedIn we use Avro serialization. The partitioner class specifies how messages should be partitioned, i.e. to which partition a message should be written. request.required.acks specifies whether the producer needs to wait for an ack from the broker. It has 3 values: 0 - don't wait; 1 - get an ack from at least the leader; -1 (or all) - get acks from all the followers as well.
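The two partitioning strategies on the slide can be sketched as follows (a simulation only: the round-robin cycle and the CRC32 hash here stand in for the real partitioner, which may use a different hash function):

```python
# Sketch of producer partitioning: keyed messages hash to a fixed
# partition (semantic partitioning), keyless messages round-robin.

import itertools
import zlib

NUM_PARTITIONS = 4
_rr = itertools.cycle(range(NUM_PARTITIONS))

def partition_for(key=None):
    if key is None:
        return next(_rr)                         # keyless: round-robin
    return zlib.crc32(key.encode()) % NUM_PARTITIONS

# The same key always lands on the same partition:
print(partition_for("user-42"), partition_for("user-42"))
# Keyless messages spread across all partitions:
print([partition_for() for _ in range(4)])
```

The keyed case is what makes per-key ordering (and log compaction) possible: every message for "user-42" is appended to the same partition's log.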

Kafka Producer - Continued
Message batching
Compression (gzip, snappy & lz4)
Sticky partition
CLI
Create a topic:
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic newtopic --replication-factor 1 --partitions 1

Produce messages:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic newtopic

Ref : https://cwiki.apache.org/confluence/display/KAFKA/Clients

When the producer sends messages to a broker, you can ask the producer to batch multiple messages and send them in one go. This way you can compress the messages, and there is less overhead in terms of creating connections to the broker. Different types of compression are supported: gzip, snappy and lz4. Sticky partitioning is specific to LinkedIn; it makes sure we send messages to only one partition for a given period of time, which reduces the connection count.

Schema Registry

So, we talked about messages. At LinkedIn we send messages in Avro format. An Avro message contains the schema of the data plus the data itself, stored in binary (serialized) form. LinkedIn's custom producer adds extra information to each message for tracking and auditing purposes. To save on storage and on the network, the schema is stripped off the message and stored in a centralized location; the message carries a schema ID used to retrieve the schema. When a consumer wants to read the data, it retrieves the schema from the schema registry and decodes the message. The schema is cached locally to reduce load on the schema registry. We don't want to break existing consumers, so old backward-compatible schemas are also kept in the schema registry.

Kafka Consumer
Consumers are the processes subscribed to a topic that process the feed
High-level consumer:
Multi-threaded
Manages offsets for you
Simple consumer:
Greater control over consumption
Need to manage offsets
Need to find the broker for the leader partition

Consumers are the processes responsible for consuming from topics: they subscribe to a topic, consume the messages and process them. Kafka offers a consumer abstraction called the consumer group. A consumer group has one or more consumer instances; multiple instances label themselves with the same group name. Traditionally, consumers work either in queue mode or in pub-sub mode: in queue mode each message is sent to one consumer instance, while in pub-sub mode a message is sent to all instances. Messaging systems guarantee ordering, but when messages are delivered to multiple consumers asynchronously they may not be received in order. The workaround is to use one single consumer, but then there is no parallel consumption. Kafka solves this via partitions: for an N-partition topic you can have N consumers, so each consumer consumes from one partition. This guarantees ordering per partition, and with multiple partitions you get parallelism. High-level consumers are multi-threaded, manage offsets for you, support consumer groups, and rebalance when a consumer instance joins or leaves the group. The simple consumer gives you greater control: you can read a subset of partitions, and messages can be read repeatedly. The drawback is that you need to manage offsets yourself and find the leader partitions.
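The partition-to-consumer mapping described above can be sketched with a simple modulo assignment (illustrative only; real group rebalancing uses range or round-robin assignors, but the invariant is the same: each partition has exactly one owner within the group):

```python
# Sketch: N partitions spread across a consumer group of M instances.
# Each partition is owned by exactly one group member; if M > N, some
# consumers sit idle.

def assign(partitions, consumers):
    """Return {consumer: [owned partitions]}."""
    owners = {c: [] for c in consumers}
    for p in partitions:
        owners[consumers[p % len(consumers)]].append(p)
    return owners

print(assign(range(6), ["c0", "c1", "c2"]))
# Each of the 6 partitions has one owner; each consumer reads 2.
print(assign(range(2), ["c0", "c1", "c2"]))
# Only 2 partitions: 'c2' owns nothing and stays idle.
```

This is why the partition count bounds the parallelism of a consumer group.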

Kafka Consumer - continued
Important options to provide while consuming:
Zookeeper details
Topic name
Where to start consuming (from the beginning or from the tail)
auto.offset.reset
group.id
auto.commit.enable (true)
Console consumer:
Helps in debugging issues & can be used inside an application
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic mytopic --from-beginning

Consumers keep their offset information in Zookeeper; this is the point up to which a particular consumer has consumed, so if that consumer thread dies, it knows exactly where to resume. Obviously you need to tell the consumer which topic to consume from, and you have the option to start at a particular offset, or at the start or end of the stream. auto.offset.reset: remember that consumers store offsets in Zookeeper. If the consumer cannot fetch the offset from Zookeeper, or provides an offset which doesn't exist, this setting controls the default behavior: whether the consumer should consume from the tail end or from the beginning. The consumer group is an abstraction that lets consumer instances consume messages either in queuing fashion or in pub-sub mode; group.id sets the consumer group name, and takes a string that should be unique per group. auto.commit.enable: consumers store the offset up to which they have consumed; enabling this makes the consumer commit offsets automatically.

Basic Kafka operations
Add a topic:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --create --topic newtopic --partitions 10 --replication-factor 3 --config x=y
Modify a topic:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic newtopic --partitions 20
(beware: this may impact semantically partitioned topics)
Modify configuration:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic newtopic --config x=y
Delete configuration:
bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic newtopic --deleteConfig x

Kafka ships command line tools to manage Kafka clusters. These tools are used for maintenance and debugging; we will quickly go through them.

Basic Kafka operations - continued
DO NOT DELETE TOPICS! (though you have an option to do that)
What happens when a broker dies?
Leader fail-over
Corrupted index / log files
URP (under-replicated partitions)
Uneven leader distribution
Preferred replica election:
bin/kafka-preferred-replica-election.sh --zookeeper zk_host:port/chroot
or auto.leader.rebalance.enable=true

Now let's see the operational challenges when a broker dies. Kafka does the leader fail-over, so one of the followers in the ISR becomes the new leader. You will end up with corrupt index/log files, and with under-replicated partitions (obviously, since you lost one of the replicas). Kafka takes care of the corrupt index/log files by discarding incomplete log entries. URPs are fixed when the broker comes back up, but remember that the replicas come back as followers. This creates a challenge: you now have uneven leader distribution across your cluster. Kafka ships a CLI with which you can rebalance the leader distribution. There is also an option to do this automatically, but it is not very clean.

Adding a broker

(Diagram: producers and consumers connected to a cluster of brokers, with a new broker being added)

Partition reassignment
Broker leveling script moves data to even out data volume per broker

As I mentioned, when you add a broker to a cluster it won't be used by existing partitions. With the 0.8.1 release of Kafka there is a new feature: partition reassignment! Now when you add a broker to the cluster, it can be used by your existing topics and partitions. Existing partitions can be moved around live, completely transparently to all consumers and producers. We have developed a tool that sits on top of the partition reassignment tool and balances a cluster after you add new brokers, or when your cluster is simply unbalanced (there are many ways you can wind up in this state). It goes out to each broker and figures out how big each partition is (on disk) and the total amount of storage used on each broker. Then it starts calling the partition reassignment tool to make the larger brokers smaller and the smaller brokers larger, stopping once the overall data size is within 1 GB between the smallest and largest brokers. This is just one example of the many possible ways to optimize a cluster with the partition reassignment tool.

Kafka operations - continued
Expanding a Kafka cluster:
Create brokers with new broker IDs
Kafka will not automatically move topics to the new brokers; an admin needs to initiate the move
Generate the plan:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --topics-to-move-json-file topics-to-move.json --broker-list "5,6" --generate
Execute the plan:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --execute
Verify the execution:
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file expand-cluster-reassignment.json --verify

Expanding a Kafka cluster is very simple: you create brokers with unique broker IDs and start the Kafka server, and the servers are automatically added to the cluster. But adding new brokers won't trigger automatic balancing in the cluster; an admin needs to move topics to the new brokers. Only the initiation is manual; the process itself is automated.

Data Assurance
No data loss and no reordering
Critical for applications like DB replication
Can Kafka do this? Yes!
Causes of data loss on the producer side:
Setting block.on.buffer.full=false
Retries exhausted
Sending messages without acks=all
How can you fix it?
Set block.on.buffer.full=true
Set retries to Long.MAX_VALUE
Set acks to all
Have a resend in your callback function (producer.send(record, callback))

Data loss is not a big deal in applications like pageview event tracking: the loss of one or two messages in a million is fine. It becomes critical for applications like DB replication and transactions which involve money. Where can the loss happen? On the producer side, the consumer side and the broker side; let's see the issues on each. On the producer side, setting block.on.buffer.full to false throws an error and discards messages when the buffer fills. Another cause is the number of retries being exhausted. Also consider whether you are running in async or sync mode, and in sync mode whether you are waiting for a commit acknowledgment from all replicas or not. The fixes: with block.on.buffer.full set to true, the producer won't accept any more messages when its buffer is full; if you set retries to Long.MAX_VALUE, it will retry up to 2^63 - 1 times; and set acks to all.
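The producer settings above, collected in one place (a plain dict of the property names used in the slide, not a runnable client; pass the equivalent options to whichever client library you use):

```python
# No-data-loss producer settings from the slide, as a reference dict.

NO_DATA_LOSS_PRODUCER = {
    "block.on.buffer.full": "true",      # back-pressure instead of dropping
    "retries": str(2**63 - 1),           # Long.MAX_VALUE: retry indefinitely
    "acks": "all",                       # wait for all in-sync replicas
}

for key, value in NO_DATA_LOSS_PRODUCER.items():
    print(key, "=", value)
```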

Data Assurance - Continued
Causes of data loss on the consumer side:
Offsets are carelessly committed
Data loss can happen if the consumer committed the offset but died while processing the message
Fixing data loss on the consumer side:
Commit the offset only after processing of the message is completed
Disable auto.offset.commit
Fixing on the broker side:
Have replication factor >= 3
Have min.isr = 2
Disable unclean leader election

The cause of data loss on the consumer side is that you are careless! Just kidding. It can happen if you consume messages and commit the offset before really processing them: you can have a failure during processing. How do you fix this? First, commit the offset only after processing the message; second, disable auto.offset.commit.
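The commit-after-processing fix can be sketched with a toy consume loop (the `consume` helper and the crash flag are illustrative, simulating a consumer that dies mid-batch):

```python
# Sketch: commit the offset only AFTER processing. A crash between
# processing and commit re-delivers the message (at-least-once)
# instead of losing it.

def consume(log, committed, process, crash_before_commit_at=None):
    """Process messages from `committed` on; return last committed offset."""
    for offset in range(committed, len(log)):
        process(log[offset])
        if offset == crash_before_commit_at:
            return committed             # died before committing this offset
        committed = offset + 1           # commit AFTER processing succeeds
    return committed

seen = []
log = ["m0", "m1", "m2"]
# First run crashes after processing offset 1 but before committing it:
committed = consume(log, 0, seen.append, crash_before_commit_at=1)
print(committed)                         # 1: only m0 was committed
# On restart, consumption resumes from the committed offset:
committed = consume(log, committed, seen.append)
print(committed, seen)                   # m1 processed twice, nothing lost
```

Committing before processing would instead have skipped m1 entirely after the crash: that is exactly the data loss described above.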

Data Assurance - Continued
Message reordering:
Can happen if more than one message is in transit and retries are enabled
Fixing message reordering:
Set max.in.flight.requests.per.connection=1

Kafka 0.9 (Beta release)
Security:
Kerberos or TLS based authentication
Unix-like permissions to restrict who can access data
Encryption on the wire via SSL
Kafka Connect:
Supports large-scale real-time import and export for Kafka
Takes care of fault tolerance, offset management and delivery management
Will support connectors for Hadoop and databases
User-defined quotas:
To manage abusive clients
Rate limit traffic on the producer side and consumer side

Data needs to be moved in and out of Kafka and other systems, and people use multiple solutions for this.

Kafka 0.9 (Beta release) - continued
Quotas:
For example, allow only 10 MBps for reads and 5 MBps for writes
If clients violate the quota, they are slowed down
Can be overridden
New Consumer:
Removes the distinction between the high-level consumer and the simple consumer
Unified consumer API
No longer Zookeeper dependent
Offers pluggable offset management

How Can You Get Involved?
http://kafka.apache.org

Join the mailing lists
users@kafka.apache.org

irc.freenode.net - #apache-kafka


So how can you get more involved in the Kafka community? The most obvious answer is to go to kafka.apache.org. From there you can join the mailing lists, either on the development or the user side. You can also dive into the source repository, and work on and contribute your own tools back.

Kafka may be young, but it's a critical piece of data infrastructure for many of us.

Q & A
Want to contact us?

Akash Vacher (avacher@linkedin.com)
Ayyappadas Ravindran (appu@linkedin.com)
Talent Partner: Syed Hussain (sshussain@linkedin.com)
Mob: +91 953 581 8876