1
Processing “BIG-DATA” In Real Time
Yanai Franchi, Tikal
2
3
Vacation to Barcelona
4
After a Long Travel Day
5
Going to a Salsa Club
6
Best Salsa Club NOW
● Good Music
● Crowded – Now!
7
Same Problem in “gogobot”
8
9
Gogobot Checkin Heat-Map Service
Let's Develop “Gogobot Checkins Heat-Map”
10
Key Notes
● Collector Service – collects checkins as text addresses
  – We need to use a GeoLocation Service
● Upon each elapsed interval, the last locations list is displayed as a Heat-Map in the GUI.
● Web-scale service – 10Ks of checkins/second all over the world (imaginary, but let's do it for the exercise).
● Accuracy – sample data, NOT critical data.
  – Proportionately representative
  – Data volume is large enough to compensate for data loss.
11
Heat-Map Context
Text-Address
Checkins Heat-Map Service
Gogobot System
Gogobot Micro Service
Gogobot Micro Service
Gogobot Micro Service
Geo Location Service
Get-GeoCode(Address)
Heat-Map
Last Interval Locations
12
Database
Persist Checkin Intervals
Processing Checkins
Read Text Address
Check-in #1, Check-in #2, Check-in #3, Check-in #4, Check-in #5, Check-in #6, Check-in #7, Check-in #8, Check-in #9, ...
Simulate Checkins with a File
Plan A
GET Geo Location
Geo Location Service
13
Tons of Addresses Arriving Every Second
14
Architect - First Reaction...
15
Second Reaction...
16
Developer - First Reaction...
17
Second Reaction...
18
Problems?
● Tedious: Spend time configuring where to send messages, deploying workers, and deploying intermediate queues.
● Brittle: There's little fault-tolerance.
● Painful to scale: Partitioning the running worker(s) is complicated.
19
What We Want?
● Horizontal scalability
● Fault-tolerance
● No intermediate message brokers!
● Higher level abstraction than message passing
● “Just works”
● Guaranteed data processing (not in this case)
20
Apache Storm
✔Horizontal scalability
✔Fault-tolerance
✔No intermediate message brokers!
✔Higher level abstraction than message passing
✔“Just works”
✔Guaranteed data processing
21
Anatomy of Storm
22
What is Storm ?
● CEP – Open-source, distributed realtime computation system.
  – Makes it easy to reliably process unbounded streams of tuples
  – Does for realtime processing what Hadoop did for batch processing.
● Fast – 1M tuples/sec per node.
  – It is scalable, fault-tolerant, guarantees your data will be processed, and is easy to set up and operate.
23
Streams
Unbounded sequence of tuples
24
Spouts
Sources of Streams
25
Bolts
Processes input streams and produces new streams
26
Storm Topology
Network of spouts and bolts
27
Guarantee for Processing
● Storm guarantees the full processing of a tuple by tracking its state
● In case of failure, Storm can re-process it
● Source tuples with fully “acked” trees are removed from the system (a bolt-side ack/anchor sketch follows)
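A minimal sketch (not from the slides) of how a bolt takes part in this tracking: emitted tuples are anchored to their input tuple, and the input is acked once handled, or failed so Storm replays the source tuple.

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;
import java.util.Map;

public class AnchoringBolt extends BaseRichBolt {
  private OutputCollector collector;

  @Override
  public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
    this.collector = collector;
  }

  @Override
  public void execute(Tuple input) {
    try {
      // Anchoring: the emitted tuple is linked to its source in the tuple tree
      collector.emit(input, new Values(input.getString(0).toUpperCase()));
      collector.ack(input);   // mark the input as fully processed
    } catch (Exception e) {
      collector.fail(input);  // ask Storm to replay the source tuple
    }
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("str"));
  }
}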
28
Tasks (Bolt/Spout Instance)
Spouts and bolts execute as many tasks across the cluster
29
Stream Grouping
When a tuple is emitted, which task (instance) does it go to?
30
Stream Grouping
● Shuffle grouping: pick a random task
● Fields grouping: consistent hashing on a subset of tuple fields
● All grouping: send to all tasks
● Global grouping: pick the task with the lowest id (see the declaration sketch below)
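A minimal sketch (hypothetical MetricsBolt and ReporterBolt, not from the deck) of how each grouping is declared on a TopologyBuilder; the heat-map topology later in the deck uses only shuffle and fields grouping.

TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("checkins", new CheckinsSpout());

// Shuffle grouping: each tuple goes to a random task of the bolt
builder.setBolt("geocode-lookup", new GeocodeLookupBolt(), 4)
       .shuffleGrouping("checkins");

// Fields grouping: tuples with the same "city" always reach the same task
builder.setBolt("heatmap-builder", new HeatMapBuilderBolt(), 4)
       .fieldsGrouping("geocode-lookup", new Fields("city"));

// All grouping: every task of the bolt receives a copy (hypothetical bolt)
builder.setBolt("metrics", new MetricsBolt(), 2)
       .allGrouping("heatmap-builder");

// Global grouping: all tuples go to the single task with the lowest id (hypothetical bolt)
builder.setBolt("reporter", new ReporterBolt())
       .globalGrouping("heatmap-builder");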
31
Tasks, Executors, Workers
Task Task Task
Worker Process
Spout/Bolt
Spout/Bolt
Spout/Bolt =
Executor Thread
JVM
Executor Thread
32
Bolt B Bolt B
Worker Process
Executor
Spout A
Executor
Node
Supervisor
Bolt C Bolt C
Executor
Bolt B Bolt B
Worker Process
Executor
Spout A
Executor
Node
Supervisor
Bolt C Bolt C
Executor
33
Nimbus
Supervisor Supervisor
Supervisor Supervisor
Supervisor Supervisor
Upload/Rebalance Heat-Map Topology
Zoo Keeper Nodes
Storm Architecture
Master Node (similar to Hadoop JobTracker)
NOT critical for running topology
34
Nimbus
Supervisor Supervisor
Supervisor Supervisor
Supervisor Supervisor
Upload/Rebalance Heat-Map Topology
Zoo Keeper
Storm Architecture
Used For Cluster Coordination
A few nodes
35
Nimbus
Supervisor Supervisor
Supervisor Supervisor
Supervisor Supervisor
Upload/Rebalance Heat-Map Topology
Zoo Keeper
Storm Architecture
Run Worker Processes
36
Assembling Heatmap Topology
37
HeatMap Input/Output Tuples
● Input tuple: timestamp and text address:
  – (9:00:07 PM, “287 Hudson St New York NY 10013”)
● Output tuple: time interval, and a list of points for it:
  – (9:00:00 PM to 9:00:15 PM, List((40.719,-73.987),(40.726,-74.001),(40.719,-73.987)))
38
Checkins Spout
GeocodeLookup Bolt
HeatmapBuilder Bolt
PersistorBolt
(9:01 PM @ 287 Hudson st)
(9:01 PM, (40.736, -74.354))
Heat Map Storm Topology
(9:00 PM – 9:15 PM, List((40.73, -74.34), (51.36, -83.33), (69.73, -34.24)))
Upon Elapsed Interval
39
Checkins Spout

public class CheckinsSpout extends BaseRichSpout {
  private List<String> sampleLocations;
  private int nextEmitIndex;
  private SpoutOutputCollector outputCollector;

  @Override
  public void open(Map map, TopologyContext topologyContext,
                   SpoutOutputCollector spoutOutputCollector) {
    this.outputCollector = spoutOutputCollector;
    this.nextEmitIndex = 0;
    try {
      sampleLocations = IOUtils.readLines(
          ClassLoader.getSystemResourceAsStream("sample-locations.txt"));
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
  }

  @Override
  public void nextTuple() {
    String address = sampleLocations.get(nextEmitIndex);
    String checkin = new Date().getTime() + "@ADDRESS:" + address;
    outputCollector.emit(new Values(checkin));
    nextEmitIndex = (nextEmitIndex + 1) % sampleLocations.size();
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields("str"));
  }
}

We hold state – no need for thread safety
Declare output fields
Called iteratively by Storm
40
Geocode Lookup Bolt

public class GeocodeLookupBolt extends BaseBasicBolt {
  private LocatorService locatorService;

  @Override
  public void prepare(Map stormConf, TopologyContext context) {
    locatorService = new GoogleLocatorService();
  }

  @Override
  public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
    String str = tuple.getStringByField("str");
    String[] parts = str.split("@");
    Long time = Long.valueOf(parts[0]);
    String address = parts[1];

    LocationDTO locationDTO = locatorService.getLocation(address);
    if (locationDTO != null)
      outputCollector.emit(new Values(time, locationDTO));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer fieldsDeclarer) {
    fieldsDeclarer.declare(new Fields("time", "location"));
  }
}

Get Geocode, Create DTO
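The LocatorService and LocationDTO types used above aren't shown in the deck; a minimal sketch (assumed shapes, not from the slides) of what they might look like so the bolt compiles:

// Hypothetical shapes for the types referenced by GeocodeLookupBolt
public interface LocatorService {
  LocationDTO getLocation(String textAddress);   // e.g. backed by a geocoding API
}

public class LocationDTO implements java.io.Serializable {
  private final String city;
  private final double latitude;
  private final double longitude;

  public LocationDTO(String city, double latitude, double longitude) {
    this.city = city;
    this.latitude = latitude;
    this.longitude = longitude;
  }

  public String getCity()      { return city; }
  public double getLatitude()  { return latitude; }
  public double getLongitude() { return longitude; }
}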
41
Tick Tuple – Repeating Mantra
42
Two Streams to Heat-Map Builder
On tick tuple, we flush our Heat-Map
Checkin 1 Checkin 4 Checkin 5 Checkin 6
HeatMap-Builder Bolt
43
Tick Tuple in Action

public class HeatMapBuilderBolt extends BaseBasicBolt {
  private Map<String, List<LocationDTO>> heatmaps;

  @Override
  public Map<String, Object> getComponentConfiguration() {
    Config conf = new Config();
    conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS, 60);
    return conf;
  }

  @Override
  public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
    if (isTickTuple(tuple)) {
      // Emit accumulated intervals
    } else {
      // Add check-in info to the current interval in the Map
    }
  }

  private boolean isTickTuple(Tuple tuple) {
    return tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID)
        && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID);
  }
}

Tick interval
Hold latest intervals
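The two branches are left as comments on the slide; a minimal sketch (assumed 15-second intervals and hypothetical helper names, not from the slides) of what they might do. The emitted fields would need to be declared as "time-interval", "city", "locationsList" to match the Persistor bolt on the next slide.

// Hypothetical helpers completing execute(): bucket check-ins by interval and city,
// then flush all buckets when a tick tuple arrives.
private void addCheckinToInterval(Tuple tuple) {
  Long time = tuple.getLongByField("time");
  LocationDTO location = (LocationDTO) tuple.getValueByField("location");
  long intervalStart = (time / 15_000L) * 15_000L;        // 15-second buckets (assumed)
  String key = intervalStart + ":" + location.getCity();
  heatmaps.computeIfAbsent(key, k -> new ArrayList<>()).add(location);
}

private void emitHeatmaps(BasicOutputCollector collector) {
  for (Map.Entry<String, List<LocationDTO>> entry : heatmaps.entrySet()) {
    String[] parts = entry.getKey().split(":");
    collector.emit(new Values(Long.valueOf(parts[0]), parts[1], entry.getValue()));
  }
  heatmaps.clear();
}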
44
Persister Bolt

public class PersistorBolt extends BaseBasicBolt {
  private Jedis jedis;
  private ObjectMapper objectMapper = new ObjectMapper();

  @Override
  public void execute(Tuple tuple, BasicOutputCollector outputCollector) {
    Long timeInterval = tuple.getLongByField("time-interval");
    String city = tuple.getStringByField("city");
    String locationsList = objectMapper.writeValueAsString(
        tuple.getValueByField("locationsList"));

    String dbKey = "checkins-" + timeInterval + "@" + city;

    jedis.setex(dbKey, 3600 * 24, locationsList);
    jedis.publish("location-key", dbKey);
  }
}

Publish in Redis channel for debugging
Persist in Redis for 24h
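A minimal sketch (not from the slides) of a debugging listener for the Redis channel the bolt publishes to; recent Jedis versions let you override only onMessage.

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;

public class IntervalDebugListener {
  public static void main(String[] args) {
    Jedis jedis = new Jedis("localhost");
    // Blocks and prints every interval key the topology publishes on "location-key"
    jedis.subscribe(new JedisPubSub() {
      @Override
      public void onMessage(String channel, String dbKey) {
        System.out.println("New interval persisted under key: " + dbKey);
      }
    }, "location-key");
  }
}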
45
Shuffle Grouping
Shuffle Grouping
Check-in #1, Check-in #2, Check-in #3, Check-in #4, Check-in #5, Check-in #6, Check-in #7, Check-in #8, Check-in #9, ...
Sample Checkins File
Read Text Addresses
Transforming the Tuples
Checkins Spout
GeocodeLookup Bolt
HeatmapBuilder Bolt
Database
Persistor Bolt
Get Geo Location
Geo Location Service
Fields Grouping (city)
Group by city
46
Heat Map Topology

public class LocalTopologyRunner {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = buildTopology();
    StormSubmitter.submitTopology(
        "local-heatmap", new Config(), builder.createTopology());
  }

  private static TopologyBuilder buildTopology() {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("checkins", new CheckinsSpout());
    builder.setBolt("geocode-lookup", new GeocodeLookupBolt())
           .shuffleGrouping("checkins");
    builder.setBolt("heatmap-builder", new HeatMapBuilderBolt())
           .fieldsGrouping("geocode-lookup", new Fields("city"));
    builder.setBolt("persistor", new PersistorBolt())
           .shuffleGrouping("heatmap-builder");
    return builder;
  }
}
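The class is named LocalTopologyRunner but submits with StormSubmitter; a minimal sketch (not from the slides) of a truly local, in-process run using Storm's LocalCluster instead:

// Hypothetical alternative main() body for an in-process run, handy for the demo
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("local-heatmap", new Config(), buildTopology().createTopology());
Utils.sleep(60_000);                       // let the topology run for a minute
cluster.killTopology("local-heatmap");
cluster.shutdown();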
47
It's NOT Scaled
48
49
Scaling the Topology

public class LocalTopologyRunner {
  public static void main(String[] args) throws Exception {
    TopologyBuilder builder = buildTopology();
    Config conf = new Config();
    conf.setNumWorkers(20);
    StormSubmitter.submitTopology(
        "local-heatmap", conf, builder.createTopology());
  }

  private static TopologyBuilder buildTopology() {
    TopologyBuilder builder = new TopologyBuilder();
    builder.setSpout("checkins", new CheckinsSpout(), 4);
    builder.setBolt("geocode-lookup", new GeocodeLookupBolt(), 8)
           .shuffleGrouping("checkins").setNumTasks(64);
    builder.setBolt("heatmap-builder", new HeatMapBuilderBolt(), 4)
           .fieldsGrouping("geocode-lookup", new Fields("city"));
    builder.setBolt("persistor", new PersistorBolt(), 2)
           .shuffleGrouping("heatmap-builder").setNumTasks(4);
    return builder;
  }
}

Parallelism hint
Increase tasks for future
Set no. of workers
50
Demo
51
Database
Storm Heat-Map Topology
Persist Checkin Intervals
GET Geo Location
Check-in #1, Check-in #2, Check-in #3, Check-in #4, Check-in #5, Check-in #6, Check-in #7, Check-in #8, Check-in #9, ...
Read Text Address
Sample Checkins File
Recap – Plan A
Geo Location Service
52
We have something working
53
Add Kafka Messaging
54
Plan B - Kafka Spout & Bolt to HeatMap
GeocodeLookup
Bolt
HeatmapBuilder
Bolt
Kafka Checkins Spout
Database
PersistorBolt
Geo Location Service
Read Text Addresses
Checkin Kafka Topic
Publish Checkins
Locations Topic
Kafka Locations Bolt
55
56
They are all good, but not for all use-cases
57
Kafka – A Little Introduction
58
59
Pub-Sub Messaging System
60
61
62
63
64
Stateless Broker & Doesn't Fear the File System
65
66
67
68
Topics
● Logical collections of partitions (the physical files)
● A broker contains some of the partitions of a topic
69
A Partition is Consumed by Exactly One Group's Consumer
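A minimal sketch (using the modern Java Kafka client rather than the 0.8-era API of this deck) of a consumer group: every consumer sharing the same group.id splits the topic's partitions between them, so each partition is read by exactly one consumer in the group.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CheckinConsumer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "heatmap-consumers");   // consumers with this id share the partitions
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("checkins-topic"));
      while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
        for (ConsumerRecord<String, String> record : records) {
          System.out.println("partition " + record.partition() + ": " + record.value());
        }
      }
    }
  }
}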
70
Distributed & Fault-Tolerant
71
[Diagram sequence (slides 71-83): a Kafka cluster of Brokers 1-4 coordinated by ZooKeeper, with Producers 1 and 2 and Consumers 1 and 2; brokers and a consumer join and leave while the cluster keeps serving.]
84
Performance Benchmark: 1 Broker, 1 Producer, 1 Consumer
85
86
87
Add Kafka to our Topology

public class LocalTopologyRunner {
  ...
  private static TopologyBuilder buildTopology() {
    ...
    builder.setSpout("checkins", new KafkaSpout(kafkaConfig));
    ...
    builder.setBolt("kafkaProducer",
        new KafkaOutputBolt(
            "localhost:9092",
            "kafka.serializer.StringEncoder",
            "locations-topic"))
        .shuffleGrouping("persistor");
    return builder;
  }
}
Kafka Bolt
Kafka Spout
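The kafkaConfig variable on the slide isn't shown; a minimal sketch (storm-kafka contrib API of that era, with assumed topic and ZooKeeper settings) of how it might be built:

// Hypothetical SpoutConfig for the KafkaSpout above
BrokerHosts hosts = new ZkHosts("localhost:2181");       // ZooKeeper used for broker discovery
SpoutConfig kafkaConfig = new SpoutConfig(
    hosts,
    "checkins-topic",       // Kafka topic holding raw check-ins (assumed name)
    "/kafka-checkins",      // ZooKeeper root for consumer offsets
    "heatmap-spout");       // consumer id
kafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme());
builder.setSpout("checkins", new KafkaSpout(kafkaConfig), 4);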
88
Checkin HTTP Reactor
Publish Checkins
Plan C – Add Reactor
Database
Checkin Kafka Topic
Consume Checkins
Storm Heat-Map Topology
Locations Kafka Topic
Publish Interval Key
Persist Checkin Intervals
Geo Location Service
GET Geo Location
Index Interval Locations
Search Server
Index
Text-Address
89
Why Reactor ?
90
C10K Problem
91
2008: Thread Per Request/Response
92
Reactor Pattern Paradigm
Application registers handlers
...events trigger handlers
93
Reactor Pattern – Key Points
● Single thread / single event loop
● EVERYTHING runs on it
● You MUST NOT block the event loop
● Many implementations (partial list):
  – Node.js (JavaScript), EventMachine (Ruby), Twisted (Python)... and Vert.X
94
Reactor Pattern Problems
● Some work is naturally blocking:
  – Intensive data crunching
  – 3rd-party blocking APIs (e.g. JDBC)
● A pure reactor (e.g. Node.js) is not a good fit for this kind of work!
95
96
Verticles
● Verticles are the execution units of Vert.x
● Single threaded
● Verticles communicate by message passing (see the sketch below)
Run in Event Loop
For Blocking IO: Run in Thread-Pool
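A minimal sketch (Vert.x 2-era Java API, hypothetical event-bus address) of a verticle that handles check-in messages; blocking work would instead be deployed as a worker verticle so it runs on the thread pool rather than the event loop.

import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.json.JsonObject;
import org.vertx.java.platform.Verticle;

public class CheckinLoggerVerticle extends Verticle {
  @Override
  public void start() {
    // Verticles never share state; they only exchange messages over the event bus
    vertx.eventBus().registerHandler("checkins.address", new Handler<Message<JsonObject>>() {
      @Override
      public void handle(Message<JsonObject> message) {
        container.logger().info("Check-in received: " + message.body().getString("payload"));
      }
    });
  }
}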
97
Vert.X Architecture
Event Bus
Vert.X Architecture
98
Vert.X Goodies
● Growing module repository
● Web server
● Persistors (Mongo, JDBC, ...)
● Work queue
● Authentication manager
● Session manager
● Socket.IO
● TCP/SSL servers/clients
● HTTP/HTTPS servers/clients
● WebSockets support
● SockJS support
● Timers
● Buffers
● Streams and Pumps
● Routing
● Asynchronous File I/O
99
Node.JS vs Vert.X
100
Node.js vs Vert.X
● Node.js
  – JavaScript only
  – Inherently single threaded
  – Doesn't help much with IPC
  – All code MUST be in the event loop
● Vert.X
  – Polyglot (JavaScript, Java, Ruby, Python...)
  – Leverages JVM multi-threading
  – Event Bus as the nervous system
  – Blocking work can be done off the event loop
101
Node.js vs Vert.X Benchmark
AMD Phenom II X6 (6 core), 8GB RAM, Ubuntu 11.04
http://vertxproject.wordpress.com/2012/05/09/vert-x-vs-node-js-simple-http-benchmarks/
102
Event Bus
HTTPServer Verticle
Kafka module
Kafka Topic
StormTopology
HeatMap Reactor Architecture
Vert.X Instance
Automatically sends EventBus Msg → Kafka Topic
Vert.X Instance
103
Heat-Map Server – Only 6 LOC!

var vertx = require('vertx');
var container = require('vertx/container');
var console = require('vertx/console');
var config = container.config;

vertx.createHttpServer().requestHandler(function(request) {
  request.dataHandler(function(buffer) {
    vertx.eventBus.send(config.address, {payload: buffer.toString()});
  });
  request.response.end();
}).listen(config.httpServerPort, config.httpServerHost);

console.log("HTTP CheckinsReactor started on port " + config.httpServerPort);
Send checkin to Vert.X EventBus
104
Database
Checkin HTTP Reactor
Checkin Kafka Topic
Consume Checkins
Storm Heat-Map Topology
Hotzones Kafka Topic
Publish Interval Key
Persist Checkin Intervals
Web App
Geo Location Service
GET Geo Location
Get Interval Locations
Consume Intervals Keys
Push via WebSocket
Publish Checkins
Search Server
Index
Index Interval Locations
Search
Checkin HTTP Firehose
105
Demo
106
Lambda Architecture
107
Until Now...
108
Doesn't Answer Many Questions...
● What were the most popular Salsa clubs last month?
● How many unique visitors this year, per Salsa club?
● Show a histogram of “bouncing” checkins for the last year
109
Batch Processing
110
Sing in Concert ?
111
Complementary Views
Batch Views
Real-Time Views
Just a few hours of data
Time
Now
112
Lambda Architecture
New Data: 01101001101...
real-time view
real-time view
real-time view
Speed Layer
master dataset
batch view
batch view
batch view
Serving Layer
Batch Layer
Query: How Many?
113
Lambda Advantages
● Recover from human mistakes
● No need for random writes on the batch DB
● Batch views are processed with high precision and involved algorithms, without losing short-term information (a merge sketch follows below)
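A minimal sketch (hypothetical view classes, not from the deck) of the serving-layer idea: a query merges a precomputed batch view with the much smaller real-time view that covers only the hours since the last batch run.

// Hypothetical merge of batch and speed-layer results for "check-ins per club"
public long checkinsForClub(String clubId) {
  long fromBatchView = batchView.count(clubId);        // precomputed up to the last batch run
  long fromRealtimeView = realtimeView.count(clubId);  // covers only the last few hours
  return fromBatchView + fromRealtimeView;
}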
114
Summary
115
When You go out to Salsa Club
● Good Music
● Crowded
116
More Conclusions...
● Storm – great for real-time Big Data processing; complementary to Hadoop batch jobs.
● Kafka – great messaging for logs/events data; serves as a good “source” for a Storm spout.
● Vert.X – worth trying out as a reactor alternative.
● Lambda Architecture – brings real-time and batch processing together, in concert, for Big Data.
117
Thanks