ELK at LinkedIn - Kafka, scaling, lessons learned

Download ELK at LinkedIn - Kafka, scaling, lessons learned

Post on 19-Jul-2015




6 download


ELK @ LinkedInScaling ELK with KafkaIntroductionTin Le (tinle@linkedin.com)Senior Site Reliability Engineer

Formerly part of Mobile SRE team, responsible for servers handling mobile apps (IOS, Android, Windows, RIM, etc.) traffic.

Now responsible for guiding ELK @ LinkedIn as a whole50+ % of site traffic come in via mobile

ProblemsMultiple data centers, ten of thousands of servers, hundreds of billions of log recordsLogging, indexing, searching, storing, visualizing and analysing all of those logs all day every daySecurity (access control, storage, transport)Scaling to more DCs, more servers, and even more logsARRGGGGHHH!!!!!

Many applications. Mobile frontend logs: average 2.4TB size (3+ billion docs).

SolutionsCommercialSplunk, Sumo Logic, HP ArcSight Logger, Tibco, XpoLog, Loggly, etc.Open SourceSyslog + GrepGraylogElasticsearchetc.

Evaluated, used by some teams. Some Acquisitions use commercial solutions.Commercial solutions cost prohibitive.

CriteriasScalable - horizontally, by adding more nodesFast - as close to real time as possibleInexpensiveFlexibleLarge user community (support)Open source

ELK!The winner is...Splunk ???ELK at LinkedIn100+ ELK clusters across 20+ teams and 6 data centersSome of our larger clusters have:Greater than 32+ billion docs (30+TB)Daily indices average 3.0 billion docs (~3TB)Expect storage size to increase as we migrate to using doc_valuesELK + KafkaSummary: ELK is a popular open sourced application stack for visualizing and analyzing logs. ELK is currently being used across many teams within LinkedIn. The architecture we use is made up of four components: Elasticsearch, Logstash, Kibana and Kafka.

Elasticsearch: Distributed real-time search and analytics engineLogstash: Collect and parse all data sources into an easy-to-read JSON formatKibana: Elasticsearch data visualization engineKafka: Data transport, queue, buffer and short term storage

Leveraging open source stack. Large community.Leveraging common data transport. Rock solid, proven, dedicated support team.

What is Kafka?Apache Kafka is a high-throughput distributed messaging systemInvented at LinkedIn and Open Sourced in 2011Fast, Scalable, Durable, and Distributed by DesignLinks for more:http://kafka.apache.orghttp://data.linkedin.com/opensource/kafka

Fast : a single Kafka broker can handle hundreds of megabytes of reads and writes per sec from thousands of clients.Scaleable: Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can elastically and transparently expanded without downtime.Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of coordinated consumers.Durable: Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages w/o performance impact.Distributed by Design: Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.

Kafka at LinkedInCommon data transportAvailable and supported by dedicated team875 Billion messages per day200 TB/day In700 TB/day OutPeak Load10.5 Million messages/s18.5 Gigabits/s Inbound70.5 Gigabits/s OutboundThese numbers are from LinkedIn Kafka presentation at Apache Con 2015.Over 1100 brokers over 50+ clustersOver 32000 topicsOver 350 thousand partitions, not including replication.

Logging using Kafka at LinkedInDedicated cluster for logs in each data centerIndividual topics per applicationDefaults to 4 days of transport level retentionNot currently replicating between data centersCommon logging transport for all services, languages and frameworksLog retention long enough for 3 days weekend. So we can re-index from Kafka if encounter issues.Dedicated log clusters to isolate traffic from other clusters.Two years ago, used our own kafka input plugin. Then switched to using KCC via pipe input for performance reason.Monitoring: . Its important to monitor logstash nodes. LS has a bug where an error in any of its input, filter or output will stop the entire LS process. See metrics filter config file at end of these slides.

ELK Architectural ConcernsNetwork ConcernsBandwidthNetwork partitioningLatencySecurity ConcernsFirewalls and ACLsEncrypting data in transitResource ConcernsA misbehaving application can swamp production resources

