Realtime Processing with Storm

Uploaded by gabriel-eisbruch, posted on 09-May-2015


DESCRIPTION

Storm is a distributed, reliable, fault-tolerant system for processing streams of data. In this track we will introduce the Storm framework, explain some design concepts and considerations, and show some real-world examples of how to use it to process large amounts of data in real time, in a distributed environment. We will describe how this solution can be scaled very easily as more data needs to be processed. We will explain everything you need to know to get started with Storm, along with some tips on how to get your Spouts, Bolts and Topologies up and running in the cloud.

TRANSCRIPT

Page 1: Realtime processing with storm presentation

Realtime Processing with Storm

Page 2: Realtime processing with storm presentation

Who are we?

Dario Simonassi (@ldsimonassi)

Gabriel Eisbruch (@geisbruch)

Jonathan Leibiusky (@xetorthio)

Page 3: Realtime processing with storm presentation

Agenda

● The problem

● Storm - Basic concepts

● Trident

● Deploying

● Some other features

● Contributions

Page 4: Realtime processing with storm presentation

http://necromentia.files.wordpress.com/2011/01/20070103213517_iguazu.jpg

Tons of information arriving every second

Page 5: Realtime processing with storm presentation

http://www.computerkochen.de/fun/bilder/esel.jpg

Heavy workload

Page 6: Realtime processing with storm presentation

http://www.asimon.co.il/data/images/a10689.jpg

Do it Realtime!

Page 7: Realtime processing with storm presentation
Page 8: Realtime processing with storm presentation
Page 9: Realtime processing with storm presentation

Complex

[Diagram: an information source feeding a tangle of queues and worker processes chained together, with results finally written to a DB]

Page 10: Realtime processing with storm presentation

What is Storm?

● A highly distributed realtime computation system.

● Provides general primitives to do realtime computation.

● Can be used with any programming language.

● It's scalable and fault-tolerant.

Page 11: Realtime processing with storm presentation

Example - Main Concepts: https://github.com/geisbruch/strata2012

Page 12: Realtime processing with storm presentation

Main Concepts - Example

"Count tweets related to strataconf and show hashtag totals."

Page 13: Realtime processing with storm presentation

Main Concepts - Spout

● The first thing we need is to read all tweets related to strataconf from Twitter.

● A Spout is a special component that is a source of messages to be processed.

Tweets → Spout → Tuples

Page 14: Realtime processing with storm presentation

public class ApiStreamingSpout extends BaseRichSpout {
    SpoutOutputCollector collector;
    TwitterReader reader;

    public void nextTuple() {
        Tweet tweet = reader.getNextTweet();
        if (tweet != null)
            collector.emit(new Values(tweet));
    }

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        reader = new TwitterReader(conf.get("track"), conf.get("user"), conf.get("pass"));
        this.collector = collector;
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}

Main Concepts - Spout

Tweets → Spout → Tuples

Page 15: Realtime processing with storm presentation

Main Concepts - Bolt

● For every tweet, we find hashtags

● A Bolt is a component that does the processing.

Spout → tweet stream → HashtagsSplitter → hashtags

Page 16: Realtime processing with storm presentation

public class HashtagsSplitter extends BaseBasicBolt {
    public void execute(Tuple input, BasicOutputCollector collector) {
        Tweet tweet = (Tweet) input.getValueByField("tweet");
        for (String hashtag : tweet.getHashTags()) {
            collector.emit(new Values(hashtag));
        }
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag"));
    }
}

Main Concepts - Bolt

Spout → tweet stream → HashtagsSplitter → hashtags

Page 17: Realtime processing with storm presentation

Main Concepts - Bolt

public class HashtagsCounterBolt extends BaseBasicBolt {
    private Jedis jedis;

    public void execute(Tuple input, BasicOutputCollector collector) {
        String hashtag = input.getStringByField("hashtag");
        jedis.hincrBy("hashs", hashtag, 1);   // increment this hashtag's counter in Redis
    }

    public void prepare(Map conf, TopologyContext context) {
        jedis = new Jedis("localhost");
    }
}

Spout → tweet stream → HashtagsSplitter → hashtag → HashtagsCounterBolt

Page 18: Realtime processing with storm presentation

Main Concepts - Topology

public static StormTopology createTopology() {
    TopologyBuilder builder = new TopologyBuilder();

    builder.setSpout("tweets-collector", new ApiStreamingSpout(), 1);
    builder.setBolt("hashtags-splitter", new HashtagsSplitter(), 2)
           .shuffleGrouping("tweets-collector");
    builder.setBolt("hashtags-counter", new HashtagsCounterBolt(), 2)
           .fieldsGrouping("hashtags-splitter", new Fields("hashtag"));

    return builder.createTopology();
}

Spout → shuffle → HashtagsSplitter (x2) → fields → HashtagsCounterBolt (x2)

Page 19: Realtime processing with storm presentation

Example Overview - Read Tweet

Spout Splitter Counter Redis

This tweet has #hello #world hashtags!!!

Page 20: Realtime processing with storm presentation

Example Overview - Split Tweet

Spout Splitter Counter Redis

This tweet has #hello #world hashtags!!! #hello

#world

Page 21: Realtime processing with storm presentation

Example Overview - Split Tweet

#hello = 1

#world = 1

Spout Splitter Counter Redis

This tweet has #hello #world hashtags!! #hello

#world

incr(#hello)

incr(#world)

Page 22: Realtime processing with storm presentation

Example Overview - Split Tweet

#hello = 1

#world = 1

Spout Splitter Counter Redis

This tweet has #bye #world hashtags

Page 23: Realtime processing with storm presentation

Example Overview - Split Tweet

#hello = 1

#world = 1

Spout Splitter Counter Redis

#bye

#world

This tweet has #bye #world hashtags

Page 24: Realtime processing with storm presentation

Example Overview - Split Tweet

#hello = 1

#world = 2

#bye = 1

Spout Splitter Counter Redis

#bye

#world

incr(#bye)

incr(#world)

This tweet has #bye #world hashtags

Page 25: Realtime processing with storm presentation

Main Concepts - In summary

○ Topology

○ Spout

○ Stream

○ Bolt

Page 26: Realtime processing with storm presentation

Guaranteeing Message Processing

http://www.steinborn.com/home_warranty.php

Page 27: Realtime processing with storm presentation

Guaranteeing Message Processing

Spout → HashtagsSplitter (x2) → HashtagsCounterBolt (x2)

Page 28: Realtime processing with storm presentation

Guaranteeing Message Processing

Spout → HashtagsSplitter (x2) → HashtagsCounterBolt (x2)

_collector.ack(tuple);
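
For reference, here is a minimal sketch (not on the slides) of the same splitter written on top of BaseRichBolt, where anchoring and acking are done by hand; BaseBasicBolt, used in the earlier examples, anchors and acks automatically. Imports from the Storm API are omitted, as on the other slides.

public class AnchoringHashtagsSplitter extends BaseRichBolt {
    private OutputCollector collector;

    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    public void execute(Tuple input) {
        Tweet tweet = (Tweet) input.getValueByField("tweet");
        for (String hashtag : tweet.getHashTags())
            collector.emit(input, new Values(hashtag)); // anchored to the incoming tuple
        collector.ack(input); // tell Storm this tuple has been fully handled
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("hashtag"));
    }
}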

Page 29: Realtime processing with storm presentation

Guaranteeing Message Processing

Spout → HashtagsSplitter (x2) → HashtagsCounterBolt (x2)

Page 30: Realtime processing with storm presentation

Guaranteeing Message Processing

Spout → HashtagsSplitter (x2) → HashtagsCounterBolt (x2)

Page 31: Realtime processing with storm presentation

Guaranteeing Message Processing

Spout → HashtagsSplitter (x2) → HashtagsCounterBolt (x2)

void fail(Object msgId);
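
On the spout side, a tuple can only be acked or failed if it was emitted with a message ID. A hedged sketch (not from the slides) of the earlier spout made reliable; tweet.getId() and reader.requeue() are hypothetical helpers of the example classes:

public class ReliableApiStreamingSpout extends BaseRichSpout {
    SpoutOutputCollector collector;
    TwitterReader reader;

    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        reader = new TwitterReader(conf.get("track"), conf.get("user"), conf.get("pass"));
        this.collector = collector;
    }

    public void nextTuple() {
        Tweet tweet = reader.getNextTweet();
        if (tweet != null)
            collector.emit(new Values(tweet), tweet.getId()); // second argument is the message ID
    }

    public void ack(Object msgId) {
        // the whole tuple tree was processed; nothing left to do
    }

    public void fail(Object msgId) {
        reader.requeue(msgId); // hypothetical: put the tweet back so it is emitted again
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("tweet"));
    }
}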

Page 32: Realtime processing with storm presentation

Trident

http://www.raytekmarine.com/images/trident.jpg

Page 33: Realtime processing with storm presentation

Trident - What is Trident?

● High-level abstraction on top of Storm.

● Makes it easier to build topologies.

● Similar in spirit to Pig for Hadoop.

Page 34: Realtime processing with storm presentation

Trident - Topology

TridentTopology topology = new TridentTopology();

Spout → tweet stream → HashtagsSplitter → hashtags → Memory

Page 35: Realtime processing with storm presentation

Trident - Stream

Spout → tweet stream

Stream tweetsStream = topology .newStream("tweets", new ApiStreamingSpout());

Page 36: Realtime processing with storm presentation

Trident

Spout → tweet stream → HashtagsSplitter → hashtags

Stream hashtagsStream = tweetsStream.each(new Fields("tweet"), new BaseFunction() {
    public void execute(TridentTuple tuple, TridentCollector collector) {
        Tweet tweet = (Tweet) tuple.getValueByField("tweet");
        for (String hashtag : tweet.getHashtags())
            collector.emit(new Values(hashtag));
    }
}, new Fields("hashtag"));

Page 37: Realtime processing with storm presentation

Trident - States

TridentState hashtagCounts = hashtagsStream
    .groupBy(new Fields("hashtag"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Spout → tweet stream → HashtagsSplitter → hashtags → Memory (State)

Page 38: Realtime processing with storm presentation

Trident - States

TridentState hashtagCounts = hashtagsStream
    .groupBy(new Fields("hashtag"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Spout → tweet stream → HashtagsSplitter → hashtags → Memory (State)

Page 39: Realtime processing with storm presentation

Trident - States

TridentState hashtagCounts = hashtagsStream
    .groupBy(new Fields("hashtag"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Spout → tweet stream → HashtagsSplitter → hashtags → Memory (State)

Page 40: Realtime processing with storm presentation

Trident - States

TridentState hashtagCounts = hashtagsStream
    .groupBy(new Fields("hashtag"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Spout → tweet stream → HashtagsSplitter → hashtags → Memory (State)

Page 41: Realtime processing with storm presentation

Trident - Complete example

TridentState hashtags = topology
    .newStream("tweets", new ApiStreamingSpout())
    .each(new Fields("tweet"), new HashTagSplitter(), new Fields("hashtag"))
    .groupBy(new Fields("hashtag"))
    .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

Spout → tweet stream → HashtagsSplitter → hashtags → Memory (State)

Page 42: Realtime processing with storm presentation

Transactions

Page 43: Realtime processing with storm presentation

Transactions

● Exactly-once message processing semantics

Page 44: Realtime processing with storm presentation

Transactions

● Intermediate processing may be executed in parallel

Page 45: Realtime processing with storm presentation

Transactions

● But persistence should be strictly sequential

Page 46: Realtime processing with storm presentation

Transactions

● Spouts and persistence should be designed to be transactional
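
As an illustration only (not Trident's actual implementation), the usual trick is to persist the transaction id together with the data, so that a replayed batch is applied at most once. A minimal sketch using the same Redis structures as before; the key names are made up:

public class TransactionalHashtagStore {
    private final Jedis jedis = new Jedis("localhost");

    public void applyBatch(long txid, Map<String, Long> hashtagCounts) {
        String lastTxid = jedis.get("hashs:last_txid");
        if (lastTxid != null && Long.parseLong(lastTxid) >= txid)
            return; // this batch was already persisted; the replay is a no-op

        for (Map.Entry<String, Long> entry : hashtagCounts.entrySet())
            jedis.hincrBy("hashs", entry.getKey(), entry.getValue());

        jedis.set("hashs:last_txid", Long.toString(txid)); // commit marker, written strictly in order
    }
}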

Page 47: Realtime processing with storm presentation

Trident - Queries

We want to know, online, the count of tweets with the hadoop hashtag and the count of tweets with the strataconf hashtag.

Page 48: Realtime processing with storm presentation

Trident - DRPC

Page 49: Realtime processing with storm presentation

Trident - Query

DRPCClient drpc = new DRPCClient("MyDRPCServer", "MyDRPCPort");

drpc.execute("counts", "strataconf hadoop");

Page 50: Realtime processing with storm presentation

Trident - Query

Stream drpcStream = topology.newDRPCStream("counts", drpc);

Page 51: Realtime processing with storm presentation

Trident - Query

Stream singleHashtag = drpcStream.each(new Fields("args"), new Split(), new Fields("hashtag"));

DRPC Server → DRPC stream → HashtagSplitter

Page 52: Realtime processing with storm presentation

Trident - Query

Stream hashtagsCountQuery = singleHashtag.stateQuery(hashtagCountMemoryState,
    new Fields("hashtag"), new MapGet(), new Fields("count"));

DRPC Server → DRPC stream → HashtagSplitter → Hashtag Count Query

Page 53: Realtime processing with storm presentation

Trident - Query

Stream drpcCountAggregate = hashtagsCountQuery
    .groupBy(new Fields("hashtag"))
    .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

DRPC Server → DRPC stream → HashtagSplitter → Hashtag Count Query → Aggregate Count

Page 54: Realtime processing with storm presentation

Trident - Query - Complete Example

topology.newDRPCStream("counts", drpc)
    .each(new Fields("args"), new Split(), new Fields("hashtag"))
    .stateQuery(hashtags, new Fields("hashtag"), new MapGet(), new Fields("count"))
    .each(new Fields("count"), new FilterNull())
    .groupBy(new Fields("hashtag"))
    .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

Page 55: Realtime processing with storm presentation

Trident - Query - Complete Example

DRPC Server → DRPC stream → HashtagSplitter → Hashtag Count Query → Aggregate Count

Page 56: Realtime processing with storm presentation

Trident - Query - Complete Example

Client Call → DRPC Server → DRPC stream → HashtagSplitter → Hashtag Count Query → Aggregate Count

Page 57: Realtime processing with storm presentation

Trident - Query - Complete Example

DRPC Server → DRPC stream → HashtagSplitter → Hashtag Count Query → Aggregate Count

The DRPC server holds the request while the topology executes.

Page 58: Realtime processing with storm presentation

Trident - Query - Complete Example

DRPC Server → DRPC stream → HashtagSplitter → Hashtag Count Query → Aggregate Count

The request is still held; when processing finishes, the result is returned to the DRPC server.

Page 59: Realtime processing with storm presentation

Trident - Query - Complete Example

DRPC Server → DRPC stream → HashtagSplitter → Hashtag Count Query → Aggregate Count

The response is returned to the client.

Page 60: Realtime processing with storm presentation

Running Topologies

Page 61: Realtime processing with storm presentation

Running Topologies - Local Cluster

Config conf = new Config();
conf.put("some_property", "some_value");

LocalCluster cluster = new LocalCluster();
cluster.submitTopology("twitter-hashtag-summarizer", conf, builder.createTopology());

● The whole topology runs on a single machine.

● Similar to running on a production cluster.

● Very useful for testing and development.

● Configurable parallelism.

● Same methods as StormSubmitter (see the sketch below).
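
To stop a locally submitted topology when the test is done, a short sketch (the sleep time is arbitrary; Utils comes from the Storm API):

// after cluster.submitTopology(...) as above
Utils.sleep(60000);                                  // let the topology run for a minute
cluster.killTopology("twitter-hashtag-summarizer");  // stop the topology
cluster.shutdown();                                  // tear down the in-process cluster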

Page 62: Realtime processing with storm presentation

Running Topologies - Production

package twitter.streaming;

public class Topology {
    public static void main(String[] args) throws Exception {
        StormSubmitter.submitTopology("twitter-hashtag-summarizer",
            conf, builder.createTopology());
    }
}

mvn package

storm jar Strata2012-0.0.1.jar twitter.streaming.Topology \
    "$REDIS_HOST" $REDIS_PORT "strata,strataconf" $TWITTER_USER $TWITTER_PASS
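
The main() above elides how conf and builder are created. A hedged sketch of how the arguments passed on the storm jar command line might be wired in; the redis-* key names are illustrative, while track, user and pass are the keys the spout reads in open():

public static void main(String[] args) throws Exception {
    Config conf = new Config();
    conf.put("redis-host", args[0]);   // $REDIS_HOST
    conf.put("redis-port", args[1]);   // $REDIS_PORT
    conf.put("track", args[2]);        // "strata,strataconf"
    conf.put("user", args[3]);         // $TWITTER_USER
    conf.put("pass", args[4]);         // $TWITTER_PASS

    TopologyBuilder builder = new TopologyBuilder();
    // ... same setSpout/setBolt wiring as createTopology() on page 18 ...

    StormSubmitter.submitTopology("twitter-hashtag-summarizer", conf, builder.createTopology());
}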

Page 63: Realtime processing with storm presentation

Running Topologies - Cluster Layout

Zookeeper Cluster (only for coordination and cluster state)

Nimbus (the master process)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Storm Cluster

● Single master process

● No SPOF

● Automatic task distribution

● Number of tasks per process is configurable

● Automatic task re-distribution when a supervisor fails

● Work continues if Nimbus fails

Page 64: Realtime processing with storm presentation

Running Topologies - Cluster Architecture - No SPOF

Nimbus (the master process)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Storm Cluster

Page 65: Realtime processing with storm presentation

Running Topologies - Cluster Architecture - No SPOF

Nimbus (the master process)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Storm Cluster

Page 66: Realtime processing with storm presentation

Running Topologies - Cluster Architecture - No SPOF

Nimbus (the master process)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Supervisor (where the processing is done)

Storm Cluster

Page 67: Realtime processing with storm presentation

Other features

Page 68: Realtime processing with storm presentation

Other features - Non-JVM languages

{
    "conf": {
        "topology.message.timeout.secs": 3,
        // etc
    },
    "context": {
        "task->component": {
            "1": "example-spout",
            "2": "__acker",
            "3": "example-bolt"
        },
        "taskid": 3
    },
    "pidDir": "..."
}

● Use Non-JVM Languages

● Simple protocol

● Use stdin & stdout for communication

● Many implementations and abstraction layers done by the community
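
A hedged sketch of what the JVM side of a non-JVM component can look like: a ShellBolt subclass that launches a subprocess speaking the multilang protocol over stdin/stdout (splitsentence.py is a hypothetical script):

public static class SplitSentence extends ShellBolt implements IRichBolt {
    public SplitSentence() {
        super("python", "splitsentence.py"); // command used to launch the subprocess
    }

    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }

    public Map<String, Object> getComponentConfiguration() {
        return null; // no component-specific configuration
    }
}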

Page 69: Realtime processing with storm presentation

Contribution

● A collection of community-developed spouts, bolts, serializers, and other goodies.

● You can find it at: https://github.com/nathanmarz/storm-contrib

● Some of those goodies are:

○ cassandra: Integrates Storm and Cassandra by writing tuples to a Cassandra Column Family.

○ mongodb: Provides a spout to read documents from mongo, and a bolt to write documents to mongo.

○ SQS: Provides a spout to consume messages from Amazon SQS.

○ hbase: Provides a spout and a bolt to read and write from HBase.

○ kafka: Provides a spout to read messages from kafka.

○ rdbms: Provides a bolt to write tuples to a table.

○ redis-pubsub: Provides a spout to read messages from redis pubsub.

○ And more...

Page 70: Realtime processing with storm presentation

Questions?