apache incubator samza: stream processing at linkedin
DESCRIPTION
This is the slide deck that was presented at QCon SF on November 13, 2013. The presentation covers what Samza is, why we built it, and how it works. TRANSCRIPT
Apache Samza* — Stream Processing at LinkedIn
Chris Riccomini — 11/13/2013
* Incubating
Stream Processing?
Response latency
0 ms
Response latency
RPC
Synchronous
0 ms
Response latency
RPC
Synchronous Later. Possibly much later.
0 ms
Response latency
Samza
Milliseconds to minutes
RPC
Synchronous Later. Possibly much later.
0 ms
Newsfeed
News
Ad Relevance
Search Indexing Pipeline
Metrics and Monitoring
Motivation
Real-time Feeds
• User activity
• Metrics
• Monitoring
• Database changes
Real-time Feeds
• 10+ billion writes per day
• 172,000 messages per second (average)
• 55+ billion messages per day to real-time consumers
Stream Processing is Hard
• Partitioning
• State
• Re-processing
• Failure semantics
• Joins to services or databases
• Non-determinism
Samza Concepts &
Architecture
Streams
Partition 0 Partition 1 Partition 2
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
next append
123456
12345
1234567
TasksPartition 0
TasksPartition 0
Task 1
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
Task 1
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
JobsStream A
Task 1 Task 2 Task 3
Stream B
JobsStream A Stream B
Task 1 Task 2 Task 3
Stream C
JobsAdViews AdClicks
Task 1 Task 2 Task 3
AdClickThroughRate
JobsAdViews AdClicks
Task 1 Task 2 Task 3
AdClickThroughRate
JobsStream A Stream B
Task 1 Task 2 Task 3
Stream C
DataflowStream A Stream B Stream C
Stream E
Stream B
Job 1 Job 2
Stream D
Job 3
DataflowStream A Stream B Stream C
Stream E
Stream B
Job 1 Job 2
Stream D
Job 3
YARN
YARN
You: I want to run command X on two machines with 512M of memory.
YARN
You: I want to run command X on two machines with 512M of memory.
YARN: Cool, where’s your code?
YARN
You: I want to run command X on two machines with 512M of memory.
YARN: Cool, where’s your code?
You: http://some-host/jobs/download/my.tgz
YARN
You: I want to run command X on two machines with 512M of memory.
YARN: Cool, where’s your code?
You: http://some-host/jobs/download/my.tgz
YARN: I’ve run your command on grid-node-2 and grid-node-7.
YARN
Host 1 Host 2 Host 3
YARN
Host 1
NM
Host 2
NM
Host 3
NM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
Container
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
Container
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
Container
JobsStream A
Task 1 Task 2 Task 3
Stream B
Containers
Task 1 Task 2 Task 3
Stream B
Stream A
Containers
Stream B
Stream A
Samza Container 1 Samza Container 2
Containers
Samza Container 1 Samza Container 2
YARN
Samza Container 1 Samza Container 2
Host 1 Host 2
YARN
Samza Container 1 Samza Container 2
NodeManager NodeManager
Host 1 Host 2
YARN
Samza Container 1 Samza Container 2
NodeManager NodeManager
Samza YARN AM
Host 1 Host 2
YARN
Samza Container 1 Samza Container 2
NodeManager
Kafka Broker
NodeManager
Samza YARN AM
Kafka Broker
Host 1 Host 2
YARN
MapReduceContainer
MapReduce Container
NodeManager
HDFS
NodeManager
MapReduce YARN AM
HDFS
Host 1 Host 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1 Samza Container 2
NodeManager
Kafka Broker
NodeManager
Samza YARN AM
Kafka Broker
Host 1 Host 2
CGroups
Samza Container 1 Samza Container 2
NodeManager
Kafka Broker
NodeManager
Samza YARN AM
Kafka Broker
Host 1 Host 2
(Not Running) Multi-Framework
Samza Container 1 MapReduceContainer
NodeManager
Kafka
NodeManager
Samza YARN AM
HDFS
Host 1 Host 2
Stateful Processing
SELECT col1, count(*) FROM stream1INNER JOIN stream2ON stream1.col3 = stream2.col3WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 50;
SELECT col1, count(*) FROM stream1INNER JOIN stream2ON stream1.col3 = stream2.col3WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 50;
SELECT col1, count(*) FROM stream1INNER JOIN stream2ON stream1.col3 = stream2.col3WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 50;
SELECT col1, count(*) FROM stream1INNER JOIN stream2ON stream1.col3 = stream2.col3WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 10;
How do people do this?
Remote StoresStream A
Task 1 Task 2 Task 3
Stream B
Key-Value Store
Remote RPC is slow
• Stream: ~500k records/sec/container
• DB: much less
Online vs. Async
No undo
• Database state is non-deterministic
• Can't roll back mutations if a task crashes
Tables & Streams
Database
put(a, w)
put(b, x)
put(a, y)
put(b, z)
Time
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Key-Value Store
• put(table_name, key, value)
• get(table_name, key)
• delete(table_name, key)
• range(table_name, key1, key2)
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Whew!
Let’s be Friends!
• We are incubating, and you can help!
• Get up and running in 5 minutes: http://bit.ly/hello-samza
• Grab some newbie JIRAs: http://bit.ly/samza_newbie_issues