Stream Application Development with Apache Kafka

Post on 09-Jan-2017


TRANSCRIPT

Slide 1 (Confidential)

Stream Application Development with Apache® Kafka™

Matthias J. Sax | Software Engineer

matthias@confluent.io

@MatthiasJSax

Slide 2

Apache Kafka is ... a distributed streaming platform

Consumers

Producers

Connectors

Processing

Slide 3

Confluent is ... a company founded by the original creators of Apache Kafka

• Built on Apache Kafka
• Confluent Open Source
• Confluent Enterprise

All components but Kafka are optional to run Confluent. Mix-and-match them as required.


Slide 4

Kafka Streams is ... the easiest way to process data in Kafka (as of v0.10)

• Easy to use library
  • Real stream processing / record by record / ms latency
  • DSL
  • Focus on applications
  • No cluster / “cluster to-go”
  • ”DB to-go”

• Expressive
  • Single record transformations
  • Aggregations / Joins
  • Time, windowing, out-of-order data
  • Stream-table duality

• Tightly integrated within Kafka
  • Fault-tolerant
  • Scalable (s/m/l/xl), elastic
  • Encryption, authentication, authorization

• Stateful
  • Backed by Kafka
  • Queryable / “DB to-go”
  • Data reprocessing
  • Application “reset button”

Slide 5

Before Kafka Streams

Do-it-yourself stream processing
• Hard to get right / lots of “glue code”
• Fault-tolerance / scalability ... ???

plain consumer/producer clients

Slide 6

Before Kafka Streams

Slide 7

Before Kafka Streams

Do-it-yourself stream processing
• Hard to get right / lots of “glue code”
• Fault-tolerance / scalability ... ???

Using a framework
• Requires a cluster
  • Bare metal: hard to manage
  • YARN / Mesos
• Test locally, deploy remotely
  • “Can you please deploy my code?”
• JAR and dependency hell

How does your application interact with your stream processing job?

plain consumer/producer clients

and others...

Slide 8

Before Kafka Streams

Slide 9

Build apps, not clusters!

Slide 10

Easy to use!

$ java -cp MyApp.jar \
    io.confluent.MyApp

Slide 11

Easy to integrate!

Slide 12

Queryable / “DB to-go”

Slide 13

How to install Kafka Streams?
Not at all. It’s a library.

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>0.10.1.0</version>
</dependency>
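For Gradle-based builds (not shown in the deck), the same coordinates would be declared along these lines, using the dependency configuration common at the time:

```groovy
dependencies {
    // same groupId:artifactId:version as the Maven snippet above
    compile 'org.apache.kafka:kafka-streams:0.10.1.0'
}
```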

Slide 14

How do I deploy my app?
Whatever works for you. It’s just an app like any other!

Slide 15

If it’s just a regular application…

• How does it scale?
• How can it be fault-tolerant?
• How does it handle distributed state?

Off-load hard problems to brokers.

• Kafka is a streaming platform: no need to reinvent the wheel
• Exploit consumer groups and the group management protocol
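The idea of off-loading scaling to the brokers can be sketched in plain Java (no Kafka dependency; `assignRoundRobin` is a hypothetical stand-in for the broker-side partition assignor): the partitions of the input topics are spread over all running instances of the application, so starting another instance simply takes over some of the partitions.

```java
import java.util.*;

public class GroupAssignmentSketch {
    // Round-robin assignment of topic partitions to the members of a
    // consumer group -- roughly what Kafka's assignors do for us, so the
    // application never has to implement scaling logic itself.
    public static Map<String, List<Integer>> assignRoundRobin(List<String> members, int numPartitions) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String m : members) assignment.put(m, new ArrayList<>());
        for (int p = 0; p < numPartitions; p++) {
            assignment.get(members.get(p % members.size())).add(p);
        }
        return assignment;
    }
}
```

With one instance, `assignRoundRobin(["app-1"], 4)` gives that instance all four partitions; adding "app-2" splits them two and two, which is exactly the "elastic" behavior the next slides illustrate.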

Slide 16

Scaling

Slide 17

Scaling

Slide 18

Scaling

Easy to scale!
It’s elastic!
“cluster to-go”

Slide 19

Fault-tolerance

Consumer group rebalance

Slide 20

Distributed State

State stores

Slide 21

Distributed State

State stores

Slide 22

Distributed State

Slide 23

Yes it’s complicated…

API, coding

Org. processes

Reality™

Operations

Security

Architecture

Slide 24

But…

API, coding

Org. processes

Reality™

Operations

Security

Architecture

You

Kafka core / Kafka Streams

Slide 25

KStream/KTable

• KStream
  • Record stream
  • Each record describes an event in the real world
  • Example: click stream

• KTable
  • Changelog stream
  • Each record describes a change to a previous record
  • Example: position report stream
  • In Kafka Streams: KTable holds a materialized view of the latest update per key as internal state

Slide 26

KTable

User profile/location information

Changelog stream: (alice, paris), (bob, zurich), (alice, berlin)

KTable state after each record:
  (alice, paris)  → { alice: paris }
  (bob, zurich)   → { alice: paris, bob: zurich }
  (alice, berlin) → { alice: berlin, bob: zurich }
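The table semantics above can be simulated in a few lines of plain Java (a dependency-free sketch, not the Kafka Streams implementation): a record stream keeps every event, while a changelog stream upserts the latest value per key.

```java
import java.util.*;

public class StreamTableDuality {
    // KStream semantics: every record is an independent event -> append all.
    public static List<Map.Entry<String, String>> asStream(List<Map.Entry<String, String>> records) {
        return new ArrayList<>(records);
    }

    // KTable semantics: every record is an update -> latest value per key wins.
    public static Map<String, String> asTable(List<Map.Entry<String, String>> records) {
        Map<String, String> view = new LinkedHashMap<>();
        for (Map.Entry<String, String> r : records) view.put(r.getKey(), r.getValue());
        return view;
    }
}
```

Feeding in (alice, paris), (bob, zurich), (alice, berlin): the stream view keeps all three records, while the table view materializes { alice: berlin, bob: zurich }, matching the slide.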

Slide 27

KTable (count moves)

Record stream: (alice, paris), (bob, zurich), (alice, berlin)

count() per key, emitting a changelog stream as output:
  (alice, paris)  → state { alice: 0 },          output (alice, 0)
  (bob, zurich)   → state { alice: 0, bob: 0 },  output (bob, 0)
  (alice, berlin) → state { alice: 1, bob: 0 },  output (alice, 1)
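The move-counting example can likewise be mimicked in plain Java (again a sketch, not the real count() operator). Note that, as on the slide, the first record for a key yields a count of 0, because the first position report is not yet a move.

```java
import java.util.*;

public class CountMoves {
    // State holds the current count per key; every update is emitted
    // downstream as a changelog record (key, newCount).
    public static List<Map.Entry<String, Integer>> countMoves(List<String> keys) {
        Map<String, Integer> state = new HashMap<>();
        List<Map.Entry<String, Integer>> changelog = new ArrayList<>();
        for (String key : keys) {
            int count = state.containsKey(key) ? state.get(key) + 1 : 0;
            state.put(key, count);
            changelog.add(new AbstractMap.SimpleEntry<>(key, count));
        }
        return changelog;
    }
}
```

`countMoves(["alice", "bob", "alice"])` emits (alice, 0), (bob, 0), (alice, 1), reproducing the slide's changelog output.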

Slide 28

KTable (cont.)

• Internal state:
  • Continuously updating materialized view of the latest status
• Downstream result (“output”):
  • Changelog stream, describing every update to the materialized view

KStream stream = …
KTable table = stream.aggregate(...)

It’s the changelog!

Slide 29

KStream/KTable

Slide 30

Time and Windows

• Event time (default)
  • Create time
  • (Broker) ingestion time
  • Customized
• Processing time

• (Hopping) time windows
  • Overlapping or non-overlapping (tumbling)
  • For aggregations
• Sliding windows
  • For KStream-KStream joins

KStream stream = …

KTable table = stream.aggregate(TimeWindows.of(10 * 1000), ...);
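The arithmetic behind hopping windows can be sketched without any Kafka dependency: a record with timestamp ts belongs to every advance-aligned window whose range [start, start + size) contains ts. This mirrors (but is not) the library's own window-assignment logic.

```java
import java.util.*;

public class HoppingWindows {
    // All start times (ms) of hopping windows with the given size and
    // advance that contain a record with timestamp ts. Windows are aligned
    // to multiples of the advance and include their start, not their end.
    public static List<Long> windowStartsFor(long ts, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long windowStart = Math.max(0, ts - sizeMs + advanceMs) / advanceMs * advanceMs;
        while (windowStart <= ts) {
            starts.add(windowStart);
            windowStart += advanceMs;
        }
        return starts;
    }
}
```

For example, with 5-minute windows advancing every minute, a record at t = 130,000 ms falls into the windows starting at 0, 60,000, and 120,000 ms, so three overlapping aggregates get updated.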

Slide 31

KTable Semantics

• Non-windowed:
  • State is kept forever
  • Out-of-order/late-arriving records are handled straightforwardly
  • KTable aggregation can be viewed as a landmark window (i.e., window size == infinite)
  • Output is a changelog stream

• Windowed:
  • Windows (i.e., state) are kept “forever” (well, there is a configurable retention time)
  • Out-of-order/late-arriving records are handled straightforwardly
  • Output is a changelog stream

• No watermarks required
• Early updates/results

Slide 32

Show Code!

Slide 33

Page Views per Region

Stream/table join:
• Click stream (KStream): <userId : page>
• Profile changelog (KTable): <userId : region> (the current user info)
• Join, re-key by region, count → PageViews per Region: <region : page>

Slide 34

Page Views per Region

final KStreamBuilder builder = new KStreamBuilder();

// read record stream from topic "PageViews" and changelog stream from topic "UserProfiles"
final KStream<String, String> views = builder.stream("PageViews");  // <userId : page>
final KTable<String, String> userProfiles = builder.table("UserProfiles", "UserProfilesStore");  // <userId : region>

Slide 35

Page Views per Region

final KStreamBuilder builder = new KStreamBuilder();

// read record stream from topic "PageViews" and changelog stream from topic "UserProfiles"
final KStream<String, String> views = builder.stream("PageViews");  // <userId : page>
final KTable<String, String> userProfiles = builder.table("UserProfiles", "UserProfilesStore");  // <userId : region>

// enrich page views with user's region -- stream-table join
final KStream<String, String> viewsWithRegionKey = views.leftJoin(userProfiles,
        (page, userRegion) -> page + "," + userRegion)
    // and set "region" as new key
    .map((userId, pageAndRegion) ->
        new KeyValue<>(pageAndRegion.split(",")[1], pageAndRegion.split(",")[0]));

Slide 36

Page Views per Region

final KStreamBuilder builder = new KStreamBuilder();

// read record stream from topic "PageViews" and changelog stream from topic "UserProfiles"
final KStream<String, String> views = builder.stream("PageViews");  // <userId : page>
final KTable<String, String> userProfiles = builder.table("UserProfiles", "UserProfilesStore");  // <userId : region>

// enrich page views with user's region -- stream-table join AND set "region" as new key
final KStream<String, String> viewsWithRegionKey = views.leftJoin(userProfiles, ...).map(...);  // <region : page>

// count views by region, using hopping windows of size 5 minutes that advance every 1 minute
final KTable<Windowed<String>, Long> viewsByRegion = viewsWithRegionKey
    .groupByKey()  // redistribute data
    .count(TimeWindows.of(5 * 60 * 1000L).advanceBy(60 * 1000L), "GeoPageViewsStore");

Slide 37

Page Views per Region

final KStreamBuilder builder = new KStreamBuilder();

// read record stream from topic "PageViews" and changelog stream from topic "UserProfiles"
final KStream<String, String> views = builder.stream("PageViews");  // <userId : page>
final KTable<String, String> userProfiles = builder.table("UserProfiles", "UserProfilesStore");  // <userId : region>

// enrich page views with user's region -- stream-table join AND set "region" as new key
final KStream<String, String> viewsWithRegionKey = views.leftJoin(userProfiles, ...).map(...);  // <region : page>

// count views by region, using hopping windows of size 5 minutes that advance every 1 minute
final KTable<Windowed<String>, Long> viewsByRegion = viewsWithRegionKey.groupByKey().count(TimeWindows.of(...)..., ...);

// write result
viewsByRegion.toStream((windowedRegion, count) -> windowedRegion.toString())  // prepare result
    .to(stringSerde, longSerde, "PageViewsByRegion");  // write to topic "PageViewsByRegion"

Slide 38

Page Views per Region

final KStreamBuilder builder = new KStreamBuilder();

// read record stream from topic "PageViews" and changelog stream from topic "UserProfiles"
final KStream<String, String> views = builder.stream("PageViews");  // <userId : page>
final KTable<String, String> userProfiles = builder.table("UserProfiles", "UserProfilesStore");  // <userId : region>

// enrich page views with user's region -- stream-table join AND set "region" as new key
final KStream<String, String> viewsWithRegionKey = views.leftJoin(userProfiles, ...).map(...);  // <region : page>

// count views by region, using hopping windows of size 5 minutes that advance every 1 minute
final KTable<Windowed<String>, Long> viewsByRegion = viewsWithRegionKey.groupByKey().count(TimeWindows.of(...)..., ...);

// write result to topic "PageViewsByRegion"
viewsByRegion.toStream(...).to(..., "PageViewsByRegion");

// start application
final KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);  // streamsConfiguration omitted for brevity
streams.start();

// stop application
streams.close();

/* https://github.com/confluentinc/examples/blob/3.1.x/kafka-streams/src/main/java/io/confluent/examples/streams/PageViewRegionExample.java */

Slide 39

Interactive Queries

• KTable is a changelog stream with a materialized internal view (state)
• KStream-KTable join can do lookups into the materialized view
• What if the application could do lookups, too?

https://www.confluent.io/blog/unifying-stream-processing-and-interactive-queries-in-apache-kafka/

Yes, it can!
“DB to-go”

Slide 40

Interactive Queries

(local state: charlie 3, bob 5, alice 2)

New API to access local state stores of an app instance

Slide 41

Interactive Queries

(local state: charlie 3, bob 5, alice 2)

New API to discover running app instances
(“host1:4460”, “host5:5307”, “host3:4777”)
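How a key finds the instance that hosts it can be sketched in plain Java (`hostFor` is a hypothetical helper; it uses hashCode where Kafka hashes the serialized key with murmur2): state stores are partitioned like their input topic, and each instance owns a subset of the partitions.

```java
import java.util.*;

public class InstanceLookup {
    // Which app instance hosts the state for a key? The store is partitioned
    // like the underlying topic: partition = hash(key) % numPartitions, and
    // each running instance owns some of the partitions.
    public static String hostFor(String key, List<String> hosts, int numPartitions) {
        int partition = Math.abs(key.hashCode()) % numPartitions;
        return hosts.get(partition % hosts.size());
    }
}
```

The lookup is deterministic, so any instance (or an external router) can compute where a given key lives and forward the query there over the app's own RPC layer, which is exactly the division of labor the next slide shows.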

Slide 42

Interactive Queries

(local state: charlie 3, bob 5, alice 2)

You: inter-app communication (RPC layer)

Slide 43

Wrapping Up

• Kafka Streams is available in Apache Kafka 0.10 and Confluent Platform 3.1

  • http://kafka.apache.org/
  • http://www.confluent.io/download (open source + enterprise versions, tar/zip/deb/rpm)

• Kafka Streams demos at https://github.com/confluentinc/examples
  • Java 7, Java 8+ with lambdas, and Scala
  • WordCount, Joins, Avro integration, Top-N computation, Windowing, Interactive Queries

• Apache Kafka documentation: http://kafka.apache.org/documentation.html
• Confluent documentation: http://docs.confluent.io/current/streams/
  • Quickstart, Concepts, Architecture, Developer Guide, FAQ

• Join our bi-weekly Confluent Developer Roundtable sessions on Kafka Streams
• Contact me at matthias@confluent.io for details

Slide 44

Thank You!
We are hiring!
