introducing the wso2 complex event processor
![Page 1: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/1.jpg)
Introducing the WSO2 Complex Event Processor
Simplifying Complexities of Data Processing
S. Suhothayan, Software Engineer,
Data Technologies Team.
![Page 2: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/2.jpg)
Outline
• Introduction to CEP
• WSO2 CEP Server
• Siddhi Runtime
• HA & Scalability of WSO2 CEP
• WSO2 CEP server and WSO2 BAM
• Use Cases
![Page 3: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/3.jpg)
Event Processing (Contd.)
• Event processing is about listening to events and detecting patterns in near real-time without storing all events.
• Three models
  o Simple Event Processing
    - Simple filters (e.g. Is this a gold or platinum customer?)
  o Event Stream Processing
    - Looking across multiple event streams, joining multiple event streams, etc.
  o Complex Event Processing
    - Processing multiple event streams to identify meaningful patterns, using complex conditions & temporal windows
    - E.g. There has been a more than 10% increase in overall trading activity AND the average price of commodities has fallen 2% in the last 4 hours
![Page 4: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/4.jpg)
Complex Event Processing
• We categorize events into different streams
• Process with minimal storage
• Use queries to evaluate the continuous event streams (usually a SQL-like query language)
• Very fast results (in the milliseconds range)
![Page 5: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/5.jpg)
CEP Queries
• The main types of queries are:
  o Filters and projection
  o Windows – events are processed within temporal windows (e.g. for aggregation and joins). Time window vs. length window.
  o Ordering – identify event sequences and patterns (e.g. for a credit card, a new location followed by a small and then a large purchase might suggest fraud)
  o Joins – join two streams
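To make the "time window vs. length window" distinction concrete, here is a minimal Java sketch of both window types computing a running average. It is not Siddhi code; the class names `LengthWindow` and `TimeWindow` are hypothetical, and timestamps are assumed to arrive in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Hypothetical sliding length window: keeps only the last `size` values. */
final class LengthWindow {
    private final Deque<Double> values = new ArrayDeque<>();
    private final int size;
    private double sum;

    LengthWindow(int size) {
        this.size = size;
    }

    /** Adds a value, evicts the oldest one if the window overflows, and returns the average. */
    double add(double value) {
        values.addLast(value);
        sum += value;
        if (values.size() > size) {
            sum -= values.removeFirst();
        }
        return sum / values.size();
    }
}

/** Hypothetical sliding time window: keeps values newer than `millis` relative to the latest event. */
final class TimeWindow {
    private static final class Entry {
        final long timestamp;
        final double value;

        Entry(long timestamp, double value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final long millis;
    private double sum;

    TimeWindow(long millis) {
        this.millis = millis;
    }

    /** Adds a timestamped value, expires entries that fell out of the window, and returns the average. */
    double add(long timestamp, double value) {
        entries.addLast(new Entry(timestamp, value));
        sum += value;
        // The newest entry always stays (its age is 0), so the deque never becomes empty here.
        while (timestamp - entries.peekFirst().timestamp >= millis) {
            sum -= entries.removeFirst().value;
        }
        return sum / entries.size();
    }
}
```

The length window bounds memory by event count, while the time window bounds it by event age, so its size varies with the arrival rate.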
![Page 6: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/6.jpg)
Example Query

from p=PINChangeEvents#window.time(3600)
  join t=TransactionEvents[amount>10000]#window.time(3600)
  on p.custid==t.custid
return t.custid, t.amount;
![Page 7: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/7.jpg)
Open Source CEP Runtimes
• Siddhi
  o Apache License, a Java library, tuple-based event model
  o Supports distributed processing
  o Supports multiple query models
    - Based on a SQL-like language
    - Filters, Windows, Joins, Ordering and others
• Esper, http://esper.codehaus.org
  o GPLv2 License, a Java library, events can be XML, Map, Object
  o Supports multiple query models
    - Based on a SQL-like language
    - Filters, Windows, Joins, Ordering and others
• Drools Fusion
  o Apache License, a Java library
  o Support for temporal reasoning + windows
![Page 8: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/8.jpg)
WSO2 CEP Server
• Enterprise-grade server for CEP runtimes
• Provides support for several transports (network access) and data formats
  o SOAP/WS-Eventing – XML messages
  o REST/JSON – JSON messages
  o JMS – map messages, XML messages
  o Thrift – WSO2 data bridge format
    - High-performance event capturing & delivery framework; supports Java/C/C++/C# via Thrift language bindings
• Supports multiple CEP runtimes
  o Siddhi – WSO2, new, very fast, distributed
  o Esper – well-known CEP runtime
  o Drools Fusion – rule based, but much slower
• Easy to plug in new brokers and new CEP engines
![Page 9: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/9.jpg)
WSO2 CEP Server (Contd.)
[Architecture diagram; recovered label: File System]
![Page 10: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/10.jpg)
CEP Buckets
• A CEP bucket is a logical execution unit
• Each CEP bucket has a set of queries, event sources, and input and output event mappings
• Each bucket maps one-to-one to a CEP engine
![Page 11: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/11.jpg)
Management UI
• Define buckets
• Update running queries without resetting current execution states
• Manage brokers (data adapters)
![Page 12: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/12.jpg)
Developer Studio UI
• Eclipse-based tool to define buckets
• Can manage the configurations through the production lifecycle
![Page 13: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/13.jpg)
Siddhi Complex Event Processing Engine
![Page 14: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/14.jpg)
Big Picture
• Users provide one or more queries
• Event streams are mapped to queries
• Siddhi keeps the queries running and invokes callbacks registered against one or more queries/streams
• Example query

from cseEventStream[symbol == 'IBM']#win.time(50000)
insert into IBMStockQuote symbol, avg(price) as avgPrice
![Page 15: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/15.jpg)
Siddhi High Level Architecture
![Page 16: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/16.jpg)
Siddhi Queries: Filters
• Filters events by conditions
• Conditions
  o >, <, ==, >=, <=, !=
  o contains
  o and, or, not
• Example

from <stream-name> [<conditions>]*
insert into <stream-name>

from cseEventStream[price >= 20 and symbol=='IBM']
insert into StockQuote symbol, volume
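As a rough illustration of what the filter example computes, the following Java sketch applies the same condition (price >= 20 and symbol is IBM) and projects symbol and volume. The `Quote` and `FilterSketch` classes are hypothetical and only mirror the query; they are not part of Siddhi.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical event type standing in for one tuple of cseEventStream. */
final class Quote {
    final String symbol;
    final double price;
    final long volume;

    Quote(String symbol, double price, long volume) {
        this.symbol = symbol;
        this.price = price;
        this.volume = volume;
    }
}

final class FilterSketch {
    /** Keeps events with price >= 20 and symbol "IBM", projecting "symbol,volume" rows. */
    static List<String> stockQuotes(List<Quote> cseEventStream) {
        List<String> out = new ArrayList<>();
        for (Quote q : cseEventStream) {
            if (q.price >= 20 && "IBM".equals(q.symbol)) {
                out.add(q.symbol + "," + q.volume);
            }
        }
        return out;
    }
}
```

A real engine evaluates the condition per arriving event instead of over a list; the list form is used here only to keep the sketch self-contained.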
![Page 17: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/17.jpg)
Window
• Types of windows
  o (Time | Length) (Sliding | Batch) windows
  o Unique window, First unique (not supported in 1.0)
• Types of aggregate functions
  o sum, avg, max, min
• Example

from <stream-name>[<conditions>]#window.<window-name>(<parameters>)
insert [<output-type>] into <stream-name>

from cseEventStream[price >= 20]#window.lengthBatch(50)
insert expired-events into StockQuote symbol, avg(price) as avgPrice
group by symbol having avgPrice>50
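The batch-window example can be approximated in plain Java as follows. This is a simplified sketch under stated assumptions: the class `LengthBatchAverage` is hypothetical, the `price >= 20` filter is assumed to be applied upstream, and the `expired-events` output type is not modelled; the sketch simply emits per-symbol averages each time a batch fills.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical length-batch window with a group-by average and a "having" threshold. */
final class LengthBatchAverage {
    private final int batchSize;
    private final double minAverage;
    private final List<String> symbols = new ArrayList<>();
    private final List<Double> prices = new ArrayList<>();

    LengthBatchAverage(int batchSize, double minAverage) {
        this.batchSize = batchSize;
        this.minAverage = minAverage;
    }

    /**
     * Buffers one event. When the batch is full, returns the average price per symbol
     * for symbols whose average exceeds minAverage and starts a new batch;
     * otherwise returns an empty map.
     */
    Map<String, Double> add(String symbol, double price) {
        symbols.add(symbol);
        prices.add(price);
        Map<String, Double> result = new LinkedHashMap<>();
        if (symbols.size() < batchSize) {
            return result;
        }
        // Accumulate {sum, count} per symbol (the "group by symbol" step).
        Map<String, double[]> acc = new LinkedHashMap<>();
        for (int i = 0; i < symbols.size(); i++) {
            double[] a = acc.computeIfAbsent(symbols.get(i), k -> new double[2]);
            a[0] += prices.get(i);
            a[1] += 1;
        }
        symbols.clear();
        prices.clear();
        // Apply the "having avgPrice > minAverage" step.
        for (Map.Entry<String, double[]> e : acc.entrySet()) {
            double avg = e.getValue()[0] / e.getValue()[1];
            if (avg > minAverage) {
                result.put(e.getKey(), avg);
            }
        }
        return result;
    }
}
```

Unlike a sliding window, a batch window produces output only once per full batch, which is why most calls return an empty map.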
![Page 18: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/18.jpg)
Join
• Joins two streams based on a condition and a window
• Joins can take multiple forms ((left | right | full outer) | inner) join; only inner join is supported in 1.0
• Unidirectional: only events arriving on the unidirectional stream trigger the join
• Example

from <stream>#<window> [unidirectional] join <stream>#<window>
on <condition> within <time>
insert into <stream>

from TickEvent[symbol=='IBM']#win.length(2000) join NewsEvent#win.time(500)
insert into JoinStream *
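The join example pairs a length window on one stream with a time window on the other. A minimal Java sketch of that idea is shown below; `WindowedJoin` is a hypothetical class, events are reduced to strings, there is no `on` condition (as in the example), and timestamps are assumed to be non-decreasing.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical inner join of a length-windowed tick stream with a time-windowed news stream. */
final class WindowedJoin {
    private final Deque<String> ticks = new ArrayDeque<>();    // length window contents
    private final Deque<String> news = new ArrayDeque<>();     // time window contents
    private final Deque<Long> newsTimes = new ArrayDeque<>();  // arrival times, parallel to news
    private final int tickWindowLength;
    private final long newsWindowMillis;

    WindowedJoin(int tickWindowLength, long newsWindowMillis) {
        this.tickWindowLength = tickWindowLength;
        this.newsWindowMillis = newsWindowMillis;
    }

    /** Drops news items that are older than the time window. */
    private void expireNews(long now) {
        while (!newsTimes.isEmpty() && now - newsTimes.peekFirst() >= newsWindowMillis) {
            newsTimes.removeFirst();
            news.removeFirst();
        }
    }

    /** A tick arrives: store it and join it with every news item still in the window. */
    List<String> onTick(String tick, long now) {
        ticks.addLast(tick);
        if (ticks.size() > tickWindowLength) {
            ticks.removeFirst();
        }
        expireNews(now);
        List<String> out = new ArrayList<>();
        for (String n : news) {
            out.add(tick + "|" + n);
        }
        return out;
    }

    /** A news item arrives: store it and join it with every tick still in the window. */
    List<String> onNews(String item, long now) {
        expireNews(now);
        news.addLast(item);
        newsTimes.addLast(now);
        List<String> out = new ArrayList<>();
        for (String t : ticks) {
            out.add(t + "|" + item);
        }
        return out;
    }
}
```

In this sketch both streams trigger the join. A unidirectional join would produce output from only one of the two `on...` methods while the other merely updates its window.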
![Page 19: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/19.jpg)
Pattern
• Checks whether condition A happens before/after condition B
• Can do iterative checks via the "every" keyword
• With "within <time>", Siddhi emits only events that are within that time of each other
• Example

from [every] <condition> -> [every] <condition> … <condition> within <time>
insert into StockQuote (<attribute-name>* | * )

from every (a1 = purchase[price < 10]) -> a2 = purchase[price > 10000 and a1.cardNo==a2.cardNo]
within 300000
insert into potentialFraud a2.cardNo as cardNo, a2.price as price, a2.place as place
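The fraud pattern above can be read as a small state machine: every small purchase opens a partial match, and a later large purchase on the same card within the time bound completes it. The Java sketch below illustrates that reading; `FraudPattern` is a hypothetical class, and the exact matching semantics of Siddhi (for example how overlapping matches are handled) may differ.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical matcher for: every small purchase -> large purchase on the same card, within a time bound. */
final class FraudPattern {
    private static final class Pending {
        final String cardNo;
        final long timestamp;

        Pending(String cardNo, long timestamp) {
            this.cardNo = cardNo;
            this.timestamp = timestamp;
        }
    }

    private final List<Pending> pending = new ArrayList<>();
    private final long withinMillis;

    FraudPattern(long withinMillis) {
        this.withinMillis = withinMillis;
    }

    /** Feeds one purchase event; returns true when it completes at least one pending match. */
    boolean onPurchase(String cardNo, double price, long timestamp) {
        // Drop partial matches that have outlived the "within" bound.
        pending.removeIf(p -> timestamp - p.timestamp > withinMillis);
        if (price > 10000) {
            // a2: a large purchase completes (and consumes) every pending a1 for the same card.
            return pending.removeIf(p -> p.cardNo.equals(cardNo));
        }
        if (price < 10) {
            // a1: "every" means each small purchase starts a new partial match.
            pending.add(new Pending(cardNo, timestamp));
        }
        return false;
    }
}
```

With a bound of 300000 ms, a purchase of 5 followed two seconds later by a purchase of 20000 on the same card yields a match, while the same pair separated by more than five minutes does not.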
![Page 20: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/20.jpg)
Sequence
• Regular expressions supported
  o * - Zero or more matches (reluctant).
  o + - One or more matches (reluctant).
  o ? - Zero or one match (reluctant).
  o or – either event
• Events returned by * and + must be referenced using square brackets to access a specific occurrence of that event

from <event-regular-expression> within <time>
insert into <stream>

from a1 = requestOrder[action == "buy"],
     b1 = cseEventStream[price > a1.price and symbol==a1.symbol]+,
     b2 = cseEventStream[price < b1.price]
insert into purchaseOrder a1.symbol as symbol, b1[0].price as firstPrice, b2.price as orderPrice
![Page 21: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/21.jpg)
Performance Results
• We compared Siddhi with Esper, the widely used open source CEP engine
• For the evaluation, we set up different queries on both systems, pushed events into each system, and measured the time until all of them were processed
• We used an Intel(R) Xeon(R) X3440 @2.53GHz with 4 cores, 8M cache and 8GB RAM, running Debian with the 2.6.32-5-amd64 kernel
![Page 22: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/22.jpg)
Performance Comparison With ESPER
Simple filter without window

from StockTick[prize >6] return symbol, prize
![Page 23: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/23.jpg)
Performance Comparison With ESPER
State machine query for pattern matching

from f=FraudWarningEvent -> p=PINChangeEvent(accountNumber=f.accountNumber) return accountNumber;
![Page 24: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/24.jpg)
Siddhi Features
• Supports state persistence
  o Enables queries to span lifetimes much greater than server uptime
  o Takes periodic snapshots and stores all state information and windows in a scalable persistence store (Apache Cassandra)
  o Pluggable persistence stores
• Supports highly available deployment
  o Uses the Hazelcast distributed cache as a shared working memory
![Page 25: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/25.jpg)
HA / Persistence
• This is the ability to recover runtime state in the case of a failure
• The CEP server can support this if the CEP engine supports persistence (OK with Siddhi, Esper)
![Page 26: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/26.jpg)
Scaling
• A CEP pipeline can be distributed, but queries such as windows, patterns, and joins are hard to distribute
• WSO2 CEP with Siddhi uses a distributed cache (Hazelcast) as shared memory, together with a selective processing approach, to scale distributed processing
![Page 27: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/27.jpg)
Event Recording
• Ability to record all/some of the events for future processing
• A few options
  o Publish them to a Cassandra cluster using the WSO2 data bridge API or BAM (data in Cassandra can be processed with Hadoop using WSO2 BAM)
  o Write them to a distributed cache
  o Custom Thrift-based event recorder
![Page 28: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/28.jpg)
WSO2 BAM
![Page 29: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/29.jpg)
CEP Role within WSO2 Platform
![Page 30: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/30.jpg)
DEMO
![Page 31: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/31.jpg)
Scenario
• Monitoring a stock exchange for game-changing moments
• Two input event streams
  o Event stream of stock quotes from a stock exchange
  o Event stream of word counts for various company names from Twitter pages
• Check whether the last traded price of the stock has changed significantly (by 2%) within the last minute, and whether people are tweeting about that company (word count > 10) within the last minute
![Page 32: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/32.jpg)
![Page 33: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/33.jpg)
Example Scenario
JMS Event Publisher
JMS Event Receiver
![Page 34: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/34.jpg)
Input events
• Input events are JMS Maps
  o Stock Exchange Stream

    Map<String, Object> map1 = new HashMap<String, Object>();
    map1.put("symbol", "MSFT");
    map1.put("price", 26.36);
    publisher.publish("AllStockQuotes", map1);

  o Twitter Stream

    Map<String, Object> map1 = new HashMap<String, Object>();
    map1.put("company", "MSFT");
    map1.put("wordCount", 8);
    publisher.publish("TwitterFeed", map1);
![Page 35: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/35.jpg)
Queries
![Page 36: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/36.jpg)
Queries

from allStockQuotes[win.time(60000)]
insert into fastMovingStockQuotes symbol, price, avg(price) as averagePrice
group by symbol having ((price > averagePrice*1.02) or (averagePrice*0.98 > price))

from twitterFeed[win.time(60000)]
insert into highFrequentTweets company as company, sum(wordCount) as words
group by company having (words > 10)

from fastMovingStockQuotes[win.time(60000)] as fastMovingStockQuotes
join highFrequentTweets[win.time(60000)] as highFrequentTweets
on fastMovingStockQuotes.symbol==highFrequentTweets.company
insert into predictedStockQuotes fastMovingStockQuotes.symbol as company, fastMovingStockQuotes.averagePrice as amount, highFrequentTweets.words as words
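The combined effect of the three demo queries can be sketched in plain Java. This is a simplification, not how the CEP server executes them: the hypothetical class `GameChangerDetector` evaluates both conditions directly when a stock quote arrives instead of maintaining the intermediate `fastMovingStockQuotes` and `highFrequentTweets` streams, and it assumes non-decreasing timestamps.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical detector: price moved more than 2% from its 1-minute average AND more than 10 words tweeted. */
final class GameChangerDetector {
    private static final long WINDOW_MILLIS = 60_000;

    private static final class Sample {
        final long time;
        final double value;

        Sample(long time, double value) {
            this.time = time;
            this.value = value;
        }
    }

    private final Map<String, Deque<Sample>> prices = new HashMap<>();
    private final Map<String, Deque<Sample>> wordCounts = new HashMap<>();

    /** Returns the per-key window after expiring samples older than one minute. */
    private static Deque<Sample> window(Map<String, Deque<Sample>> store, String key, long now) {
        Deque<Sample> w = store.computeIfAbsent(key, k -> new ArrayDeque<>());
        while (!w.isEmpty() && now - w.peekFirst().time >= WINDOW_MILLIS) {
            w.removeFirst();
        }
        return w;
    }

    private static double sum(Deque<Sample> w) {
        double total = 0;
        for (Sample s : w) {
            total += s.value;
        }
        return total;
    }

    /** Records a word count for a company (the twitterFeed stream). */
    void onTweetCount(String company, int wordCount, long now) {
        window(wordCounts, company, now).addLast(new Sample(now, wordCount));
    }

    /** Records a quote (the allStockQuotes stream) and reports whether an alert should fire. */
    boolean onStockQuote(String symbol, double price, long now) {
        Deque<Sample> p = window(prices, symbol, now);
        p.addLast(new Sample(now, price));
        double average = sum(p) / p.size();
        boolean fastMoving = price > average * 1.02 || average * 0.98 > price;
        boolean highTweets = sum(window(wordCounts, symbol, now)) > 10;
        return fastMoving && highTweets;
    }
}
```

As in the first query, the average includes the current quote, so a single quote in an otherwise empty window can never be flagged as fast moving.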
![Page 37: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/37.jpg)
Alert
• As XML

<quotedata:StockQuoteDataEvent
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:xsd="http://www.w3.org/2001/XMLSchema"
    xmlns:quotedata="http://ws.cdyne.com/">
  <quotedata:StockSymbol>{company}</quotedata:StockSymbol>
  <quotedata:LastTradeAmount>{amount}</quotedata:LastTradeAmount>
  <quotedata:WordCount>{words}</quotedata:WordCount>
</quotedata:StockQuoteDataEvent>
![Page 38: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/38.jpg)
Useful links
• WSO2 CEP 2.0.0 Milestone 2
  https://svn.wso2.org/repos/wso2/people/suho/packs/cep/wso2cep-2.0.0-M2.zip
• Distributed Processing Sample With Siddhi CEP and ActiveMQ JMS Broker
  http://suhothayan.blogspot.com/2012/08/distributed-processing-sample-for-wso2.html
![Page 39: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/39.jpg)
Questions?
![Page 40: Introducing the WSO2 Complex Event Processor](https://reader030.vdocuments.us/reader030/viewer/2022020207/54c6a93a4a795973318b45b6/html5/thumbnails/40.jpg)
Thank you.