
Type-safe, Versioned, and Rewindable Stream Processing

with Apache {Avro, Kafka} and Scala

-=[ confoo.ca ]=- Thursday, February 19th, 2015

Hisham Mardam-Bey, Mate1 Inc.

Overview

● Who is this guy? + quick Mate1 intro
● Before message queues
● How we use message queues
● Some examples

Who is this guy?

● Linux user and developer since 1996
● Started out hacking on Enlightenment
  ○ X11 window manager
● Worked with OpenBSD
  ○ building embedded network gear
● Did a whole bunch of C followed by Ruby
● Working with the JVM since 2007
● Lately enjoying Erlang and Haskell; FP-FTW! (=

github: mardambey
twitter: codewarrior

Mate1: quick intro

● Online dating, since 2003, based in Montreal
● Initially a team of 3, around 40 now
● Engineering team has 13 geeks / geekettes
● We own and run our own hardware
  ○ fun!
  ○ mostly…

https://github.com/mate1

Some of our features...

● Lots of communication, chatting, push notifs
● Searching, matching, recommendations, geo-location features
● Lists of... friends, blocks, people interested, more
● News & activity feeds, counters, rating

Before message queues

● Events via DAOs into MySQL
  ○ More data, more events lead to more latency
  ○ Or build an async layer around DAOs
    ■ Surely better solutions exist!
● Logs rsync’ed into file servers and Hadoop
  ○ Once every 24 hours
● MySQL data partitioned functionally
  ○ Application-layer sharding
● Custom MySQL replication for BI servers
  ○ Built fan-in replication for MySQL
● Data processed through Java, Jython, SQL

Message queues

● Apache Kafka: fast, durable, distributed
● Stored data as JSON, in plain text
● Mapped JSON to Scala classes manually
● Used Kafka + Cassandra a lot
  ○ low-latency reactive system (push, not pull)
  ○ used them to build:
    ■ near-real-time data / event feeds
    ■ live counters
    ■ lots of lists
● This was awesome, but we had some issues and wanted some improvements.

Issues / improvements

● Did not want to keep manually marshalling data; potential mistakes -> type safety
● Code gets complicated when maintaining backward compatibility -> versioning
● Losing events is costly if a bug creeps out into production -> rewindable
● Wanted to save time and reuse certain logic and parts of the system -> reusable patterns
  ○ more of an improvement than an issue

Type-safe

● Avoid stringified types, maps (no structure)
● Used Apache Avro for serialization:
  ○ Avro provides JSON / binary ser/de
  ○ Avro provides structuring and type safety
● Mapped Avro to Java/Scala classes
● Effectively tied:
  ○ Kafka topic <-> Avro schema <-> POJO
● Producers / consumers now type-safe and compile-time checked (see the sketch below)
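
A minimal sketch of the idea, assuming a hypothetical TypedProducer wrapper (illustrative, not code from the talk): because the producer is parameterized on the Avro-generated class for its topic, handing it a record of the wrong type is a compile error rather than a runtime surprise.

import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.specific.{SpecificDatumWriter, SpecificRecord}

// A producer tied to one Kafka topic and one Avro-generated record type.
// toBytes(record) only compiles for records of type T.
class TypedProducer[T <: SpecificRecord](topic: String) {
  def toBytes(record: T): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val writer = new SpecificDatumWriter[T](record.getSchema)
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, encoder)
    encoder.flush()
    out.toByteArray // hand these bytes to the Kafka producer for `topic`
  }
}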

Versioning, why?

● All was fine… until we had to alter schemas!
● Distributed producers mean:
  ○ multiple versions of the data being generated
● Distributed consumers mean:
  ○ multiple versions of the data being processed
● Rolling upgrades are the only way in prod
● Came up with a simple data format

Simple (extensible) data format

● magic: byte identifying data format / version
● schemaId: version of the schema to use
● data: plain text / binary bytes
  ○ ex: JSON encoded data
● assumption: schema name = Kafka topic

---------------------
| magic    | 1 byte  |
| schemaId | 2 bytes |
| data     | N bytes |
---------------------
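
A minimal sketch of encoding and decoding this envelope with java.nio; the layout comes from the slide, but the concrete value of the magic byte is an assumption.

import java.nio.ByteBuffer

object Envelope {
  val Magic: Byte = 0x0 // assumed value; identifies the format version

  def encode(schemaId: Short, data: Array[Byte]): Array[Byte] = {
    val buf = ByteBuffer.allocate(1 + 2 + data.length)
    buf.put(Magic)         // magic: 1 byte
    buf.putShort(schemaId) // schemaId: 2 bytes
    buf.put(data)          // data: N bytes (JSON or Avro binary)
    buf.array()
  }

  def decode(bytes: Array[Byte]): (Byte, Short, Array[Byte]) = {
    val buf = ByteBuffer.wrap(bytes)
    val magic = buf.get()
    val schemaId = buf.getShort()
    val data = new Array[Byte](buf.remaining())
    buf.get(data)
    (magic, schemaId, data)
  }
}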

Schema loading

● Load schemas based on:
  ○ Kafka topic name (ex: WEB_LOGS, MSG_SENT, ...)
  ○ Schema ID / version (ex: 0, 1, 2)
● How do we store / fetch schemas? (see the fetch sketch below)
  ○ local file system
  ○ across the network (database? some repository?)
● Decided to integrate AVRO-1124
  ○ a few patches in a Jira ticket
  ○ not part of mainstream Avro
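
A minimal fetch sketch, assuming the repository serves schema text over HTTP at <repoUrl>/<topic>/id/<schemaId> (the exact path is an assumption; check your schema-repo deployment):

import scala.io.Source
import org.apache.avro.Schema

// Fetch a schema by Kafka topic name and schema ID, then parse it.
// The URL layout below is an assumption about the repo's REST API.
def fetchSchema(repoUrl: String, topic: String, schemaId: Int): Schema = {
  val text = Source.fromURL(s"$repoUrl/$topic/id/$schemaId").mkString
  new Schema.Parser().parse(text)
}

In practice you would cache the parsed schemas, since a schema ID is immutable once registered.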

Avro Schema Repository & Resolution

● What is an Avro schema repository?
  ○ HTTP-based repo, originally filesystem backed
● AVRO-1124: integrated (and now improved)
  ○ Back on GitHub (Avro + AVRO-1124)
    ■ https://github.com/mate1/avro
  ○ Also a WIP fork into a standalone project
    ■ https://github.com/schema-repo/schema-repo
● Avro has schema resolution / evolution
  ○ provides rules guaranteeing version compatibility
  ○ allows data to be decoded using multiple schemas (old and new; see the decoding sketch below)
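
A minimal decoding sketch using Avro's standard resolution API: the writer schema (looked up in the repo via the envelope's schemaId) and the reader schema (the version this consumer was built against) are both given to the datum reader, which reconciles fields added or removed between versions.

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.DecoderFactory

// writerSchema: what the producer used; readerSchema: what we expect.
def decode(writerSchema: Schema, readerSchema: Schema, data: Array[Byte]): GenericRecord = {
  val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  val decoder = DecoderFactory.get().binaryDecoder(data, null)
  reader.read(null, decoder) // Avro's resolution rules applied here
}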

Rolling upgrades, how?

● Make the new schema available in the repository
● Rolling producer upgrades
  ○ produce old and new versions of the data
● Rolling consumer upgrades
  ○ consumers consume old and new versions of the data
● Eventually...
  ○ producers produce the new version (now current)
  ○ consumers consume the new version (now current)

Rewindable

● Why?
  ○ Re-process data due to downstream data loss
  ○ Buggy code causes faulty data / statistics
  ○ Rebuild downstream state after a system crash or restart
● How?
  ○ We take advantage of Kafka’s design
  ○ Let’s take a closer look at that...

Kafka Consumers and Offsets

● Kafka consumers manage their own offsets
  ○ Offsets are not managed by the broker
  ○ Data is not deleted upon consumption
  ○ Offsets usually stored in ZooKeeper (<= 0.8.1.1)
    ■ This changed with Kafka 0.8.2.0! Finally!
● Kafka data retention policies
  ○ time / size based retention
  ○ key-based compaction
    ■ infinite retention!
● Need to map offsets to points in time
  ○ Allows for resetting offsets to a point in time (see the lookup sketch below)
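
A minimal sketch of that time-to-offset mapping with the 0.8-era SimpleConsumer API; broker host, port, and client ID are placeholders, and note that 0.8 resolves timestamps only at log-segment granularity.

import kafka.api.PartitionOffsetRequestInfo
import kafka.common.TopicAndPartition
import kafka.javaapi.OffsetRequest
import kafka.javaapi.consumer.SimpleConsumer

// Find the latest offset written before the given timestamp,
// so a consumer can be rewound to (roughly) that point in time.
def offsetBefore(topic: String, partition: Int, timestampMs: Long): Long = {
  val consumer = new SimpleConsumer("broker-host", 9092, 100000, 64 * 1024, "offset-lookup")
  try {
    val requestInfo = new java.util.HashMap[TopicAndPartition, PartitionOffsetRequestInfo]()
    requestInfo.put(TopicAndPartition(topic, partition), PartitionOffsetRequestInfo(timestampMs, 1))
    val request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion, "offset-lookup")
    consumer.getOffsetsBefore(request).offsets(topic, partition).head
  } finally consumer.close()
}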

Currently, manual rewinding

● 2 types of Kafka consumers:
  ○ ZK based, one event at a time
  ○ MySQL based, batch processing
    ■ Kafka + MySQL offset store + ZFS = transactional rollbacks
    ■ Used to transactionally get data into MySQL
● Working on tools to automate the process
  ○ Specifically to take advantage of 0.8.2.0’s offset management API

Reusable

● Abstracted out some patterns, like:
  ○ Enrichment
  ○ Filtering
  ○ Splitting / Routing
  ○ Merging
● Let’s see how we use them...

Reusable

[Diagram: the reusable patterns in production. Kafka topics (WebLog, Enriched WebLog, XMPP, Apple, Google) connect web servers and system / app events to consumers: a WebLogConsumer with an Enricher (GeoIP service, device detection) feeds Hadoop and MySQL; a PushNotifConsumer with a Router fans out to XMPPConsumer, APNConsumer, and GCMConsumer; an EmailNotifConsumer behind a Filter (X msgs / hr / user) feeds an MTA service; BatchConsumers and NearRealTimeConsumers feed dashboards; an InboxCacheConsumer maintains a Redis cache for the web servers.]

Fin! That’s all folks (=

Thanks! Questions?

Reusable

● Emerging patterns
● Enrichment

abstract class Enricher[Input <: SpecificRecord, Output <: SpecificRecord] {
  def enrich(input: Input): Output
}

● Filtering

abstract class Filter[Input <: SpecificRecord] {
  def filter(input: Input): Option[Input]
}

Reusable

● More patterns
● Splitting

abstract class Splitter2[Input <: SpecificRecord, Output <: SpecificRecord] {
  def split(input: Input): Tuple2[Output, Output]
}

● Merging

abstract class Merger2[Input1 <: SpecificRecord, Input2 <: SpecificRecord, Output <: SpecificRecord] {
  def merge(input1: Input1, input2: Input2): Output
}

Reusable

● Usage examples:
  ○ Enrich web logs (see the sketch after this list)
    ■ GeoIP
    ■ User-Agent, mobile device details
  ○ Push notifications message router / scheduler
    ■ OS specific notifications
    ■ A/B tests
  ○ News feed type systems
  ○ Cache maintenance
    ■ Users’ inbox, friend lists
    ■ Consumable data by time interval (Redis)
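
A minimal sketch of the web-log enrichment case, assuming hypothetical Avro-generated classes WebLog and EnrichedWebLog and a hypothetical GeoIpClient (none of these names come from the talk):

// Hypothetical stand-ins for Avro-generated SpecificRecord classes
// and a GeoIP lookup client; shown to illustrate the Enricher pattern.
class GeoIpEnricher(geo: GeoIpClient) extends Enricher[WebLog, EnrichedWebLog] {
  def enrich(input: WebLog): EnrichedWebLog = {
    val loc = geo.lookup(input.getIpAddress.toString)
    EnrichedWebLog.newBuilder()
      .setIpAddress(input.getIpAddress)
      .setUserAgent(input.getUserAgent)
      .setCountry(loc.country)
      .setCity(loc.city)
      .build()
  }
}

A consumer on the WEB_LOGS topic would call enrich on each decoded record and publish the result to the Enriched WebLog topic.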

Data Pipeline Diagram (partial)

[Diagram: app servers, web servers, and other services publish events through EventManager instances into a Kafka cluster coordinated by ZooKeeper; consumers fan the data out to Cassandra (C*), Play services, SOLR (NRT search, geo-location), Redis (TTL flags, transient data), Ejabberd, and APN.]