Bridging the Gap: Connecting AWS and Kafka


Bridging the Gap: Connecting AWS and Kafka
Ryanne Dolan & Jason Li
LinkedIn

Overview

● Motivation
● What is Kinesis?
● Architecture
● Checkpointing
● Metrics
● DynamoDB Streams
● Validation

Story starts at Bizo

● Originally batch-oriented architecture: AWS S3, Spark
● Started converting to stream-oriented architecture in 2013: AWS Kinesis
● Acquired by LinkedIn in 2014
● Originally batch-oriented integration with LinkedIn’s data centers
● Started bridge project in 2015
● Other teams (including other startups) have started using it

Use Cases

● User-tracking events from Bizo Kinesis -> LI Kafka
● Leverage LI A/B tooling from AWS (requires Kafka events)
● Leverage LI ELK stack from AWS (Kinesis -> Kafka -> ELK)
● Leverage LI call tracing, site speed, and other metrics and reports
● Ship application and CloudWatch metrics from AWS to LI (Kinesis -> Kafka -> inGraphs)

Requirements

● encoding agnostic (Thrift, Avro, JSON, etc.)
● to/from Kafka
● to/from Kinesis
● preserve partitioning
● near real-time bidirectional replication
● stream multiplexing and joining
● support multiple AWS accounts
● multiple metrics endpoints (including CloudWatch)

Kinesis Highlights

● VERY similar to Kafka
● Dynamic, scalable “shards” (instead of static partitions)
● Throughput and cost tied to # of shards (see the worked example below)
● Each shard:
  ○ $0.015/hour
  ○ 1000 records/second
  ○ 1MB/s ingress
  ○ 2MB/s egress

● Integration with AWS services, e.g. “Firehose” -> S3
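
A rough worked example, using only the per-shard figures quoted above: sustaining 10 MB/s of ingress needs at least 10 shards, which costs 10 x $0.015/hour = $0.15/hour, or about $108 per 30-day month (before any per-record PUT charges), and also caps the stream at roughly 10,000 records/second.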

Kinesis-Kafka Bridge

Kinesis-Kafka Bridge

● Samza job
● Kinesis System (Producer and Consumer)
● BridgeTask
● input/output mapping
● Repartitioner
● Transcoder

BridgeTask

[Diagram: a Kinesis Consumer or Kafka Consumer feeds the BridgeTask; inside, a Repartitioner remaps partitions (e.g. Partition 1 -> shard-000001), a Transcoder converts encodings (e.g. Thrift -> Avro), and a Bridge Mapping routes streams (e.g. llama.abc -> kafka.xyz); the output goes through a Kinesis Producer or Kafka Producer.]

Transcoders

Thrift-encoded bytes -> AvroEnvelope Transcoder -> AvroEnvelope with Thrift-encoded payload -> AvroSerde -> Avro-encoded bytes
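
A minimal sketch of the envelope step, assuming a hypothetical Avro envelope schema with a single bytes field named payload; the real LinkedIn envelope schema and serde are not shown in this deck:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

public class EnvelopeTranscoder {
    // Hypothetical envelope schema: one opaque bytes field carrying the Thrift payload.
    private static final Schema ENVELOPE = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"AvroEnvelope\",\"fields\":["
      + "{\"name\":\"payload\",\"type\":\"bytes\"}]}");

    // Wrap Thrift-encoded bytes in an Avro envelope and serialize the envelope to Avro bytes.
    public byte[] transcode(byte[] thriftBytes) throws IOException {
        GenericRecord envelope = new GenericData.Record(ENVELOPE);
        envelope.put("payload", ByteBuffer.wrap(thriftBytes));

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(ENVELOPE).write(envelope, encoder);
        encoder.flush();
        return out.toByteArray();
    }
}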

Bridge Mappings

BridgeTask again (notice multiple outputs!)

[Diagram: the same BridgeTask pipeline as above, repeated to show a single consumer fanning out to multiple output streams and producers.]

The Kinesis System

KinesisConsumer

● wraps the Kinesis Client Library (KCL)
● extends Samza’s BlockingEnvelopeMap
● creates one KCL Worker per shard
● at least one Worker per KinesisConsumer instance
● Workers push envelopes into a queue
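
A minimal sketch of how this could look, assuming the KCL 1.x record-processor interface and Samza's BlockingEnvelopeMap; the class names and wiring are illustrative, not the actual bridge code, and package names may differ across KCL versions:

import java.nio.ByteBuffer;
import java.util.List;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor;
import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;
import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason;
import com.amazonaws.services.kinesis.model.Record;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.BlockingEnvelopeMap;

// Hypothetical consumer: extends Samza's BlockingEnvelopeMap and is fed by KCL workers.
public class KinesisConsumer extends BlockingEnvelopeMap {

  @Override
  public void start() { /* start KCL Workers for the assigned shards (omitted) */ }

  @Override
  public void stop() { /* shut the Workers down (omitted) */ }

  // One record processor per shard, created by a KCL Worker.
  // The SystemStreamPartition must already be registered with this consumer.
  class ShardRecordProcessor implements IRecordProcessor {
    private final SystemStreamPartition ssp;  // the Samza partition backing this shard

    ShardRecordProcessor(SystemStreamPartition ssp) {
      this.ssp = ssp;
    }

    @Override
    public void initialize(String shardId) { }

    @Override
    public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) {
      for (Record record : records) {
        ByteBuffer data = record.getData();
        byte[] bytes = new byte[data.remaining()];
        data.get(bytes);
        try {
          // Push each Kinesis record into the blocking queue that Samza polls.
          put(ssp, new IncomingMessageEnvelope(ssp, record.getSequenceNumber(),
              record.getPartitionKey(), bytes));
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          return;
        }
      }
      // Do NOT checkpoint here; see the checkpointing section below.
    }

    @Override
    public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { }
  }
}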

KinesisProducer

● uses the Kinesis PutRecords API (batch)
● enqueues envelopes (async)
● flushes off-thread
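
A minimal sketch of the enqueue/flush split using the AWS SDK's PutRecords call; the class and method names are assumptions, not the bridge's actual producer:

import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.PutRecordsRequest;
import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
import com.amazonaws.services.kinesis.model.PutRecordsResult;

// Hypothetical async producer: callers enqueue, a background thread batches PutRecords calls.
public class KinesisBatchProducer {
    private final AmazonKinesis kinesis;
    private final String streamName;
    private final BlockingQueue<PutRecordsRequestEntry> queue = new LinkedBlockingQueue<>();

    public KinesisBatchProducer(AmazonKinesis kinesis, String streamName) {
        this.kinesis = kinesis;
        this.streamName = streamName;
    }

    // Called from the Samza task thread: enqueue only, no blocking network call.
    public void send(String partitionKey, byte[] payload) {
        queue.add(new PutRecordsRequestEntry()
            .withPartitionKey(partitionKey)
            .withData(ByteBuffer.wrap(payload)));
    }

    // Called from a background flush thread: drain up to 500 entries
    // (the PutRecords batch limit) and send them in one call.
    public void flushOnce() {
        List<PutRecordsRequestEntry> batch = new ArrayList<>();
        queue.drainTo(batch, 500);
        if (!batch.isEmpty()) {
            PutRecordsResult result = kinesis.putRecords(new PutRecordsRequest()
                .withStreamName(streamName)
                .withRecords(batch));
            // A real producer would inspect result.getFailedRecordCount() and retry failures.
        }
    }
}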

KinesisSystemAdmin

● queries the Kinesis API for # shards at start-up
● tells Samza: # partitions == # shards
● # shards may change at any time, but OK in practice
● KCL will load-balance Workers automatically
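
A minimal sketch of counting shards at start-up with DescribeStream, including pagination; the helper name is hypothetical:

import com.amazonaws.services.kinesis.AmazonKinesis;
import com.amazonaws.services.kinesis.model.DescribeStreamRequest;
import com.amazonaws.services.kinesis.model.DescribeStreamResult;

// Hypothetical helper: count shards so Samza can treat # partitions == # shards.
public class ShardCounter {
    public static int countShards(AmazonKinesis kinesis, String streamName) {
        int shards = 0;
        String exclusiveStartShardId = null;
        DescribeStreamResult result;
        do {
            // DescribeStream pages through shards; follow the pagination cursor.
            result = kinesis.describeStream(new DescribeStreamRequest()
                .withStreamName(streamName)
                .withExclusiveStartShardId(exclusiveStartShardId));
            int page = result.getStreamDescription().getShards().size();
            shards += page;
            if (page > 0) {
                exclusiveStartShardId = result.getStreamDescription().getShards()
                    .get(page - 1).getShardId();
            }
        } while (result.getStreamDescription().getHasMoreShards());
        return shards;
    }
}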

The Checkpointing Problem

Checkpointing

● TWO sources of checkpoints: Samza and KCL
  ○ Samza checkpoints to a Kafka topic
  ○ KCL checkpoints to a DynamoDB table
  ○ similar semantics
● both systems must agree
● otherwise, possible DATA LOSS

Checkpointing Data Loss

1. KCL consumes a record
2. KCL checkpoints
3. Bridge replays the record to Kafka
4. container crashes before Kafka buffer flushed
5. container restarts
6. KCL restarts at checkpoint

--> buffered records lost

Checkpointing Solution

Checkpoint Kinesis Records only after they are flushed to Kafka.

Checkpoint Kafka Records only after they are flushed to Kinesis.

Producers must notify Consumers when it is safe to checkpoint.

Consumers must be able to request and wait for a Producer flush.

CheckpointableEnvelope

● KinesisConsumer registers onSynced listener
● BridgeTask registers listeners for each output stream
● KafkaProducer fires event after successful flush
● envelope is checkpointed only after ALL output streams have flushed
● each individual envelope is tracked this way, but...
● checkpoints only occur at sentinel envelopes at the end of each GetRecords batch
● (for non-sentinels, onSynced is a noop)

CheckpointableEnvelope

SyncedProducerProxy wraps KafkaProducer
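
A minimal sketch of the flush-then-checkpoint idea; the onSynced and sentinel terms follow the slides, but the fields and wiring below are assumptions rather than the actual implementation:

import java.util.concurrent.atomic.AtomicInteger;

import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer;

// Hypothetical envelope that checkpoints to the KCL only after every output stream
// it was replayed to has confirmed a successful flush.
public class CheckpointableEnvelope {
    private final IRecordProcessorCheckpointer checkpointer;
    private final String sequenceNumber;
    private final boolean sentinel;               // true only for the last record of a GetRecords batch
    private final AtomicInteger pendingOutputs;   // one count per output stream still unflushed

    public CheckpointableEnvelope(IRecordProcessorCheckpointer checkpointer,
                                  String sequenceNumber, boolean sentinel, int outputs) {
        this.checkpointer = checkpointer;
        this.sequenceNumber = sequenceNumber;
        this.sentinel = sentinel;
        this.pendingOutputs = new AtomicInteger(outputs);
    }

    // Fired by the producer side (e.g. a SyncedProducerProxy around the KafkaProducer)
    // once this envelope's output stream has been flushed successfully.
    public void onSynced() throws Exception {
        if (pendingOutputs.decrementAndGet() == 0 && sentinel) {
            // Safe to checkpoint: everything up to this record is durable in Kafka.
            checkpointer.checkpoint(sequenceNumber);
        }
    }
}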

Metrics

Two Stacks...

● Bizo infra is on AWS CloudWatch, including metrics, alarms, and paging
● LinkedIn has inGraphs for the same purpose

Need to be able to monitor the bridge from both.

https://engineering.linkedin.com/32/eric-intern-origin-ingraphs

a custom MetricTracker

● publishes metrics to CloudWatch and inGraphs
● locally aggregates metrics to minimize API calls
● each metric has dimensions:
  ○ shard
  ○ partition
  ○ stream
  ○ system
● each metric is re-published with a hierarchy of dimensions (see the sketch below)
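
A minimal sketch of the CloudWatch half of such a tracker, republishing one value at several dimension levels; the class and namespace names are assumptions, and the inGraphs side and local aggregation are omitted:

import com.amazonaws.services.cloudwatch.AmazonCloudWatch;
import com.amazonaws.services.cloudwatch.model.Dimension;
import com.amazonaws.services.cloudwatch.model.MetricDatum;
import com.amazonaws.services.cloudwatch.model.PutMetricDataRequest;
import com.amazonaws.services.cloudwatch.model.StandardUnit;

// Hypothetical tracker: republish the same value at system, system+stream,
// and system+stream+shard dimension levels (partition level omitted for brevity).
public class MetricTracker {
    private final AmazonCloudWatch cloudWatch;

    public MetricTracker(AmazonCloudWatch cloudWatch) {
        this.cloudWatch = cloudWatch;
    }

    public void publish(String metricName, double value,
                        String system, String stream, String shard) {
        PutMetricDataRequest request = new PutMetricDataRequest()
            .withNamespace("KinesisKafkaBridge")   // namespace name is an assumption
            .withMetricData(
                datum(metricName, value,
                    new Dimension().withName("system").withValue(system)),
                datum(metricName, value,
                    new Dimension().withName("system").withValue(system),
                    new Dimension().withName("stream").withValue(stream)),
                datum(metricName, value,
                    new Dimension().withName("system").withValue(system),
                    new Dimension().withName("stream").withValue(stream),
                    new Dimension().withName("shard").withValue(shard)));
        cloudWatch.putMetricData(request);
    }

    private static MetricDatum datum(String name, double value, Dimension... dims) {
        return new MetricDatum()
            .withMetricName(name)
            .withUnit(StandardUnit.Count)
            .withValue(value)
            .withDimensions(dims);
    }
}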

[Screenshots: shard-level, partition-level, stream-level, system-level, and application-level metrics, plus heartbeat metrics showing ~30 sec RTT (due to the 1-minute buffer)]

DynamoDB Stream to Kafka Bridge

Motivation

● Some of our services are running on AWS, e.g. video transcoding
● We want to replicate AWS data in the LinkedIn data center
  ○ Serve requests from the LinkedIn data center directly
  ○ Migrate off AWS easily

What is DynamoDB Stream

“A DynamoDB stream is an ordered flow of information about changes to items in an Amazon DynamoDB table. When you enable a stream on a table, DynamoDB captures information about every modification to data items in the table.” (AWS documentation)

Example DynamoDB Stream Record

{
  "EventID": "f561f0491ce42a95a60ad1fc082ae98b",
  "EventName": "MODIFY",
  "EventVersion": "1.0",
  "EventSource": "aws:dynamodb",
  "AwsRegion": "us-east-1",
  "Dynamodb": {
    "Keys": {
      "uuid": {
        "S": "255"
      }
    },
    "NewImage": {
      <json representation of new image>
    },
    "OldImage": {
      <json representation of old image>
    },
    "SequenceNumber": "593721700000000002066915768",
    "SizeBytes": 326,
    "StreamViewType": "NEW_AND_OLD_IMAGES"
  }
}

DynamoDB Stream Bridge Design

DynamoDB Stream Record to Kafka Message

● Concatenate sorted DynamoDB keys as the Kafka partition key (see the sketch after the example below)
● Put the DynamoDB Stream record in the Kafka message, e.g.

{'kafkaMessageSegmentHeader': None, 'payload': '{"EventID":"f5b5e336f056f2656b23bfeed3cd45c8","EventName":"MODIFY","EventVersion":"1.0","EventSource":"aws:dynamodb","AwsRegion":"us-east-1","Dynamodb":{"Keys":{"RecordId":{"N":"0"}},"NewImage":{"ReadableTime":{"S":"Wed Feb 17 01:13:02 UTC 2016"},"RecordId":{"N":"0"},"Timestamp":{"N":"1455671582888"}},"OldImage":{"ReadableTime":{"S":"Wed Feb 17 01:13:02 UTC 2016"},"RecordId":{"N":"0"},"Timestamp":{"N":"1455671582394"}},"SequenceNumber":"29102800000000002523034570","SizeBytes":141,"StreamViewType":"NEW_AND_OLD_IMAGES"}}'}
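
A minimal sketch of building the partition key this way; the helper is hypothetical and handles only string and number key attributes (binary keys omitted), sorting by attribute name so every change to the same item lands on the same Kafka partition:

import java.util.Map;
import java.util.TreeMap;

import com.amazonaws.services.dynamodbv2.model.AttributeValue;

// Hypothetical helper: concatenate the DynamoDB key attributes in sorted name order.
public class PartitionKeyBuilder {
    public static String build(Map<String, AttributeValue> keys) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, AttributeValue> e : new TreeMap<>(keys).entrySet()) {
            AttributeValue v = e.getValue();
            // DynamoDB key attributes are strings, numbers, or binary; cover the common cases.
            sb.append(v.getS() != null ? v.getS() : v.getN());
        }
        return sb.toString();
    }
}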

Use Cases

● Replicate rich media platform video transcoding metadata to LinkedIn data center (DynamoDB Stream -> Kafka -> Espresso)

Kinesis vs DynamoDB Stream

Validation

validation pipeline

Summary

With the ability to ship data from AWS streams (Kinesis and DynamoDB Streams) to LinkedIn Kafka and vice versa using Samza, we can now seamlessly integrate AWS with LinkedIn.

Q & A
