apache incubator samza: stream processing at linkedin
Post on 25-May-2015
765 Views
Preview:
DESCRIPTION
TRANSCRIPT
Apache Samza*: Stream Processing at LinkedIn
Chris Riccomini, 11/13/2013
* Incubating
Stream Processing?
Response latency
0 ms
Response latency
RPC
Synchronous
0 ms
Response latency
RPC
Synchronous Later. Possibly much later.
0 ms
Response latency
Samza
Milliseconds to minutes
RPC
Synchronous Later. Possibly much later.
0 ms
Newsfeed
News
Ad Relevance
Search Indexing Pipeline
Metrics and Monitoring
Motivation
Real-time Feeds
• User activity
• Metrics
• Monitoring
• Database Changes
Real-time Feeds
• 10+ billion writes per day
• 172,000 messages per second (average)
• 55+ billion messages per day to real-time consumers
Stream Processing is Hard
• Partitioning
• State
• Re-processing
• Failure semantics
• Joins to services or database
• Non-determinism
Samza Concepts & Architecture
Streams
Partition 0 Partition 1 Partition 2
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
123456
12345
1234567
Streams
Partition 0 Partition 1 Partition 2
next append
123456
12345
1234567
TasksPartition 0
TasksPartition 0
Task 1
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
class PageKeyViewsCounterTask implements StreamTask { public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = ((GenericRecord) envelope.getMsg()); String pageKey = record.get("page-key").toString(); int newCount = pageKeyViews.get(pageKey).incrementAndGet(); collector.send(countStream, pageKey, newCount); }}
TasksPartition 0
Task 1
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
Page Views - Partition 0
1234
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0 Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
Tasks
PageKeyViewsCounterTask
Partition 0
Partition 1
1234
2
Partition 1Checkpoint
Stream
Page Views - Partition 0
Output Count Stream
JobsStream A
Task 1 Task 2 Task 3
Stream B
JobsStream A Stream B
Task 1 Task 2 Task 3
Stream C
JobsAdViews AdClicks
Task 1 Task 2 Task 3
AdClickThroughRate
JobsAdViews AdClicks
Task 1 Task 2 Task 3
AdClickThroughRate
JobsStream A Stream B
Task 1 Task 2 Task 3
Stream C
DataflowStream A Stream B Stream C
Stream E
Stream B
Job 1 Job 2
Stream D
Job 3
DataflowStream A Stream B Stream C
Stream E
Stream B
Job 1 Job 2
Stream D
Job 3
YARN
YARN
You: I want to run command X on two machines with 512M of memory.
YARN
You: I want to run command X on two machines with 512M of memory.
YARN: Cool, where’s your code?
YARN
You: I want to run command X on two machines with 512M of memory.
YARN: Cool, where’s your code?
You: http://some-host/jobs/download/my.tgz
YARN
You: I want to run command X on two machines with 512M of memory.
YARN: Cool, where’s your code?
You: http://some-host/jobs/download/my.tgz
YARN: I’ve run your command on grid-node-2 and grid-node-7.
YARN
Host 1 Host 2 Host 3
YARN
Host 1
NM
Host 2
NM
Host 3
NM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
Container
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
Container
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
YARN
Host 1
NM
Host 2
NM
Host 3
NM
Host 0
RMClient
AM
Container
JobsStream A
Task 1 Task 2 Task 3
Stream B
Containers
Task 1 Task 2 Task 3
Stream B
Stream A
Containers
Stream B
Stream A
Samza Container 1 Samza Container 2
Containers
Samza Container 1 Samza Container 2
YARN
Samza Container 1 Samza Container 2
Host 1 Host 2
YARN
Samza Container 1 Samza Container 2
NodeManager NodeManager
Host 1 Host 2
YARN
Samza Container 1 Samza Container 2
NodeManager NodeManager
Samza YARN AM
Host 1 Host 2
YARN
Samza Container 1 Samza Container 2
NodeManager
Kafka Broker
NodeManager
Samza YARN AM
Kafka Broker
Host 1 Host 2
YARN
MapReduceContainer
MapReduce Container
NodeManager
HDFS
NodeManager
MapReduce YARN AM
HDFS
Host 1 Host 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1
NodeManager
Kafka Broker
Host 1
Stream C
Stream A
Samza Container 1 Samza Container 2
YARN
Samza Container 1 Samza Container 2
NodeManager
Kafka Broker
NodeManager
Samza YARN AM
Kafka Broker
Host 1 Host 2
CGroups
Samza Container 1 Samza Container 2
NodeManager
Kafka Broker
NodeManager
Samza YARN AM
Kafka Broker
Host 1 Host 2
(Not Running) Multi-Framework
Samza Container 1 MapReduceContainer
NodeManager
Kafka
NodeManager
Samza YARN AM
HDFS
Host 1 Host 2
Stateful Processing
SELECT col1, count(*) FROM stream1 INNER JOIN stream2 ON stream1.col3 = stream2.col3 WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 50;
SELECT col1, count(*) FROM stream1 INNER JOIN stream2 ON stream1.col3 = stream2.col3 WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 50;
SELECT col1, count(*) FROM stream1 INNER JOIN stream2 ON stream1.col3 = stream2.col3 WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 50;
SELECT col1, count(*) FROM stream1 INNER JOIN stream2 ON stream1.col3 = stream2.col3 WHERE col2 > 20 GROUP BY col1 ORDER BY count(*) DESC LIMIT 10;
How do people do this?
Remote StoresStream A
Task 1 Task 2 Task 3
Stream B
Key-Value Store
Remote RPC is slow
• Stream: ~500k records/sec/container
• DB: << less
Online vs. Async
No undo
• Database state is non-deterministic
• Can’t roll back mutations if task crashes
Tables & Streams
Database
put(a, w)
put(b, x)
put(a, y)
put(b, z)
Time
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Stateful TasksStream A
Task 1 Task 2 Task 3
Stream B Changelog Stream
Key-Value Store
• put(table_name, key, value)
• get(table_name, key)
• delete(table_name, key)
• range(table_name, key1, key2)
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Stateful Stream Task
public class SimpleStatefulTask implements StreamTask, InitableTask { private KeyValueStore<String, String> store; public void init(Config config, TaskContext context) { this.store = context.getStore("mystore"); } public void process( IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
GenericRecord record = (GenericRecord) envelope.getMessage(); String memberId = record.get("member_id"); String name = record.get("name"); System.out.println("old name: " + store.get(memberId)); store.put(memberId, name); }}
Whew!
Let’s be Friends!
• We are incubating, and you can help!
• Get up and running in 5 minutes: http://bit.ly/hello-samza
• Grab some newbie JIRAs: http://bit.ly/samza_newbie_issues
top related