Integrate Solr with real-time stream processing applications
DESCRIPTION
Presented by Timothy Potter, Founder, Text Centrix. Storm is a real-time distributed computation system used to process massive streams of data. Many organizations are turning to technologies like Storm to complement batch-oriented big data technologies, such as Hadoop, to deliver time-sensitive analytics at scale. This talk introduces an emerging architectural pattern of integrating Solr and Storm to process big data in real time. There are a number of natural integration points between Solr and Storm, such as populating a Solr index or supplying data to Storm using Solr’s real-time get support. In this session, Timothy will cover the basic concepts of Storm, such as spouts and bolts. He’ll then provide examples of how to integrate Solr into Storm to perform large-scale indexing in near real-time. In addition, we'll see how to embed Solr in a Storm bolt to match incoming tuples against pre-configured queries, commonly known as a percolator. Attendees will come away from this presentation with a good introduction to stream processing technologies and several real-world use cases of how to integrate Solr with Storm.
TRANSCRIPT
![Page 1: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/1.jpg)
![Page 2: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/2.jpg)
INTEGRATE SOLR WITH REAL-TIME STREAM PROCESSING APPLICATIONS Timothy Potter
@thelabdude linkedin.com/thelabdude
![Page 3: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/3.jpg)
whoami
• independent consultant: search / big data projects
• soon to be joining engineering team @LucidWorks
• co-author, Solr in Action
• previously big data architect, Dachis Group
http://www.linkedin.com/in/thelabdude
![Page 4: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/4.jpg)
my storm story
re-designed a complex batch-oriented indexing pipeline based on Hadoop (Oozie, Pig, Hive, Sqoop) as a real-time Storm topology
![Page 5: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/5.jpg)
agenda
• walk through how to develop a storm topology
• common integration points with Solr (near real-time indexing, percolator, real-time get)
![Page 6: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/6.jpg)
example
listen to click events from the 1.usa.gov URL shortener (bit.ly) to determine trending US government sites
stream of click events: http://developer.usa.gov/1usagov
http://www.smartgrid.gov -> http://1.usa.gov/ayu0Ru
![Page 7: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/7.jpg)
beyond word count
tackle real challenges you’ll encounter when developing a storm topology
and what about ... unit testing, dependency injection, measuring runtime behavior of your components, separation of concerns, reducing boilerplate, hiding complexity ...
![Page 8: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/8.jpg)
storm
• open source distributed computation system
• scalability, fault-tolerance, guaranteed message processing (optional)
![Page 9: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/9.jpg)
storm primitives
• tuple: ordered list of values
• stream: unbounded sequence of tuples
• spout: emits a stream of tuples (source)
• bolt: performs some operation on each tuple
• topology: DAG of spouts and bolts
![Page 10: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/10.jpg)
solution requirements
• receive click events from 1.usa.gov stream
• count frequency of pages in a time window
• rank top N sites per time window
• extract title, body text, image for each link
• persist rankings and metadata for visualization
![Page 11: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/11.jpg)
trending snapshot (sept 12, 2013)
![Page 12: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/12.jpg)
http://solarsystem.nasa.gov/multimedia/display.cfm?Category=Spacecraft&IM_ID=17966
![Page 13: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/13.jpg)
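[Diagram: the trending topology. The 1.usa.gov Spout feeds the EnrichLink Bolt (field grouping on bit.ly hash; calls the embed.ly API) and the Rolling Count Bolt (field grouping on bit.ly hash). The EnrichLink Bolt feeds the Solr Indexing Bolt, which writes to Solr. The Rolling Count Bolt feeds the Intermediate Rankings Bolt (field grouping on obj), then the Total Rankings Bolt (global grouping), then the Persist Rankings Bolt (global grouping), which writes to the Metrics DB. Some bolts are marked as provided in the storm-starter project. Legend: data store, bolt, spout, stream, grouping.]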
![Page 14: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/14.jpg)
stream grouping
• shuffle: random distribution of tuples to all instances of a bolt
• field(s): group tuples by one or more fields in common
• global: reduce down to one
• all: replicate stream to all instances of a bolt
source: https://github.com/nathanmarz/storm/wiki/Concepts
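To make the field(s) grouping concrete, here is a minimal plain-JDK sketch of the idea: tuples that share the same grouping value are always routed to the same bolt instance. It is an illustration only and not Storm's actual routing code; the class name, the `chooseTask` helper, and the task count of 3 are assumptions made for this example.

```java
import java.util.List;

// Plain-JDK illustration of fields grouping; NOT Storm's actual implementation.
class FieldsGroupingSketch {

    // Maps the values of the grouping fields to one of numTasks bolt instances.
    static int chooseTask(List<?> groupingValues, int numTasks) {
        // floorMod keeps the index non-negative even when hashCode() is negative.
        return Math.floorMod(groupingValues.hashCode(), numTasks);
    }

    public static void main(String[] args) {
        int numTasks = 3; // hypothetical parallelism of the downstream bolt
        for (String bitlyHash : List.of("2BktiW", "ayu0Ru", "2BktiW")) {
            System.out.println(bitlyHash + " -> task " + chooseTask(List.of(bitlyHash), numTasks));
        }
    }
}
```

Because the target depends only on the grouping values, both occurrences of the same bit.ly hash land on the same task, which is what lets a bolt keep per-key state such as counts or a local cache.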
![Page 15: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/15.jpg)
useful storm concepts
• bolts can receive input from many spouts
• tuples in a stream can be grouped together
• streams can be split and joined
• bolts can inject new tuples into the stream
• components can be distributed across a cluster at a configurable parallelism level
• optionally, storm keeps track of each tuple emitted by a spout (ack or fail)
![Page 16: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/16.jpg)
tools
• Spring framework – dependency injection, configuration, unit testing, mature, etc.
• Groovy – keeps your code tidy and elegant
• Mockito – ignore stuff your test doesn’t care about
• Netty – fast & powerful NIO networking library
• Coda Hale metrics – get visibility into how your bolts and spouts are performing (at a very low-level)
![Page 17: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/17.jpg)
spout
easy! just produce a stream of tuples ...
and ... avoid blocking when waiting for more data, ease off the throttle if the topology is not processing fast enough, deal with failed tuples, choose whether to use message IDs for each tuple emitted, data model / schema, etc ...
![Page 18: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/18.jpg)
[Diagram: SpringSpout and SpringBolt wrap plain POJOs (a StreamingDataProvider for the spout, a StreamingDataAction for the bolt) managed by a Spring container (1 per topology per JVM). Spring dependency injection supplies collaborators such as JDBC and web service clients.]
Hide complexity of implementing the Storm contract; the developer focuses on business logic
![Page 19: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/19.jpg)
streaming data provider

    class OneUsaGovStreamingDataProvider implements StreamingDataProvider, MessageHandler {

        // Spring dependency injection
        MessageStream messageStream

        ...

        void open(Map stormConf) { messageStream.receive(this) }

        // non-blocking call to get the next message from 1.usa.gov
        boolean next(NamedValues nv) {
            String msg = queue.poll()
            if (msg) {
                // use Jackson JSON parser to create an object from the raw incoming data
                OneUsaGovRequest req = objectMapper.readValue(msg, OneUsaGovRequest)
                if (req != null && req.globalBitlyHash != null) {
                    nv.set(OneUsaGovTopology.GLOBAL_BITLY_HASH, req.globalBitlyHash)
                    nv.set(OneUsaGovTopology.JSON_PAYLOAD, req)
                    return true
                }
            }
            return false
        }

        // callback to receive messages from Netty HttpClient
        void handleMessage(String msg) { queue.offer(msg) }
    }
![Page 20: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/20.jpg)
jackson json to java

    @JsonIgnoreProperties(ignoreUnknown = true)
    class OneUsaGovRequest implements Serializable {
        @JsonProperty("a") String userAgent;
        @JsonProperty("c") String countryCode;
        @JsonProperty("nk") int knownUser;
        @JsonProperty("g") String globalBitlyHash;
        @JsonProperty("h") String encodingUserBitlyHash;
        @JsonProperty("l") String encodingUserLogin;
        ...
    }

Spring converts json to java object for you:

    <bean id="restTemplate" class="org.springframework.web.client.RestTemplate">
      <property name="messageConverters">
        <list>
          <bean id="messageConverter"
                class="...json.MappingJackson2HttpMessageConverter">
          </bean>
        </list>
      </property>
    </bean>
![Page 21: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/21.jpg)
spout data provider spring-managed bean

    <bean id="oneUsaGovStreamingDataProvider"
          class="com.bigdatajumpstart.storm.OneUsaGovStreamingDataProvider">
      <property name="messageStream">
        <bean class="com.bigdatajumpstart.netty.HttpClient">
          <constructor-arg index="0" value="${streamUrl}"/>
        </bean>
      </property>
    </bean>

Note: when building the StormTopology to submit to Storm, you do:

    builder.setSpout("1.usa.gov-spout",
        new SpringSpout("oneUsaGovStreamingDataProvider", spoutFields), 1)
![Page 22: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/22.jpg)
spout data provider unit test

    class OneUsaGovStreamingDataProviderTest extends StreamingDataProviderTestBase {

        @Test
        void testDataProvider() {
            // mock json to simulate data from 1.usa.gov feed
            String jsonStr = '''{
                "a": "user-agent", "c": "US", "nk": 0,
                "tz": "America/Los_Angeles", "gr": "OR",
                "g": "2BktiW", "h": "12Me4B2", "l": "usairforce",
                "al": "en-us", "hh": "1.usa.gov",
                "r": "http://example.com/foo", ...
            }'''

            OneUsaGovStreamingDataProvider dataProvider = new OneUsaGovStreamingDataProvider()
            // use Mockito to satisfy dependencies not needed for this test
            dataProvider.setMessageStream(mock(MessageStream))
            dataProvider.open(stormConf) // Config setup in base class
            dataProvider.handleMessage(jsonStr)

            // asserts to verify data provider works correctly
            NamedValues record = new NamedValues(OneUsaGovTopology.spoutFields)
            assertTrue dataProvider.next(record)
            ...
        }
    }
![Page 23: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/23.jpg)
rolling count bolt
• counts frequency of links in a sliding time window
• emits topN in current window every M seconds
• uses tick tuple trick provided by Storm to emit every M seconds (configurable)
• provided with the storm-starter project
http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/
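The sliding-window idea behind the rolling count bolt can be sketched with a slot-based counter in plain Java. This is a simplified illustration under my own assumptions, not the storm-starter code: the class and method names are hypothetical, and the caller is expected to invoke `countsThenAdvance()` on each tick (every M seconds).

```java
import java.util.HashMap;
import java.util.Map;

// Simplified slot-based sliding-window counter; NOT the storm-starter code.
class SlidingWindowCounterSketch {
    private final int numSlots;                       // window length = numSlots ticks
    private final Map<String, long[]> slotsByKey = new HashMap<>();
    private int head = 0;                             // slot receiving new counts

    SlidingWindowCounterSketch(int numSlots) {
        this.numSlots = numSlots;
    }

    // Called once per incoming tuple (for example, once per clicked link).
    void increment(String key) {
        slotsByKey.computeIfAbsent(key, k -> new long[numSlots])[head]++;
    }

    // Called on every tick: returns totals over the current window, then
    // advances the window by clearing the oldest slot so it can be reused.
    Map<String, Long> countsThenAdvance() {
        Map<String, Long> totals = new HashMap<>();
        for (Map.Entry<String, long[]> e : slotsByKey.entrySet()) {
            long sum = 0;
            for (long c : e.getValue()) sum += c;
            totals.put(e.getKey(), sum);
        }
        head = (head + 1) % numSlots;
        for (long[] slots : slotsByKey.values()) slots[head] = 0;
        // Drop keys that no longer have any counts inside the window.
        slotsByKey.values().removeIf(SlidingWindowCounterSketch::isAllZero);
        return totals;
    }

    private static boolean isAllZero(long[] slots) {
        for (long c : slots) if (c != 0) return false;
        return true;
    }

    public static void main(String[] args) {
        SlidingWindowCounterSketch counter = new SlidingWindowCounterSketch(3);
        counter.increment("2BktiW");
        counter.increment("2BktiW");
        counter.increment("ayu0Ru");
        System.out.println(counter.countsThenAdvance());
    }
}
```

Counts age out of the window one slot at a time, so memory stays bounded by the number of keys seen within the last `numSlots` ticks.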
![Page 24: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/24.jpg)
enrich link metadata bolt
• calls out to embed.ly API
• caches results locally in the bolt instance
• relies on field grouping (incoming tuples)
• outputs data to be indexed in Solr
• benefits from parallelism to enrich more links concurrently (watch those rate limits)
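Caching results locally in a bolt instance might look like the bounded LRU cache below. It is a sketch under stated assumptions: the class name, the `loader` function standing in for the embed.ly call, and the size limit are all hypothetical, and it assumes the cache is only touched from the single thread that runs the bolt instance.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Function;

// Bounded LRU cache for per-link metadata; names here are illustrative only.
class LinkMetadataCacheSketch<V> {
    private final Map<String, V> cache;
    private final Function<String, V> loader; // stand-in for the remote API call
    private int loads = 0;                    // how many times the loader was invoked

    LinkMetadataCacheSketch(int maxEntries, Function<String, V> loader) {
        this.loader = loader;
        // accessOrder=true makes iteration order least-recently-used first.
        this.cache = new LinkedHashMap<String, V>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, V> eldest) {
                return size() > maxEntries;
            }
        };
    }

    // Returns cached metadata, calling the loader only on a cache miss.
    V get(String bitlyHash) {
        V value = cache.get(bitlyHash);
        if (value == null) {
            value = loader.apply(bitlyHash);
            loads++;
            cache.put(bitlyHash, value);
        }
        return value;
    }

    int loadCount() {
        return loads;
    }

    public static void main(String[] args) {
        LinkMetadataCacheSketch<String> cache =
            new LinkMetadataCacheSketch<>(100, hash -> "metadata for " + hash);
        cache.get("2BktiW");
        cache.get("2BktiW"); // served from the cache, no second load
        System.out.println("loader calls: " + cache.loadCount());
    }
}
```

The cache pays off because field grouping sends every tuple for a given bit.ly hash to the same bolt instance, so repeated clicks on a popular link hit the local cache instead of the rate-limited API.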
![Page 25: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/25.jpg)
embed.ly service

    class EmbedlyService {

        @Autowired
        RestTemplate restTemplate

        String apiKey

        // integrate coda hale metrics
        private Timer apiTimer = MetricsSupport.timer(EmbedlyService, "apiCall")

        Embedly getLinkMetadata(String link) {
            String urlEncoded = URLEncoder.encode(link, "UTF-8")
            URI uri = new URI("https://api.embed.ly/1/oembed?key=${apiKey}&url=${urlEncoded}")
            Embedly embedly = null
            // simple closure to time our requests to the Web service
            MetricsSupport.withTimer(apiTimer, {
                embedly = restTemplate.getForObject(uri, Embedly)
            })
            return embedly
        }
    }
![Page 26: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/26.jpg)
metrics
• capture runtime behavior of the components in your topology
• Coda Hale metrics - http://metrics.codahale.com/
• output metrics every N minutes
• report metrics to JMX, ganglia, graphite, etc
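As a rough idea of what a timer captures, the plain-JDK sketch below wraps a call and records how often it ran and how long it took on average. It is not the Coda Hale metrics API; the class and method names are assumptions for illustration, and a real metrics library adds rates, percentiles, and reporters on top of this.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Minimal call timer; a stand-in for a metrics library timer, not its API.
class CallTimerSketch {
    private final AtomicLong count = new AtomicLong();
    private final AtomicLong totalNanos = new AtomicLong();

    // Runs the call, recording its duration even if it throws.
    <T> T time(Supplier<T> call) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            totalNanos.addAndGet(System.nanoTime() - start);
            count.incrementAndGet();
        }
    }

    long count() {
        return count.get();
    }

    double meanMillis() {
        long n = count.get();
        return n == 0 ? 0.0 : totalNanos.get() / 1_000_000.0 / n;
    }

    public static void main(String[] args) {
        CallTimerSketch apiTimer = new CallTimerSketch();
        String result = apiTimer.time(() -> "pretend web service response");
        System.out.println(result + " (calls=" + apiTimer.count()
            + ", mean ms=" + apiTimer.meanMillis() + ")");
    }
}
```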
![Page 27: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/27.jpg)
    -- Meters ----------------------------------------------------------------------
    EnrichLinkBoltLogic.solrQueries
                 count = 97
             mean rate = 0.81 events/second
         1-minute rate = 0.89 events/second
         5-minute rate = 1.62 events/second
        15-minute rate = 1.86 events/second
    SolrBoltLogic.linksIndexed
                 count = 60
             mean rate = 0.50 events/second
         1-minute rate = 0.41 events/second
         5-minute rate = 0.16 events/second
        15-minute rate = 0.06 events/second

    -- Timers ----------------------------------------------------------------------
    EmbedlyService.apiCall
                 count = 60
             mean rate = 0.50 calls/second
         1-minute rate = 0.40 calls/second
         5-minute rate = 0.16 calls/second
        15-minute rate = 0.06 calls/second
                   min = 138.70 milliseconds
                   max = 7642.92 milliseconds
                  mean = 1148.29 milliseconds
                stddev = 1281.40 milliseconds
                median = 652.83 milliseconds
                  75% <= 1620.96 milliseconds
    ...
![Page 28: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/28.jpg)
storm cluster concepts
• nimbus: master node (~job tracker in Hadoop)
• zookeeper: cluster management / coordination
• supervisor: one per node in the cluster to manage worker processes
• worker: one or more per supervisor (JVM process)
• executor: thread in worker
• task: work performed by a spout or bolt
![Page 29: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/29.jpg)
[Diagram: Storm cluster layout. Labels: Nimbus, Zookeeper, Topology JAR, Node 1 ... M nodes, Supervisor (1 per node), Worker 1 (port 6701) ... N workers, JVM process, executor (thread).]
Each component (spout or bolt) is distributed across a cluster of workers based on a configurable parallelism
![Page 30: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/30.jpg)
    @Override
    StormTopology build(StreamingApp app) throws Exception {
        ...
        TopologyBuilder builder = new TopologyBuilder()

        builder.setSpout("1.usa.gov-spout",
            new SpringSpout("oneUsaGovStreamingDataProvider", spoutFields), 1)

        // the last argument (3) is a parallelism hint to the framework (can be rebalanced)
        builder.setBolt("enrich-link-bolt",
            new SpringBolt("enrichLinkAction", enrichedLinkFields), 3)
            .fieldsGrouping("1.usa.gov-spout", globalBitlyHashGrouping)
        ...
![Page 31: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/31.jpg)
solr integration points
• real-time get
• near real-time indexing (NRT)
• percolate (match incoming docs to pre-existing queries)
![Page 32: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/32.jpg)
real-time get

use Solr for fast lookups by document ID

    class SolrClient {

        @Autowired
        SolrServer solrServer

        SolrDocument get(String docId, String... fields) {
            SolrQuery q = new SolrQuery()
            // send the request to the "get" request handler
            q.setRequestHandler("/get")
            q.set("id", docId)
            q.setFields(fields)
            QueryRequest req = new QueryRequest(q)
            req.setResponseParser(new BinaryResponseParser())
            QueryResponse rsp = req.process(solrServer)
            return (SolrDocument) rsp.getResponse().get("doc")
        }
    }
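For readers who want to see what such a lookup looks like on the wire, the sketch below builds the equivalent HTTP URL for the `/get` handler using only the JDK. The base URL, collection name, and field names are placeholders I made up for the example; only the `/get` path and the `id`, `fl`, and `wt` parameters are Solr's.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;

// Builds a real-time get URL; host, collection, and fields are hypothetical.
class RealTimeGetUrlSketch {

    static String buildGetUrl(String collectionUrl, String docId, String... fields) {
        StringBuilder url = new StringBuilder(collectionUrl)
            .append("/get?id=").append(URLEncoder.encode(docId, StandardCharsets.UTF_8))
            .append("&wt=json");
        if (fields.length > 0) {
            // fl restricts the response to the listed fields
            url.append("&fl=")
               .append(URLEncoder.encode(String.join(",", fields), StandardCharsets.UTF_8));
        }
        return url.toString();
    }

    public static void main(String[] args) {
        // Assumed local Solr instance and collection name, for illustration only.
        System.out.println(
            buildGetUrl("http://localhost:8983/solr/collection1", "2BktiW", "id", "title"));
    }
}
```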
![Page 33: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/33.jpg)
near real-time indexing
• If possible, use CloudSolrServer to route documents directly to the correct shard leaders (SOLR-4816)
• Use <openSearcher>false</openSearcher> for auto “hard” commits
• Use auto soft commits as needed
• Use parallelism of Storm bolt to distribute indexing work to N nodes
http://searchhub.org/2013/08/23/understanding-transaction-logs-softcommit-and-commit-in-sorlcloud/
![Page 34: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/34.jpg)
percolate
• match incoming documents to pre-configured queries (inverted search)
  – example: Is this tweet related to campaign Y for brand X?
• use storm’s distributed computation support to evaluate M pre-configured queries per doc
![Page 35: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/35.jpg)
two possible approaches
• Lucene-only solution using MemoryIndex
  – See presentation by Charlie Hull and Alan Woodward
• EmbeddedSolrServer
  – Full solrconfig.xml / schema.xml
  – RAMDirectory
  – Relies on Storm to scale up documents / second
  – Easy solution for up to a few thousand queries
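The inverted-search idea itself can be shown without Lucene or Solr. In the toy sketch below, each pre-configured query is just a set of required terms, and an incoming document matches when it contains all of them. This deliberately swaps in naive term matching for MemoryIndex or EmbeddedSolrServer; every name in it is hypothetical, and it has none of the analysis or query syntax a real percolator would provide.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Toy percolator: queries are sets of required terms (NOT Lucene/Solr queries).
class PercolatorSketch {
    private final Map<String, Set<String>> queries = new LinkedHashMap<>();

    // Registers a pre-configured query that matches docs containing all terms.
    void registerQuery(String queryId, String... requiredTerms) {
        Set<String> terms = new HashSet<>();
        for (String t : requiredTerms) terms.add(t.toLowerCase(Locale.ROOT));
        queries.put(queryId, terms);
    }

    // Returns the ids of all registered queries that the document matches.
    List<String> percolate(String docText) {
        Set<String> docTerms = new HashSet<>(
            Arrays.asList(docText.toLowerCase(Locale.ROOT).split("\\W+")));
        List<String> matches = new ArrayList<>();
        for (Map.Entry<String, Set<String>> q : queries.entrySet()) {
            if (docTerms.containsAll(q.getValue())) matches.add(q.getKey());
        }
        return matches;
    }

    public static void main(String[] args) {
        PercolatorSketch percolator = new PercolatorSketch();
        percolator.registerQuery("brandX-campaignY", "brandx", "sale");
        percolator.registerQuery("storm-mentions", "storm");
        System.out.println(percolator.percolate("Huge SALE at BrandX today!"));
    }
}
```

In a topology, each percolator bolt instance would hold its own copy of the queries and evaluate every incoming tuple against them, so throughput scales with the bolt's parallelism.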
![Page 36: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/36.jpg)
[Diagram: a Twitter Spout feeds PercolatorBolt 1 ... PercolatorBolt N (could be 100’s of these) using random stream grouping; each PercolatorBolt hosts an EmbeddedSolrServer. Pre-configured queries are stored in a database, and ZeroMQ pub/sub pushes query changes to the percolator.]
![Page 37: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/37.jpg)
tick tuples
• send a special kind of tuple to a bolt every N seconds

    if (TupleHelpers.isTickTuple(input)) {
        // do special work
    }

used in percolator to delete accumulated documents every minute or so ...
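The accumulate-then-clear pattern driven by tick tuples can be simulated in plain Java. In this sketch `FakeTuple` is a stand-in I invented for Storm's tuple type, and the class is not a real bolt; the two string constants mirror Storm's system component id and tick stream id, which is what a helper like `TupleHelpers.isTickTuple` checks.

```java
import java.util.ArrayList;
import java.util.List;

// Simulates a bolt that accumulates documents and clears them on each tick.
class TickTupleSketch {
    // Storm marks tick tuples with this source component and stream id.
    static final String SYSTEM_COMPONENT_ID = "__system";
    static final String SYSTEM_TICK_STREAM_ID = "__tick";

    // Hypothetical stand-in for Storm's tuple, reduced to what the check needs.
    static final class FakeTuple {
        final String sourceComponent;
        final String sourceStreamId;
        final String payload;

        FakeTuple(String sourceComponent, String sourceStreamId, String payload) {
            this.sourceComponent = sourceComponent;
            this.sourceStreamId = sourceStreamId;
            this.payload = payload;
        }
    }

    static boolean isTickTuple(FakeTuple t) {
        return SYSTEM_COMPONENT_ID.equals(t.sourceComponent)
            && SYSTEM_TICK_STREAM_ID.equals(t.sourceStreamId);
    }

    private final List<String> accumulated = new ArrayList<>();

    // Mirrors a bolt's execute(): ticks trigger cleanup, other tuples are stored.
    void execute(FakeTuple input) {
        if (isTickTuple(input)) {
            accumulated.clear(); // e.g. delete documents percolated since the last tick
        } else {
            accumulated.add(input.payload);
        }
    }

    int accumulatedCount() {
        return accumulated.size();
    }

    public static void main(String[] args) {
        TickTupleSketch bolt = new TickTupleSketch();
        bolt.execute(new FakeTuple("twitter-spout", "default", "tweet 1"));
        bolt.execute(new FakeTuple("twitter-spout", "default", "tweet 2"));
        System.out.println("before tick: " + bolt.accumulatedCount());
        bolt.execute(new FakeTuple(SYSTEM_COMPONENT_ID, SYSTEM_TICK_STREAM_ID, null));
        System.out.println("after tick: " + bolt.accumulatedCount());
    }
}
```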
![Page 38: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/38.jpg)
references
• Storm Wiki: https://github.com/nathanmarz/storm/wiki/Documentation
• Overview: Krishna Gade, http://www.slideshare.net/KrishnaGade2/storm-at-twitter
• Trending Topics: Michael Noll, http://www.michael-noll.com/blog/2013/01/18/implementing-real-time-trending-topics-in-storm/
• Understanding Parallelism: Michael Noll, http://www.michael-noll.com/blog/2012/10/16/understanding-the-parallelism-of-a-storm-topology/
![Page 39: Integrate Solr with real-time stream processing applications](https://reader034.vdocuments.us/reader034/viewer/2022042601/55502e70b4c9058f2f8b4cfc/html5/thumbnails/39.jpg)
get the code: https://github.com/thelabdude/lsrdublin
Q & A
Manning coupon codes for conference-related books: http://deals.manningpublications.com/RevolutionsEU2013.html