Weve chosen to keep all of our clients local to the clusters and use a tiered architecture due to several major concerns.The primary concern is around the networking itself. Kafka enables multiple consumers to read the same topic, which means if we are reading remotely, we are copying messages over expensive inter-datacenter connections multiple times. We also have to handle problems like network partitioning in every client. Granted, you can have a partition even within a single datacenter, but it happens much more frequently when you are dealing with large distances. Theres also the concern of latency in connections distance increases latency. Latency can cause interesting problems in client applications, and I like life to be boring.There are also security concerns around talking across datacenters. If we keep all of our clients local, we do not have to worry about ACL problems between the clients and the brokers (and Zookeeper as well). We can also deal with the problem of encrypting data in transit much more easily. This is one problem we have not worried about as much, but it is becoming a big concern now.The last concern is over resource usage. Everything at LinkedIn talks to Kafka, and a problem that takes out a production cluster is a major event. It could mean we have to shift traffic out of the datacenter until we resolve it, or it could result in inconsistent behavior in applications. Any application could overwhelm a cluster, but there are some, such as applications that run in Hadoop, that are more prone to this. By keeping those clients talking to a cluster that is separate from the front end, we mitigate resource contention.

Multi-colo ELK ArchitectureELK Dashboard#ServicesELK SearchClustersLogTransportKafkaELK Search ClustersLinkedIn ServicesDC1ServicesKafkaELK Search ClustersDC2ServicesKafkaELK Search ClustersDC3TribesCorp Data CentersFor security reasons, data/logs generated in each DC stays there. Indexed by local ELK cluster. Aggregated views via Tribe nodes.All logstash use common filters to catch most common data leakage.How services log to Kafka - imposed common logging library. All services use common library, which automatically log to Kafka, WARN or above.

ELK Search ArchitectureKibanaElasticsearch(tribe)KafkaElasticsearch(master)LogstashElasticsearch(data node)LogstashElasticsearch(data node)

UsersGeneral architecture for each ELK cluster.Dedicated masters.Tribe client node (HTTP services).

Operational ChallengesData, lots of it.Transporting, queueing, storing, securing, reliabilityIngesting & Indexing fast enoughScaling infrastructureWhich data? (right data needed?)Formats, mapping, transformationData from many sources: Java, Scala, Python, Node.js, Go

Data - reliable transport, storing, queueing, consuming, indexing.Some data (java service logs for example) not in right format.Solutions to DataKafka as transport, storage queue, backbone.More logstash instances, more Kafka partitions.Using KCC we can consume faster than ES can index.To increase indexing speedMore ES nodes (horizontal).More shards (distribute work)Customized templates

Operational Challenges...Centralized vs Siloed Cluster ManagementAggregated views of data across the entire infrastructureConsistent view (trace up/down app stack)Scaling - horizontally or vertically?Monitoring, alerting, auto-remediatingUsing local Kafka log clusters instead of aggregated metrics.Tribe to aggregate clusters.Use internal tool call Nurse to monitor and auto-remediate (restarts) hung/dead instances of LS and ES.

The future of ELK at LinkedInMore ELK clusters being used by even more teamsClusters with 300+ billion docs (300+TB)Daily indices average 10+ billion docs, 10TB - move to hourly indices~5,000 shards per clusterThese numbers are estimate based on growth rate and plans.Beside logs, we have other application use cases internally.

Extra slidesNext two slides contain example logstash configs to show how we use input pipe plugin with Kafka Console Consumer, and how to monitor logstash using metrics filter.

KCC pipe input configpipe { type => "mobile" command => "/opt/bin/kafka-console-consumer/kafka-console-consumer.sh \ --formatter com.linkedin.avro.KafkaMessageJsonWithHexFormatter \ --property schema.registry.url=http://schema-server.example.com:12250/schemaRegistry/schemas \ --autocommit.interval.ms=60000 \ --zookeeper zk.example.com:12913/kafka-metrics \ --topic log_stash_event \ --group logstash1" codec => json }

This is how we use Logstash pipe input plugin to call out to Kafka Console Consumer. This currently give us the highest ingestion throughput.

Monitoring Logstash metricsfilter { metrics { meter => "events" add_tag => "metric" }}output { if metric in [tags] [ stdout { codec => line { format => Rate: %{events.rate_1m} } }}

Its important to monitor logstash nodes. LS has a bug where an error in any of its input, filter or output will stop the entire LS process. You can use Logstash metrics filter to make sure that LS is still processing data. Sometime LS runs but no data goes through. This will let you know when that happens.