Page 1: Apache Incubator Samza: Stream Processing at LinkedIn

Apache Samza*
Stream Processing at LinkedIn

Chris Riccomini, 11/13/2013

* Incubating

Page 3

Stream Processing?

Page 7

Response latency, from 0 ms upward:

• RPC: synchronous
• Samza: milliseconds to minutes
• Later. Possibly much later.

Page 8

Newsfeed

Page 9

News

Page 10

Ad Relevance

Page 11

Email

Page 12

Search Indexing Pipeline

Page 13

Metrics and Monitoring

Page 14

Motivation

Page 15

Real-time Feeds

• User activity
• Metrics
• Monitoring
• Database changes

Page 16

Real-time Feeds

• 10+ billion writes per day
• 172,000 messages per second (average)
• 55+ billion messages per day to real-time consumers
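
A rough consistency check on these figures, assuming the 172,000 messages/sec average holds over a full 86,400-second day and counts the same writes as the first bullet. Under that assumption the numbers fit together, and the consumer volume implies each message is read several times on average:

```latex
\begin{aligned}
172{,}000\ \tfrac{\text{msg}}{\text{s}} \times 86{,}400\ \tfrac{\text{s}}{\text{day}} &\approx 1.49 \times 10^{10}\ \tfrac{\text{msg}}{\text{day}} \quad (\text{consistent with ``10+ billion writes per day''}) \\
\frac{55 \times 10^{9}\ \text{msg/day}}{86{,}400\ \text{s/day}} &\approx 6.37 \times 10^{5}\ \text{msg/s delivered to real-time consumers} \\
\frac{6.37 \times 10^{5}}{1.72 \times 10^{5}} &\approx 3.7\ \text{consumer reads per written message}
\end{aligned}
```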

Page 17

Stream Processing is Hard

• Partitioning
• State
• Re-processing
• Failure semantics
• Joins to services or databases
• Non-determinism

Page 18

Samza Concepts & Architecture

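Page 25

Streams

A stream is divided into partitions. Each partition is an ordered sequence of messages, and new messages are always appended to its end (the "next append" position).

• Partition 0: 1 2 3 4 5 6
• Partition 1: 1 2 3 4 5
• Partition 2: 1 2 3 4 5 6 7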
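
To make the partition picture concrete, here is a minimal, framework-free sketch of a stream as a set of append-only partitions, where each message is identified by its offset within its partition. The class and method names (`PartitionedStream`, `append`, `appendTo`, `read`, `nextOffset`) are hypothetical, offsets here start at 0 rather than 1, and choosing a partition from the key's hash is an assumption for illustration, not Samza's or Kafka's exact partitioner.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical toy model: a stream is a fixed set of append-only partitions. */
class PartitionedStream {
    private final List<List<String>> partitions = new ArrayList<>();

    PartitionedStream(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Picks a partition from the key's hash (an assumption) and appends there; returns the offset. */
    long append(String key, String message) {
        int p = Math.floorMod(key.hashCode(), partitions.size());
        return appendTo(p, message);
    }

    /** Appends to the end of one partition; the offset is the message's position in it. */
    long appendTo(int partition, String message) {
        List<String> log = partitions.get(partition);
        log.add(message);
        return log.size() - 1;
    }

    /** Reads the message at an offset; existing messages are never modified. */
    String read(int partition, long offset) {
        return partitions.get(partition).get((int) offset);
    }

    /** The offset the next append to this partition will receive. */
    long nextOffset(int partition) {
        return partitions.get(partition).size();
    }

    public static void main(String[] args) {
        PartitionedStream stream = new PartitionedStream(3);
        // Rebuild the slide's picture: 6, 5 and 7 messages in partitions 0, 1 and 2.
        int[] sizes = {6, 5, 7};
        for (int p = 0; p < sizes.length; p++) {
            for (int i = 1; i <= sizes[p]; i++) {
                stream.appendTo(p, "msg-" + i);
            }
        }
        System.out.println(stream.nextOffset(0)); // 6
        System.out.println(stream.read(2, 6));    // msg-7
    }
}
```

Because partitions are independent, ordering is only guaranteed within a partition, which is what lets separate tasks consume different partitions in parallel.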

Page 27

Tasks

Partition 0 → Task 1

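Page 28

Tasks

Partition 0

import java.util.HashMap;
import java.util.Map;
import org.apache.avro.generic.GenericRecord;
import org.apache.samza.system.*;
import org.apache.samza.task.*;

class PageKeyViewsCounterTask implements StreamTask {
  // Output stream for the counts; the system and stream names here are hypothetical.
  private final SystemStream countStream = new SystemStream("kafka", "page-key-view-counts");
  // Per-task, in-memory view count for each page key.
  private final Map<String, Integer> pageKeyViews = new HashMap<>();

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    GenericRecord record = (GenericRecord) envelope.getMessage();
    String pageKey = record.get("page-key").toString();
    int newCount = pageKeyViews.merge(pageKey, 1, Integer::sum);
    // Emit the updated count, keyed by page key.
    collector.send(new OutgoingMessageEnvelope(countStream, pageKey, newCount));
  }
}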

Page 39

Tasks

Diagram: PageKeyViewsCounterTask reads Page Views - Partition 0 (messages 1 2 3 4) and writes to the Output Count Stream (Partition 0, Partition 1).

Page 47

Tasks

Diagram: PageKeyViewsCounterTask reads Page Views - Partition 0 (messages 1 2 3 4) and writes to the Output Count Stream (Partition 0, Partition 1). It also records its position in the input (offset 2) in a Checkpoint Stream.
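
The checkpoint idea can be sketched without any framework: a task counts page views for one partition and periodically records the next offset it needs to read; after a simulated crash, a new task instance resumes from that offset. Everything here (`CheckpointedCounter`, the in-memory map standing in for the Checkpoint Stream, the checkpoint-every-2-messages policy) is a hypothetical illustration, not Samza's API. Messages processed after the last checkpoint are processed again on restart, so this scheme gives at-least-once rather than exactly-once output.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a page-view counter that checkpoints the offset it has reached. */
class CheckpointedCounter {
    private final int partition;
    private final Map<Integer, Integer> checkpointStore;          // stand-in for the Checkpoint Stream
    private final Map<String, Integer> counts = new HashMap<>();  // in memory only: lost on crash
    private int nextOffset;

    CheckpointedCounter(int partition, Map<Integer, Integer> checkpointStore) {
        this.partition = partition;
        this.checkpointStore = checkpointStore;
        // Resume from the last committed offset, or from the start if there is none.
        this.nextOffset = checkpointStore.getOrDefault(partition, 0);
    }

    /** Processes at most maxMessages from the partition, appending "key=count" records to output. */
    void run(List<String> partitionLog, int maxMessages, List<String> output) {
        for (int n = 0; n < maxMessages && nextOffset < partitionLog.size(); n++) {
            String pageKey = partitionLog.get(nextOffset);
            int newCount = counts.merge(pageKey, 1, Integer::sum);
            output.add(pageKey + "=" + newCount);
            nextOffset++;
            if (nextOffset % 2 == 0) {
                checkpointStore.put(partition, nextOffset); // commit every 2 messages
            }
        }
    }

    public static void main(String[] args) {
        List<String> pageViews = List.of("home", "jobs", "home", "feed");
        Map<Integer, Integer> checkpoints = new HashMap<>();
        List<String> output = new ArrayList<>();

        // First instance processes 3 messages, then "crashes" after checkpointing offset 2.
        new CheckpointedCounter(0, checkpoints).run(pageViews, 3, output);
        // A restarted instance resumes at offset 2, so "home" at offset 2 is processed twice.
        new CheckpointedCounter(0, checkpoints).run(pageViews, 10, output);

        System.out.println(output);      // [home=1, jobs=1, home=2, home=1, feed=1]
        System.out.println(checkpoints); // {0=4}
    }
}
```

The second `home=1` also shows that the in-memory counts did not survive the restart, which is the problem the Stateful Processing slides address.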

Page 56

Jobs

Stream A → Task 1, Task 2, Task 3 → Stream B

Page 57

Jobs

Stream A, Stream B → Task 1, Task 2, Task 3 → Stream C

Page 58

Jobs

AdViews, AdClicks → Task 1, Task 2, Task 3 → AdClickThroughRate
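
A job like AdClickThroughRate combines two input streams in each task. A minimal sketch, assuming both streams carry an ad id and that the click-through rate is clicks divided by views per ad; the class and method names are hypothetical. For this to be correct in a partitioned job, both input streams would need to be partitioned by ad id so that all views and clicks for one ad reach the same task.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of one task in an AdClickThroughRate job. */
class AdClickThroughRateTask {
    private final Map<String, Long> views = new HashMap<>();
    private final Map<String, Long> clicks = new HashMap<>();

    /** Called for each message from either input stream; returns the updated CTR for that ad. */
    double process(String stream, String adId) {
        if (stream.equals("AdViews")) {
            views.merge(adId, 1L, Long::sum);
        } else if (stream.equals("AdClicks")) {
            clicks.merge(adId, 1L, Long::sum);
        } else {
            throw new IllegalArgumentException("unexpected stream: " + stream);
        }
        long v = views.getOrDefault(adId, 0L);
        long c = clicks.getOrDefault(adId, 0L);
        return v == 0 ? 0.0 : (double) c / v;
    }

    public static void main(String[] args) {
        AdClickThroughRateTask task = new AdClickThroughRateTask();
        for (int i = 0; i < 4; i++) {
            task.process("AdViews", "ad-1");
        }
        double ctr = task.process("AdClicks", "ad-1");
        System.out.println(ctr); // 0.25
    }
}
```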

Page 61

Dataflow

Diagram: Job 1, Job 2 and Job 3 linked by Streams A, B, C, D and E; the output stream of one job is the input of the next.

Page 63

YARN

Page 68

YARN

You: I want to run command X on two machines with 512M of memory.

YARN: Cool, where’s your code?

You: http://some-host/jobs/download/my.tgz

YARN: I’ve run your command on grid-node-2 and grid-node-7.

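Page 78

YARN

• Host 0: ResourceManager (RM), which receives requests from the Client
• Host 1, Host 2, Host 3: each runs a NodeManager (NM)
• The ApplicationMaster (AM) and the job's Containers are launched on the NodeManager hosts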
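
The request in the conversation with YARN ("run command X on two machines with 512M of memory") can be mimicked with a toy model: a resource manager that tracks each NodeManager's free memory and places the requested containers on hosts with room. This is for intuition only. In real YARN the client first asks the RM to start an AM, and the AM then requests containers; this sketch collapses that into one call, and the first-fit, one-container-per-host placement and all names (`ToyResourceManager`, `registerNodeManager`, `allocate`) are assumptions, not YARN's schedulers or API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only: not YARN's scheduler or client API. */
class ToyResourceManager {
    /** Free memory (MB) per NodeManager host, in registration order. */
    private final Map<String, Integer> freeMemoryMb = new LinkedHashMap<>();

    void registerNodeManager(String host, int memoryMb) {
        freeMemoryMb.put(host, memoryMb);
    }

    /** Places `count` containers of `memoryMb` each, first-fit, one per host; returns the hosts. */
    List<String> allocate(int count, int memoryMb) {
        List<String> hosts = new ArrayList<>();
        for (Map.Entry<String, Integer> e : freeMemoryMb.entrySet()) {
            if (hosts.size() < count && e.getValue() >= memoryMb) {
                hosts.add(e.getKey());
            }
        }
        if (hosts.size() < count) {
            throw new IllegalStateException("not enough capacity for " + count + " containers");
        }
        // Commit only once the whole request fits, so a failed request changes nothing.
        for (String host : hosts) {
            freeMemoryMb.merge(host, -memoryMb, Integer::sum);
        }
        return hosts;
    }

    int freeMemory(String host) {
        return freeMemoryMb.get(host);
    }

    public static void main(String[] args) {
        ToyResourceManager rm = new ToyResourceManager();
        rm.registerNodeManager("host-1", 256);
        rm.registerNodeManager("host-2", 1024);
        rm.registerNodeManager("host-3", 2048);
        System.out.println(rm.allocate(2, 512));     // [host-2, host-3]
        System.out.println(rm.freeMemory("host-2")); // 512
    }
}
```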

Page 86

Containers

Stream A → Task 1, Task 2, Task 3 → Stream B

Page 87

Containers

Stream A → Samza Container 1, Samza Container 2 → Stream B
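
The Containers slides pack Tasks 1 to 3 into two Samza containers, so a job needs some rule for grouping tasks. A minimal sketch, assuming one task per input partition and a round-robin assignment of tasks to a configured number of containers; the round-robin rule and the names `TaskGrouping` and `assign` are illustrative assumptions, not necessarily how Samza groups tasks.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch: one task per input partition, tasks spread round-robin over containers. */
class TaskGrouping {
    static List<List<String>> assign(int numPartitions, int numContainers) {
        if (numContainers <= 0) {
            throw new IllegalArgumentException("need at least one container");
        }
        List<List<String>> containers = new ArrayList<>();
        for (int c = 0; c < numContainers; c++) {
            containers.add(new ArrayList<>());
        }
        for (int p = 0; p < numPartitions; p++) {
            // The task for partition p goes to container (p mod numContainers).
            containers.get(p % numContainers).add("Task " + (p + 1));
        }
        return containers;
    }

    public static void main(String[] args) {
        // Three tasks (one per partition of Stream A) packed into two Samza containers.
        System.out.println(assign(3, 2)); // [[Task 1, Task 3], [Task 2]]
    }
}
```

Since the unit of parallelism is the task, changing the number of containers only regroups tasks; the partition-to-task mapping stays the same.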

Page 92

YARN

• Host 1: NodeManager, Kafka Broker, Samza Container 1
• Host 2: NodeManager, Kafka Broker, Samza Container 2, Samza YARN AM

Page 93

YARN

The same layout for MapReduce, with HDFS in place of Kafka:

• Host 1: NodeManager, HDFS, MapReduce Container
• Host 2: NodeManager, HDFS, MapReduce Container, MapReduce YARN AM

Page 94

YARN

• Host 1: NodeManager, Kafka Broker, Samza Container 1
• Streams shown: Stream A and Stream C, alongside Samza Container 1 and Samza Container 2

Page 99

CGroups

• Host 1: NodeManager, Kafka Broker, Samza Container 1
• Host 2: NodeManager, Kafka Broker, Samza Container 2, Samza YARN AM

Page 100

Multi-Framework (Not Running)

• Host 1: NodeManager, Kafka, Samza Container 1
• Host 2: NodeManager, HDFS, MapReduce Container, Samza YARN AM

Page 101

Stateful Processing

Page 102

SELECT col1, count(*)
FROM stream1
INNER JOIN stream2 ON stream1.col3 = stream2.col3
WHERE col2 > 20
GROUP BY col1
ORDER BY count(*) DESC
LIMIT 50;

Page 105

SELECT col1, count(*)
FROM stream1
INNER JOIN stream2 ON stream1.col3 = stream2.col3
WHERE col2 > 20
GROUP BY col1
ORDER BY count(*) DESC
LIMIT 10;
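
When both inputs are unbounded streams, every clause of this query needs state: the JOIN must remember what it has seen on each side, GROUP BY keeps a running count per group, and ORDER BY ... LIMIT ranks the current counts. A simplified sketch, assuming the join is evaluated incrementally as a symmetric hash join and that `col2` belongs to stream1 (the query does not say); the class, record and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical sketch of the state behind the query, evaluated incrementally. */
class StreamingJoinCount {
    record Row1(String col1, int col2, String col3) {} // a row from stream1
    record Row2(String col3) {}                        // a row from stream2 (only the join key matters here)

    // JOIN state: stream1 rows by col3, and how many stream2 rows have each col3.
    private final Map<String, List<Row1>> left = new HashMap<>();
    private final Map<String, Integer> rightCount = new HashMap<>();
    // GROUP BY state: running count(*) per col1.
    private final Map<String, Long> counts = new HashMap<>();

    void onStream1(Row1 r) {
        if (r.col2() <= 20) return; // WHERE col2 > 20 (assumed to be a stream1 column)
        left.computeIfAbsent(r.col3(), k -> new ArrayList<>()).add(r);
        // One joined row per stream2 row already seen with the same col3.
        int matches = rightCount.getOrDefault(r.col3(), 0);
        if (matches > 0) counts.merge(r.col1(), (long) matches, Long::sum);
    }

    void onStream2(Row2 r) {
        rightCount.merge(r.col3(), 1, Integer::sum);
        // One joined row per stream1 row already seen with the same col3.
        for (Row1 l : left.getOrDefault(r.col3(), List.of())) {
            counts.merge(l.col1(), 1L, Long::sum);
        }
    }

    /** ORDER BY count(*) DESC LIMIT n over the current counts. */
    List<String> top(int n) {
        return counts.entrySet().stream()
                .sorted(Map.Entry.<String, Long>comparingByValue().reversed())
                .limit(n)
                .map(e -> e.getKey() + "=" + e.getValue())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        StreamingJoinCount q = new StreamingJoinCount();
        q.onStream2(new Row2("k1"));
        q.onStream1(new Row1("a", 30, "k1")); // a=1
        q.onStream1(new Row1("b", 25, "k1")); // b=1
        q.onStream1(new Row1("c", 10, "k1")); // filtered out by WHERE
        q.onStream2(new Row2("k1"));          // second k1 row: a=2, b=2
        q.onStream1(new Row1("a", 40, "k1")); // joins both k1 rows: a=4
        System.out.println(q.top(1));         // [a=4]
    }
}
```

The join state grows without bound here; a real job would have to bound it (for example with a time window) or keep it in a store larger than memory.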

Page 106

How do people do this?

Page 107

Remote Stores

Stream A → Task 1, Task 2, Task 3 → Stream B

Every task reads and writes a shared remote Key-Value Store.

Page 108

Remote RPC is slow

• Stream: ~500k records/sec/container
• DB: far less

Page 109

Online vs. Async

Page 110

No undo

• Database state is non-deterministic
• Can’t roll back mutations if a task crashes

Page 111

Tables & Streams

Writes to a database, in order over time:

put(a, w) → put(b, x) → put(a, y) → put(b, z)
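
The table/stream duality on this slide fits in a few lines: replaying the stream of writes in order rebuilds the table, and because a later write to a key overwrites an earlier one, keeping only the latest write per key (the idea behind log compaction) rebuilds the same table. A minimal sketch; the `Put` record and the method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a table is the result of replaying its stream of writes. */
class TableFromStream {
    record Put(String key, String value) {}

    /** Applies every write in order; later writes to a key overwrite earlier ones. */
    static Map<String, String> replay(List<Put> writes) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Put p : writes) {
            table.put(p.key(), p.value());
        }
        return table;
    }

    /** Keeps only the last write per key (a compacted stream); replaying it yields the same table. */
    static List<Put> compact(List<Put> writes) {
        Map<String, Put> last = new LinkedHashMap<>();
        for (Put p : writes) {
            last.remove(p.key()); // re-insert so the order follows each key's latest write
            last.put(p.key(), p);
        }
        return List.copyOf(last.values());
    }

    public static void main(String[] args) {
        List<Put> writes = List.of(new Put("a", "w"), new Put("b", "x"), new Put("a", "y"), new Put("b", "z"));
        System.out.println(replay(writes));          // {a=y, b=z}
        System.out.println(replay(compact(writes))); // {a=y, b=z}
    }
}
```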

Page 114

Stateful Tasks

Stream A → Task 1, Task 2, Task 3 → Stream B

Each task keeps its state in a local store and writes every change to that state to a Changelog Stream.
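
A minimal sketch of the changelog idea: every write to a task's local store is also appended to a changelog, and after a crash a fresh task rebuilds its local store by replaying that changelog before it processes new input. The `ChangelogBackedStore` class, the in-memory list standing in for the Changelog Stream, and the use of a `null` value to mark a delete are assumptions for illustration, not Samza's implementation.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch: a local key-value store whose mutations are mirrored to a changelog. */
class ChangelogBackedStore {
    private final Map<String, String> local = new HashMap<>();  // fast local state
    private final List<Map.Entry<String, String>> changelog;    // stand-in for the Changelog Stream

    ChangelogBackedStore(List<Map.Entry<String, String>> changelog) {
        this.changelog = changelog;
        // Restore: replay the changelog in order (a null value marks a delete).
        for (Map.Entry<String, String> e : changelog) {
            if (e.getValue() == null) {
                local.remove(e.getKey());
            } else {
                local.put(e.getKey(), e.getValue());
            }
        }
    }

    String get(String key) {
        return local.get(key);
    }

    void put(String key, String value) {
        local.put(key, value);
        changelog.add(new SimpleEntry<>(key, value));
    }

    void delete(String key) {
        local.remove(key);
        changelog.add(new SimpleEntry<>(key, null));
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> changelog = new ArrayList<>();
        ChangelogBackedStore store = new ChangelogBackedStore(changelog);
        store.put("member-1", "Alice");
        store.put("member-2", "Bob");
        store.delete("member-2");
        store.put("member-1", "Alicia");

        // Simulate a crash: the local map is gone, but the changelog survives.
        ChangelogBackedStore restored = new ChangelogBackedStore(changelog);
        System.out.println(restored.get("member-1")); // Alicia
        System.out.println(restored.get("member-2")); // null
    }
}
```

Reads never leave the task, so they run at local speed, while the changelog makes the state recoverable without an undo mechanism in a remote database.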

Page 126

Key-Value Store

• put(table_name, key, value)
• get(table_name, key)
• delete(table_name, key)
• range(table_name, key1, key2)
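
The four operations above can be sketched on top of sorted in-memory maps, one per table; supporting `range` is the reason the keys are kept sorted. This is a hypothetical illustration: the class name is invented, and treating `range(table_name, key1, key2)` as the half-open interval [key1, key2) is an assumption, since the slide does not say whether key2 is included.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of the slide's table-scoped key-value API, backed by sorted maps. */
class SortedKeyValueStore {
    private final Map<String, TreeMap<String, String>> tables = new HashMap<>();

    private TreeMap<String, String> table(String tableName) {
        return tables.computeIfAbsent(tableName, t -> new TreeMap<>());
    }

    void put(String tableName, String key, String value) {
        table(tableName).put(key, value);
    }

    /** Returns null if the key is absent. */
    String get(String tableName, String key) {
        return table(tableName).get(key);
    }

    void delete(String tableName, String key) {
        table(tableName).remove(key);
    }

    /** Entries with key1 <= key < key2, in key order (the half-open range is an assumption). */
    List<Map.Entry<String, String>> range(String tableName, String key1, String key2) {
        return new ArrayList<>(table(tableName).subMap(key1, true, key2, false).entrySet());
    }

    public static void main(String[] args) {
        SortedKeyValueStore kv = new SortedKeyValueStore();
        kv.put("names", "m1", "Alice");
        kv.put("names", "m2", "Bob");
        kv.put("names", "m3", "Carol");
        kv.delete("names", "m2");
        System.out.println(kv.get("names", "m1"));         // Alice
        System.out.println(kv.range("names", "m1", "m9")); // [m1=Alice, m3=Carol]
    }
}
```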

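Page 127

Stateful Stream Task

import org.apache.avro.generic.GenericRecord;
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.*;

public class SimpleStatefulTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> store;

  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    // getStore returns Object; assumes "mystore" is configured with String keys and values.
    this.store = (KeyValueStore<String, String>) context.getStore("mystore");
  }

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    GenericRecord record = (GenericRecord) envelope.getMessage();
    // Avro string fields come back as CharSequence (often Utf8), so convert to String.
    String memberId = record.get("member_id").toString();
    String name = record.get("name").toString();
    System.out.println("old name: " + store.get(memberId));
    store.put(memberId, name);
  }
}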

Page 131

Whew!

Page 132

Let’s be Friends!

• We are incubating, and you can help!
• Get up and running in 5 minutes: http://bit.ly/hello-samza
• Grab some newbie JIRAs: http://bit.ly/samza_newbie_issues

