![Page 1: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/1.jpg)
Real-Time Data Processing
Emerging Business Meetup - 07/24/13
Bryan Warner @ Traackr
![Page 2: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/2.jpg)
About Me
● Bryan Warner - Engineer @ Traackr
● Primary background is in Java
  ○ Breaking into Scala development this past year
● Interested in search, data scalability, and distributed computing
![Page 3: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/3.jpg)
About Traackr
● Influencer search engine
  ○ Platform for discovering and engaging online individuals who matter
● We track content and metrics for our database of influential people
  ○ Both in real time and in daily processes
● Some of our back-end stack includes: ElasticSearch, MongoDb, Java/Spring, Scala/Akka, etc.
● Looking for developers to use our API!
![Page 4: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/4.jpg)
Overview
● Review Traackr's use case for real-time data processing
● Technical solution we decided on
● Questions
![Page 5: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/5.jpg)
Traackr Use Case
1. Real-time content stream for a targeted group of influencers within our platform
   a. Primarily to show real-time tweets via our Twitter data provider (GNIP)
2. On-demand content tracking and searching for new influencers
   a. Users can add up to a hundred people at once
   b. Users expect new influencer content to be searchable in near real time
![Page 6: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/6.jpg)
Traackr Use Case
Data Processing Requirements
1. Incoming data is not lost
2. Data needs to be analyzed and enriched
3. Each type of data has its own processing component
   * Blog Posts, Tweets, Videos, Images, etc.
4. Components should be configurable for maximum throughput!
5. Components should act like small building blocks
![Page 7: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/7.jpg)
Bird's Eye View
[Architecture diagram; labels: Tracking App, GNIP Listener App, MongoDb, Initial Persist, "Post" Payload, RabbitMQ Broker, Queue, Queue Listener, Content "Enrichment" Pipeline, Make content searchable, ElasticSearch]
![Page 8: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/8.jpg)
Content Pipeline
● Apache Camel (http://camel.apache.org/)
  ○ Integration framework based on Enterprise Integration Patterns (EIP)
● Flexible route building
  ○ Supports direct and asynchronous components
  ○ Integrates with DI frameworks (e.g. Spring, Guice)
  ○ Tons of native support for various transports (http, jms, amqp, tcp, imap, etc.)
● Good support for unit testing
  ○ org.apache.camel.component.mock.MockEndpoint
![Page 9: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/9.jpg)
Content Pipeline
[Route diagram; labels: Queue, Queue Listener, Routing Filter, Tweet Processor, Blog Processor, Image Processor, Search Indexer, ROUTE]
![Page 10: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/10.jpg)
Content Pipeline
Initial Approach:
● Route(s) live within a CamelContext in your JVM
● Initial route is utilizing direct components (serial)

    from(<queue.uri>).routeId("my-route")
        .choice()
            .when(simple("${in.body.isTweet()}"))
                .to("bean:languageAnalyzer?method=detectLanguage")
                .to("bean:tweetAnalyzer?method=extractMentions")
            .when(simple("${in.body.isBlog()}"))
                .to("bean:httpService?method=fetchFullContent")
                .to("bean:languageAnalyzer?method=detectLanguage")
            .otherwise()
                .to("bean:imageAnalyzer?method=categorizeImage")
        .end()
        .to("bean:searchService?method=indexContent");

But there's a throughput problem...
![Page 11: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/11.jpg)
Content Pipeline
[Queue diagram with the queue HEAD marked; items as listed on the slide: TWEET, TWEET, IMAGE, TWEET, TWEET, BLOG, TWEET, TWEET, TWEET, BLOG, TWEET, TWEET]
● If Tweets come into the system at 5/sec, then Tweet processing rate has to be >= 5/sec
● If a blog post takes 5 seconds to process (on average)...
● And an image takes 30 seconds to process (on average)...
then...
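The arithmetic behind this slide can be sketched in a few lines. Assuming a single serial consumer and a steady tweet arrival rate, the number of tweets that pile up while one slow message is being processed is roughly arrival rate × processing time. The class and method names below are illustrative only; the 5/sec, 5 s, and 30 s figures are the ones quoted on the slide.

```java
final class HeadOfLineBacklog {

    /** Tweets that arrive while one message occupies the single serial consumer. */
    static long tweetsQueuedBehind(double tweetsPerSecond, double processingSeconds) {
        return Math.round(tweetsPerSecond * processingSeconds);
    }

    public static void main(String[] args) {
        double tweetRate = 5.0; // tweets per second, as quoted on the slide

        // Average processing times quoted on the slide: blog = 5 s, image = 30 s.
        System.out.println("Queued behind one blog post: " + tweetsQueuedBehind(tweetRate, 5.0));
        System.out.println("Queued behind one image:     " + tweetsQueuedBehind(tweetRate, 30.0));
    }
}
```

With the slide's numbers, one blog post leaves about 25 tweets waiting and one image about 150, so a serial route falls further behind with every slow item.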
![Page 12: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/12.jpg)
Content Pipeline
Expanded Approach:
● Utilize SEDA components (http://camel.apache.org/seda.html)
  ○ Underlying thread pool with BlockingQueue

    from(<queue.uri>).routeId("my-route")
        .choice()
            .when(simple("${in.body.isTweet()}")).to("seda:tweetEnricher")
            .when(simple("${in.body.isBlog()}")).to("seda:blogEnricher")
            .otherwise().to("seda:imageEnricher")
        .end();

    from("seda:tweetEnricher?concurrentConsumers=10").routeId("tweet-route")
        .to("bean:languageAnalyzer?method=detectLanguage")
        .to("bean:tweetAnalyzer?method=extractMentions")
        .to("seda:searchService");

    from("seda:blogEnricher?concurrentConsumers=2").routeId("blog-route") ...
    from("seda:imageEnricher?concurrentConsumers=2").routeId("img-route") ...

    // Routes re-join
    from("seda:searchService?concurrentConsumers=X").routeId("s-indexer")
        .to("bean:searchService?method=indexContent");
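The idea behind the SEDA endpoints, a thread pool fed by a BlockingQueue per stage, can be illustrated with plain `java.util.concurrent`. This is a sketch of the concept, not Camel's implementation: the class name, the `tweet`/`image` string prefixes, and the latch used as a stand-in for slow image work are all assumptions made for the example. The pool sizes mirror the `concurrentConsumers` values in the routes above.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class StagedPools {

    /** Dispatches each item to the pool for its type; returns items in completion order. */
    static List<String> process(List<String> items) {
        // Each fixed pool is backed by its own unbounded LinkedBlockingQueue.
        ExecutorService tweetPool = Executors.newFixedThreadPool(10);
        ExecutorService imagePool = Executors.newFixedThreadPool(2);
        List<String> finished = new CopyOnWriteArrayList<>();

        int tweetCount = (int) items.stream().filter(i -> i.startsWith("tweet")).count();
        CountDownLatch tweetsDone = new CountDownLatch(tweetCount);

        for (String item : items) {
            if (item.startsWith("tweet")) {
                tweetPool.execute(() -> {
                    finished.add(item);
                    tweetsDone.countDown();
                });
            } else {
                // Stand-in for slow work: an image completes only after all tweets have.
                imagePool.execute(() -> {
                    try {
                        tweetsDone.await();
                        finished.add(item);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        }

        tweetPool.shutdown();
        imagePool.shutdown();
        try {
            tweetPool.awaitTermination(10, TimeUnit.SECONDS);
            imagePool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return finished;
    }

    public static void main(String[] args) {
        // The image is submitted first, yet the tweets are not held up behind it.
        System.out.println(process(List.of("image-1", "tweet-1", "tweet-2", "tweet-3")));
    }
}
```

Because the image runs on its own pool, the tweets complete without waiting for it, which is the property the serial route lacked.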
![Page 13: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/13.jpg)
Content Pipeline
[Route diagram; labels: Queue, Queue Listener, Routing Filter, Tweet Processor, Blog Processor, Image Processor, Search Indexer, ROUTE, ThreadPool + BlockingQueue]
![Page 14: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/14.jpg)
Content Pipeline
Caveats:
● No visibility into SEDA's thread pool state (e.g. how many objects are on its internal queue?)
● If the VM crashes, the payloads on the SEDA thread pool's blocking queue are lost
● Our route assumes that each payload consists of only one message
  ○ In reality, our payloads are a mix of different post types ... how to handle this efficiently?
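One way to approach the question in the last caveat is to split each mixed payload into per-type groups before routing, so that each group can go to its own processor. The sketch below is only an assumption about what such a splitter might look like; `PayloadSplitter`, `Post`, and `PostType` are hypothetical names, not Traackr's classes.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class PayloadSplitter {

    enum PostType { TWEET, BLOG, IMAGE }

    /** Minimal stand-in for a tracked post; the real payload type is not shown on the slides. */
    static final class Post {
        final PostType type;
        final String id;

        Post(PostType type, String id) {
            this.type = type;
            this.id = id;
        }
    }

    /** Splits a mixed payload into one list per post type, keeping first-seen type order. */
    static Map<PostType, List<Post>> splitByType(List<Post> payload) {
        return payload.stream()
                .collect(Collectors.groupingBy((Post p) -> p.type, LinkedHashMap::new, Collectors.toList()));
    }

    public static void main(String[] args) {
        List<Post> payload = List.of(
                new Post(PostType.TWEET, "t1"),
                new Post(PostType.BLOG, "b1"),
                new Post(PostType.TWEET, "t2"),
                new Post(PostType.IMAGE, "i1"));

        // Print how many posts of each type the payload contained.
        splitByType(payload).forEach((type, posts) ->
                System.out.println(type + ": " + posts.size()));
    }
}
```

Each resulting group could then be tagged and sent to the queue for its type, so a slow image never shares a message with a batch of tweets.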
![Page 15: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/15.jpg)
Content Pipeline
Final Solution:

    from(<queue.uri>).routeId("my-route")
        .split().method("payloadSplitterService", "splitMessage")
        .choice()
            .when(header("enrichTweets").isEqualTo(true)).to(<queue.uri.tweet>)
            .when(header("enrichBlogs").isEqualTo(true)).to(<queue.uri.blogs>)
            .otherwise().to(<queue.uri.images>)
        .end();

    from(<queue.uri.tweet>).routeId("queue-in-tweet-route")
        .to("seda:tweetEnricher?timeout=0");

    from("seda:tweetEnricher?concurrentConsumers=10&size=0&blockWhenFull=true").routeId("tweet-route")
        .to("bean:languageAnalyzer?method=detectLanguage")
        .to("bean:tweetAnalyzer?method=extractMentions")
        .to("seda:searchService");

    from("seda:searchService?concurrentConsumers=X").routeId("s-indexer")
        .to("bean:searchService?method=indexContent");
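The `blockWhenFull=true` option on the SEDA endpoint above asks the sending thread to wait when the in-memory queue is full instead of failing. The same hand-off behaviour can be sketched with a bounded `ArrayBlockingQueue` from `java.util.concurrent`. This illustrates the idea and is not Camel's implementation; the class name, the capacity of 2, and the 50 ms wait are arbitrary choices for the example, and a timed `offer` stands in for an indefinite `put` so that the example terminates.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

final class BoundedHandoff {

    private final BlockingQueue<String> buffer;

    BoundedHandoff(int capacity) {
        this.buffer = new ArrayBlockingQueue<>(capacity);
    }

    /** Tries to hand a message over, waiting up to waitMillis for free space. */
    boolean handOff(String message, long waitMillis) {
        try {
            return buffer.offer(message, waitMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Removes the next buffered message, or returns null if the buffer is empty. */
    String consume() {
        return buffer.poll();
    }

    /** Number of messages currently held in memory. */
    int buffered() {
        return buffer.size();
    }

    public static void main(String[] args) {
        BoundedHandoff handoff = new BoundedHandoff(2);
        System.out.println("tweet-1 accepted: " + handoff.handOff("tweet-1", 50));
        System.out.println("tweet-2 accepted: " + handoff.handOff("tweet-2", 50));
        // The buffer is full, so the producer waits and then gives up.
        System.out.println("tweet-3 accepted: " + handoff.handOff("tweet-3", 50));
        // Once a consumer frees a slot, the hand-off goes through.
        handoff.consume();
        System.out.println("tweet-3 accepted after consume: " + handoff.handOff("tweet-3", 50));
    }
}
```

A small bounded buffer limits how many payloads sit only in memory at any moment, which speaks to the crash caveat raised earlier.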
![Page 16: Real-time Data Processing](https://reader038.vdocuments.us/reader038/viewer/2022100517/554f0ad4b4c90577698b5412/html5/thumbnails/16.jpg)
Questions