Stream Processing Made Simple with Kafka


TRANSCRIPT

Kafka Streams: Stream Processing Made Simple with Kafka


Guozhang Wang | Hadoop Summit, June 28, 2016


What is NOT Stream Processing?


Stream Processing isn't (necessarily)

• Transient, approximate, lossy…

• … or something that must keep batch processing around as a safety net


Stream Processing

• A different programming paradigm

• .. that brings computation to unbounded data

• .. with tradeoffs between latency / cost / correctness


Why Kafka in Stream Processing?


Kafka: Real-time Platforms

• Persistent Buffering

• Logical Ordering

• Highly Scalable "source-of-truth"

Stream Processing with Kafka

• Option I: Do It Yourself!

while (isRunning) {
    // read some messages from Kafka
    inputMessages = consumer.poll();

    // do some processing...

    // send output messages back to Kafka
    producer.send(outputMessages);
}
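Fleshed out, the loop looks roughly like this; a minimal sketch, assuming placeholder topic names, broker address, and a trivial pass-through transformation (the calls are the standard Kafka consumer/producer client APIs):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DiyStreamProcessor {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");  // placeholder broker
        props.put("group.id", "diy-stream-processor");   // placeholder group id
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
             KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            consumer.subscribe(Collections.singletonList("input-topic"));
            while (true) {
                // read some messages from Kafka
                ConsumerRecords<String, String> records = consumer.poll(100);
                for (ConsumerRecord<String, String> record : records) {
                    // do some processing... (here just uppercasing the value)
                    String result = record.value().toUpperCase();
                    // send output messages back to Kafka
                    producer.send(new ProducerRecord<>("output-topic", record.key(), result));
                }
            }
        }
    }
}

Even this much code silently punts on every hard part the next slide lists: offsets are auto-committed whether or not the send succeeded, no state is kept, and time is ignored entirely.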


DIY Stream Processing is Hard

• Ordering

• Partitioning & scalability

• Fault tolerance

• State management

• Time, windows & out-of-order data

• Re-processing


Stream Processing with Kafka

• Option I: Do It Yourself!

• Option II: full-fledged stream processing system

  • Storm, Spark, Flink, Samza, …


MapReduce Heritage?

• Config management

• Resource management

• Deployment

• etc…

Can I just use my own?!


Stream Processing with Kafka

• Option I: Do It Yourself!

• Option II: full-fledged stream processing system

• Option III: lightweight stream processing library

Kafka Streams

• In Apache Kafka since v0.10, May 2016

• Powerful yet easy-to-use stream processing library

• Event-at-a-time, stateful

• Windowing with out-of-order handling

• Highly scalable, distributed, fault tolerant

• and more…

Anywhere, anytime

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
</dependency>


Anywhere, anytime

[Diagram: deployment options from "very uncool" to "very cool": WAR file, rsync, Puppet/Chef, YARN, Mesos, Docker, Kubernetes]

Simple is Beautiful

Kafka Streams DSL


public static void main(String[] args) {
    KStreamBuilder builder = new KStreamBuilder();

    // specify the processing topology by first reading in a stream from a topic
    KStream<String, String> words = builder.stream("topic1");

    // count the words in this stream as an aggregated table
    KTable<String, Long> counts = words.countByKey("Counts");

    // write the result table to a new topic
    counts.to("topic2");

    // create a stream processing instance and start running it
    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
}
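As written, the example counts records by the key they already carry. A real word count would first split each line into words and re-key the stream; a minimal sketch against the 0.10.0-era DSL (plus java.util.Arrays and org.apache.kafka.streams.KeyValue imports; topic and store names are the slide's placeholders):

// split each line into words, then make the word itself the record key
KStream<String, String> words = builder.<String, String>stream("topic1")
    .flatMapValues(line -> Arrays.asList(line.toLowerCase().split("\\W+")))
    .map((key, word) -> new KeyValue<>(word, word));

// count occurrences of each word into a continuously updated table
KTable<String, Long> counts = words.countByKey("Counts");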


Native Kafka Integration

Properties cfg = new Properties();
cfg.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
cfg.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
cfg.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
cfg.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
cfg.put(KafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "registry:8081");

StreamsConfig config = new StreamsConfig(cfg);
KafkaStreams streams = new KafkaStreams(builder, config);
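One thing the slide's config omits: the DSL also needs default serializers/deserializers for record keys and values. A hedged sketch of how that looked with the 0.10.0-era StreamsConfig keys (later releases renamed these to DEFAULT_KEY_SERDE_CLASS_CONFIG and friends):

// default serdes for record keys and values (0.10.0-era config keys)
cfg.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
cfg.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());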


API, coding

"Full stack" evaluation

Operations, debugging, …

Simple is Beautiful

Key Idea:

Outsource hard problems to Kafka!

Kafka Concepts: the Log

[Diagram: an append-only log of messages with increasing offsets; the producer writes at the tail while Consumer1 reads at offset 7 and Consumer2 reads at offset 10]

[Diagram: producers and consumers connected through brokers; each topic (Topic 1, Topic 2) is split into partitions]

Kafka Streams: Key Concepts

Streams and Records

[Diagram: a stream is an unbounded, ordered sequence of key-value records]

Processor Topology

[Diagram: a processor topology is a graph of stream processors connected by streams]

KStream<..> stream1 = builder.stream("topic1");

KStream<..> stream2 = builder.stream("topic2");

KStream<..> joined = stream1.leftJoin(stream2, ...);

KTable<..> aggregated = joined.aggregateByKey(...);

aggregated.to("topic3");

Processor Topology

In this topology, the two builder.stream(…) calls create source processors that consume from Kafka topics, and aggregated.to(…) creates a sink processor that writes back to Kafka; the join and aggregation in between run as stream processors.

Data Parallelism

[Diagram: the partitions of Kafka Topic A are assigned to stream tasks (Task1, Task2), which are distributed across application instances (MyApp.1, MyApp.2) and write their results to Kafka Topic B]


Stream Processing Hard Parts

• Ordering

• Partitioning & scalability

• Fault tolerance

• State management

• Time, windows & out-of-order data

• Re-processing

States in Stream Processing


• Stateless: filter, map, …

• Stateful: join, aggregate, … (see the sketch below)
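For example, a minimal sketch (the KStream<String, Long> named stream and the store name are hypothetical; aggregateByKey as in the 0.10.0-era topology example above):

// stateless: each record is processed on its own, nothing is remembered
KStream<String, Long> big = stream.filter((key, value) -> value > 100);
KStream<String, Long> doubled = big.mapValues(value -> value * 2L);

// stateful: the running total per key must live in a local state store
KTable<String, Long> totals = doubled.aggregateByKey(
    () -> 0L,                          // initializer
    (key, value, agg) -> agg + value,  // aggregator
    "totals-store");                   // backing state store name

filter and mapValues can be restarted anywhere with no recovery; it is the aggregation that drags in state management, and with it fault tolerance and re-processing.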


[Diagram: the same topology, with the aggregation backed by a local state store in each task; Task1 and Task2 consume partitions of Kafka Topic A and produce to Kafka Topic B]

It’s all about Time

• Event-time (when an event is created)

• Processing-time (when an event is processed)


Event-time:      1    2    3    4    5    6    7
Processing-time: 1999 2002 2005 1997 1980 1983 2015


[Diagram: the Star Wars films in event-time (episode) order vs. processing-time (release) order: The Phantom Menace, Attack of the Clones, Revenge of the Sith, A New Hope, The Empire Strikes Back, Return of the Jedi, The Force Awakens]

Out-of-Order

Timestamp Extractor


// processing-time
public long extract(ConsumerRecord<Object, Object> record) {
    return System.currentTimeMillis();
}

// event-time
public long extract(ConsumerRecord<Object, Object> record) {
    return record.timestamp();
}
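Wiring an extractor in means implementing the TimestampExtractor interface and registering it in the application config; a minimal sketch, assuming the 0.10.0 config key name:

public class EventTimeExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record) {
        // event-time: use the timestamp carried inside the record
        return record.timestamp();
    }
}

// in the application config:
cfg.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, EventTimeExtractor.class.getName());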

Windowing

[Diagram: records grouped into windows along the time axis t]
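In the DSL, a window is just an argument to the aggregation; a minimal sketch of a one-minute windowed count against the 0.10.0 API (window name and serde are illustrative, and the result is keyed by word and window):

// count words per 1-minute time window
KTable<Windowed<String>, Long> windowedCounts =
    words.countByKey(TimeWindows.of("Counts", 60 * 1000L), Serdes.String());

Late-arriving records still land in the window their event-time timestamp belongs to, which is how windowing and the timestamp extractor together handle out-of-order data.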


Stream vs. Table?

KStream<..> stream1 = builder.stream("topic1");

KStream<..> stream2 = builder.stream("topic2");

KStream<..> joined = stream1.leftJoin(stream2, ...);

KTable<..> aggregated = joined.aggregateByKey(...);

aggregated.to("topic2");

Tables ≈ Streams


The Stream-Table Duality

• A stream is a changelog of a table

• A table is a materialized view of a stream at a point in time

• Example: change data capture (CDC) of databases


• KStream = interprets data as a record stream (think: "append-only")

• KTable = interprets data as a changelog stream (a continuously updated materialized view)
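The duality shows up directly in the API; a minimal sketch (topic name hypothetical):

// read a topic as a record stream: every record is an independent event
KStream<String, String> asStream = builder.stream("user-profiles");

// read the same topic as a changelog: a later record updates its key
KTable<String, String> asTable = builder.table("user-profiles");

// and a table can always be re-read as its changelog stream
KStream<String, String> changelog = asTable.toStream();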


User purchase history (KStream):  alice → eggs, bob → lettuce, alice → milk

User employment profile (KTable): alice → lnkd, bob → googl, alice → msft

Reading these as time advances: the KStream says "Alice bought eggs, and later milk"; the KTable says "Alice was at LinkedIn, and is now at Microsoft."


Aggregating the records alice → 2, bob → 10, alice → 3 over time:

• KStream.aggregate(): both of Alice's records are independent facts, so the result is (key: Alice, value: 2 + 3)

• KTable.aggregate(): Alice's new value 3 replaces her old value 2, so the result is (key: Alice, value: 3)
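In code the difference is just the type you start from; a minimal sketch using the 0.10.0-era reduceByKey (the purchases stream and store name are hypothetical):

// KStream aggregation: alice→2 and alice→3 are both facts, so they add up
// and the table ends at (Alice, 5)
KTable<String, Long> sums =
    purchases.reduceByKey((v1, v2) -> v1 + v2, "sum-store");

// a KTable aggregation (not shown) instead treats alice→3 as an update:
// the old value 2 is subtracted from the aggregate before 3 is added,
// leaving (Alice, 3)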


[Diagram: KStream and KTable each support map(), filter(), join(), …; reduce() and aggregate() turn a KStream into a KTable, and toStream() turns a KTable back into a KStream]


Updates Propagation in KTable

[Diagram: an update from the source KStreams (stream1, stream2) flows through the join into the aggregated KTable's state store, and the resulting change propagates downstream]


Remember?


Fault Tolerance

[Diagram: each task's Process node keeps its State locally and continuously backs it up to a Kafka changelog topic; when an instance fails, the group-membership protocol reassigns its tasks to the surviving instances, which restore state from the changelog and resume processing]
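Restoring a large state store from its changelog can take a while, so Streams can also keep warm copies; a minimal sketch (the config key exists in StreamsConfig; the value here is illustrative):

// keep one standby replica of each task's state on another instance,
// so fail-over does not have to replay the entire changelog
cfg.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 1);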


Stream Processing Hard Parts

• Ordering

• Partitioning & scalability

• Fault tolerance

• State management

• Time, windows & out-of-order data

• Re-processing

Simple is Beautiful


But how do you get data in and out of Kafka?


Take-aways

• Stream Processing: a new programming paradigm

• Kafka Streams: stream processing made easy

THANKS!

Guozhang Wang | guozhang@confluent.io | @guozhangwang

Visit Confluent at the Syncsort Booth (#1303), live demos @ 29th. Download Kafka Streams: www.confluent.io/product


We are Hiring!
