Messaging: Basic Exchange, Processing
and Transformation Models and Tools
Hong-Linh Truong
Faculty of Informatics, TU Wien
http://www.infosys.tuwien.ac.at/staff/truong
Twitter: @linhsolar
DST Summer 2018, Lecture 2
DST 2018 1
Outline
Overview of streaming message-oriented data
programming
Communication - Message-Oriented Middleware
Java Message Service (JMS), Advanced Message
Queuing Protocol (AMQP), Message Queuing
Telemetry Transport (MQTT)
Integration - Enterprise Integration patterns
Message routing patterns
Message transformation patterns
Processing - streaming data processing with
Complex Event Processing
Topic complexity
Getting started with each topic of “complex *” in 10 minutes.
Thousands of pages of documents, APIs, tutorials and code
Further advanced topics will be covered in Lecture 5
Why messaging is so important for
DST?
STREAMING MESSAGE-
ORIENTED PROGRAMMING
Overview
Data stream programming
Examples of data streams
Continuous media (e.g., video for video analytics)
Discrete media (e.g., stock market events, twitter
events, system monitoring events, notifications)
Data stream: a sequence/flow of data units
Data units are defined by applications: a data unit can
be data described by a primitive data type or by a
complex data type, a serializable object, etc.
Streaming data: produced by (near)realtime data
sources as well as (big) static data sources
Some key issues
Communication
Many techniques are needed: sending/receiving, routing, storage, etc.
Data processing
Within the brokering infrastructures and platforms
Within the producer and the consumer
Interoperability issues: message format, etc.
Performance issues: rates/throughput, intervals, delay/latency,
processing time etc.
Arrival orders
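Figure: a data producer sends streaming data through message brokering infrastructures and platforms to a data consumer; messages sent as msg3 msg2 msg1 may arrive as msg3 msg1 msg2; the end-to-end delay spans the path from producer to consumer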
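The arrival-order issue can be handled on the consumer side with a resequencer. The following is a minimal sketch, assuming each message carries a sender-assigned sequence number starting at 1; the class Resequencer and its method names are hypothetical and not part of any MOM API.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical resequencer: releases messages in sender order (1, 2, 3, ...).
class Resequencer {
    private final Map<Integer, String> pending = new HashMap<>();
    private int nextSeq = 1; // sequence number expected next

    // Buffers the message and returns every message that can now be
    // delivered in order (possibly none).
    List<String> accept(int seq, String payload) {
        pending.put(seq, payload);
        List<String> ready = new ArrayList<>();
        while (pending.containsKey(nextSeq)) {
            ready.add(pending.remove(nextSeq));
            nextSeq++;
        }
        return ready;
    }

    public static void main(String[] args) {
        Resequencer r = new Resequencer();
        System.out.println(r.accept(2, "msg2")); // held back until msg1 arrives
        System.out.println(r.accept(1, "msg1")); // releases msg1 and msg2
        System.out.println(r.accept(3, "msg3"));
    }
}
```

The price of ordered delivery is extra delay and buffer space while waiting for late messages, which ties the arrival-order issue to the performance issues listed above.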
Message-oriented Middleware
(MOM)
Discrete media data units
Data units are structured messages (maybe ordered by
timestamps)
Well-supported in large-scale systems for
Persistent but asynchronous messages
Scalable message handling
Message communication and transformation
publish/subscribe, routing, extraction, enrichment
Several implementations
Apache Qpid™
Amazon SQS
JMS
Apache Kafka
Message-oriented Persistent
Communication
Exchange models
Operations
PUT/SEND/PUBLISH
GET/RECEIVE
POLL/SUBSCRIBE
NOTIFY/SEND
Fig source: Andrew S. Tanenbaum and Maarten van Steen, Distributed Systems – Principles and
Paradigms, 2nd Edition, 2007, Prentice-Hall
The receiver pulls the
data from the broker or
the broker pushes the
data to the receiver?
Broker
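The pull-versus-push question can be made concrete with a toy broker. This is a minimal in-memory sketch, not a real MOM: the class TinyBroker and its methods are hypothetical names that mirror the operations listed above.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical in-memory broker: stores messages for pull receivers and
// notifies registered push receivers.
class TinyBroker {
    private final Queue<String> stored = new ArrayDeque<>();
    private final List<Consumer<String>> listeners = new ArrayList<>();

    // SUBSCRIBE: a push receiver registers a callback once.
    void subscribe(Consumer<String> listener) {
        listeners.add(listener);
    }

    // PUT/SEND/PUBLISH: push to listeners if any, otherwise keep for polling.
    void publish(String msg) {
        if (listeners.isEmpty()) {
            stored.add(msg);
            return;
        }
        for (Consumer<String> listener : listeners) {
            listener.accept(msg); // NOTIFY
        }
    }

    // GET/RECEIVE/POLL: returns the oldest stored message, or null if none.
    String poll() {
        return stored.poll();
    }

    public static void main(String[] args) {
        TinyBroker broker = new TinyBroker();
        broker.publish("m1");
        System.out.println("pulled: " + broker.poll());
        broker.subscribe(m -> System.out.println("pushed: " + m));
        broker.publish("m2");
    }
}
```

With pull, the receiver controls its own pace but must keep asking; with push, the broker controls the pace and the receiver must be able to keep up.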
MOM – some message processing
operations
Publish/subscribe/notify; send/forward; routing operations within a broker
Figure: messages m1, m2, m3 pass through a queue, or through a router/exchange that applies a condition (IF…) to distribute them over several queues
Message processing within data
consumer
Figure: incoming streams of data type m (m1, m2, m3, …) and data type s (s1, s2, s3, …) enter complex/multiple-stream, application-specific data processing, which outputs complex messages (m1 … s1), (m2 … s2), (m3 … s3)
Streaming data processing with a
network of data processing
elements
Figure: data streams flow through a network of interconnected streaming data processing elements
Message handling for service
integration
Message handling concepts and patterns have
been around for many years
Cross services/organizations integration
Enterprise integration patterns are well studied but
mostly focused on business messages
http://www.enterpriseintegrationpatterns.com/
Today's distributed applications need
not just enterprise integration patterns
but also the integration of various types of
measurements and log information
Filter, exchange, etc.
We need several features implemented by MOM,
consumers, or external systems
Figure: one client sends messages m1, m2, m3, m4 to another client through a queue/topic, with an exchange, router, filter, aggregator, etc. in between
http://www.eaipatterns.com/
Syntax and semantic problems
Figure labels: Secretary of State, President, Kafka
Source: http://www.smart-words.org/humor-jokes/language-humor/who-is-hu-china.html
The same
communication
protocol does not
mean that both
sides understand
the message well!
Message serialization and
deserialization
Remember that the sender and the receiver are diverse
In many cases, they are not in the same organization
You need to guarantee the message syntax and
semantics
Solutions
Agreed in advance in the implementation or with a
standard
Know and use tools to deal with syntax differences
But semantics are domain/application-specific
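A syntax agreed in advance can be as simple as a fixed binary layout shared by sender and receiver. The sketch below assumes a hypothetical layout (UTF string station id, long event time, double value) and uses only java.io; the class name ReadingCodec is invented for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical codec for an agreed layout: stationId, eventTime, value.
class ReadingCodec {
    // Sender side: writes the fields in the agreed order.
    static byte[] serialize(String stationId, long eventTime, double value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(stationId);
            out.writeLong(eventTime);
            out.writeDouble(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Receiver side: must read the fields in exactly the same order.
    static Object[] deserialize(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            return new Object[] { in.readUTF(), in.readLong(), in.readDouble() };
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] wire = serialize("1161115040", 1487442123L, 56.5);
        Object[] fields = deserialize(wire);
        System.out.println(fields[0] + " " + fields[1] + " " + fields[2]);
    }
}
```

This covers syntax only. Whether the value is a temperature in Celsius or a voltage is a semantic agreement that no codec can enforce.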
Avro
https://avro.apache.org/
Support message description
Libraries for serialization and deserialization
Work with different languages
Figure: senders written in Python and Java, connected through Kafka, with a shared syntax specification
Why is it important?
Some other techniques
Protobuf
From Google, used by default in gRPC (gRPC.io)
https://github.com/google/protobuf
Language-neutral, platform-neutral mechanism for serializing/deserializing structured data
Thrift
https://thrift.apache.org
RPC style (also supports serializing and deserializing data)
Supports cross-language services development
Specify services interfaces
Data exchange
Code generation
Flatbuffers
https://github.com/google/flatbuffers
JAVA MESSAGE SERVICE
Communication
General concepts
Standard APIs for Java platform
Figure: Client 1 (message producer) sends msg1, msg2, msg3 to a destination inside the messaging system provider; Client 2 (message consumer) receives them from that destination
Message Structure
Types of messages (or what is a message for?)
Application-specific semantics
E.g., notify an event, send a document, or ask for
the execution of a command
Header Properties Body (payload)
Header: pre-defined system information (e.g., storage,
routing and identification operations)
Properties: application
defined properties
Body: application-defined
Java primitive types, Map (a set of tuples), Text, Serializable
Object
Delivery Patterns
Point-to-point
Publish/Subscription (Pub/Sub)
Fig source: http://docs.oracle.com/javaee/7/tutorial/doc/jms-concepts002.htm
Request-reply versus Request-only
messages
Request only
A sender does not expect a reply for a given request
Request-reply
A sender expects, e.g., a system ack or an application-specific
reply
Some design principles
Need to uniquely identify a request message?
Use a unique identifier
Need a reply message from a request message
Where is the return address?
Correlation between the request and reply messages (using unique
id), e.g., MessageType=REQUEST|REPLY & MessageID = ID
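The design principles above (unique identifier, return address, correlation) can be sketched without any messaging library. The Msg class and its field names below are hypothetical; in JMS the corresponding header fields would be the message ID, the correlation ID and the reply-to destination.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

class RequestReply {
    // Hypothetical message with just the fields needed for correlation.
    static final class Msg {
        final String type;          // REQUEST or REPLY
        final String id;            // unique message identifier
        final String correlationId; // id of the request that a reply answers
        final String replyTo;       // return address (queue name), requests only
        final String body;

        Msg(String type, String id, String correlationId, String replyTo, String body) {
            this.type = type;
            this.id = id;
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    static Msg request(String replyTo, String body) {
        return new Msg("REQUEST", UUID.randomUUID().toString(), null, replyTo, body);
    }

    // The replier copies the request id into the correlation id.
    static Msg reply(Msg request, String body) {
        return new Msg("REPLY", UUID.randomUUID().toString(), request.id, null, body);
    }

    public static void main(String[] args) {
        Map<String, Msg> pending = new HashMap<>(); // requests awaiting a reply
        Msg req = request("replies.client1", "status?");
        pending.put(req.id, req);
        Msg rep = reply(req, "OK"); // the replier would send this to req.replyTo
        Msg original = pending.remove(rep.correlationId);
        System.out.println(original.body + " -> " + rep.body);
    }
}
```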
JMS programming versus
administrative activities
Figure annotations: two groups of JMS objects are marked "best as administered objects"; the remaining objects are marked "best with programming activities"
Fig source: http://docs.oracle.com/javaee/7/tutorial/doc/jms-concepts003.htm
Simple example from the Java
tutorial
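// Imports needed by this fragment (Java EE 7 / JMS 2.0)
import javax.annotation.Resource;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSRuntimeException;
import javax.jms.Queue;
import javax.jms.TextMessage;

// Administered objects, injected by the container
@Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
private static ConnectionFactory connectionFactory;
@Resource(lookup = "jms/Queue")
private static Queue dest;
….
try (JMSContext context = connectionFactory.createContext()) {
    int count = 0;
    for (int i = 0; i < NUM_MSGS; i++) {
        String message = "This is message " + (i + 1) + " from producer";
        TextMessage msg = context.createTextMessage();
        msg.setText(message);
        // Application-defined properties, usable later in message selectors
        msg.setIntProperty("ID", count);
        if (((i + 1) % 2) == 0) {
            msg.setStringProperty("msgType", "EVEN");
        } else {
            msg.setStringProperty("msgType", "ODD");
        }
        context.createProducer()
               .setDeliveryMode(DeliveryMode.NON_PERSISTENT)
               .send(dest, msg);
        count += 1;
    }
    System.out.println("Messages sent: " + count);
} catch (JMSException | JMSRuntimeException e) {
    System.err.println("Sending failed: " + e);
}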
Some other JMS API features
Control message acknowledgement
By JMS provider or by the client
Message parameters
Persistent, priority, delay, and expiration
Programming temporary destinations
Nondurable versus durable subscription for
subscribers
Asynchronous sending
Generic question: how does the broker manage durable subscription?
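One possible answer, as a minimal sketch: the broker keeps a per-subscriber backlog for durable subscriptions and drops the state of nondurable ones when they disconnect. The class DurableTopic and its methods are hypothetical; a real JMS provider would typically also persist the backlog to storage.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical topic that distinguishes durable from nondurable subscribers.
class DurableTopic {
    private final Map<String, List<String>> durable = new HashMap<>();    // kept across disconnects
    private final Map<String, List<String>> nondurable = new HashMap<>(); // dropped on disconnect

    void subscribe(String id, boolean isDurable) {
        if (isDurable) {
            durable.putIfAbsent(id, new ArrayList<>()); // reuse an existing backlog
        } else {
            nondurable.put(id, new ArrayList<>());
        }
    }

    // Only nondurable state is discarded; the durable backlog keeps filling.
    void disconnect(String id) {
        nondurable.remove(id);
    }

    void publish(String msg) {
        for (List<String> backlog : durable.values()) backlog.add(msg);
        for (List<String> inbox : nondurable.values()) inbox.add(msg);
    }

    // Returns and clears everything waiting for this subscriber.
    List<String> receive(String id) {
        List<String> waiting = durable.containsKey(id) ? durable.get(id) : nondurable.get(id);
        if (waiting == null) return new ArrayList<>();
        List<String> out = new ArrayList<>(waiting);
        waiting.clear();
        return out;
    }

    public static void main(String[] args) {
        DurableTopic topic = new DurableTopic();
        topic.subscribe("durableClient", true);
        topic.subscribe("casualClient", false);
        topic.disconnect("durableClient");
        topic.disconnect("casualClient");
        topic.publish("m1"); // published while both are offline
        topic.subscribe("casualClient", false);
        System.out.println(topic.receive("durableClient")); // still gets m1
        System.out.println(topic.receive("casualClient"));  // missed m1
    }
}
```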
Example of temporary queues for
performance improvement
Use cases and Figs source: http://www.onjava.com/2007/04/10/designing-messaging-applications-with-temporary-queues.html
Common static queues for multiple clients
Separate static queues for multiple clients
Temporary queues
Outside the java world?
Source: http://docs.spring.io/spring-python/1.2.x/sphinx/html/jms.html
Recall
Figure source: http://queue.acm.org/detail.cfm?id=1971597
Would you use a
JMS topic or
queue?
ADVANCED MESSAGE
QUEUING PROTOCOL
Communication
Overview
MOM, but not language- or platform-specific
For Java, C#, Python, ….
Solving message interoperability in heterogeneous
environments of MOMs
Binary wire-level protocol for message
exchange, rather than APIs
AMQP 1.0 does not specify broker behaviors/capabilities,
although versions of the standard before 1.0 did
http://www.amqp.org
Apache Qpid™
Core concepts –
Message/Transport
Message representation
Defined based on type
systems for interoperability
Transport
A network of nodes
connected via links
Node: message storage,
delivery, relay, etc.
Container: includes nodes
Figs source: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf
Core concept -- Transport
Figs source: http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-complete-v1.0-os.pdf
Connection
Session
Session and Connection endpoints
Links
Example
cloudamqp.com
Get a free instance of RabbitMQ from cloudamqp.com
Get code from: https://github.com/cloudamqp/java-amqp-example
First run the test sender, then run the receiver
Test sender
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for (int i = 0; i < 100; i++) {
    String message = "Hello distributed systems guys: " + i;
    // An empty exchange name selects the default exchange, which routes by queue name
    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
    System.out.println(" [x] Sent '" + message + "'");
    Thread.sleep(5000); // sleep is a static method; no Thread instance is needed
}
Test receiver
while (true) {
    // Blocks until the next message is delivered (older RabbitMQ Java client API)
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
}
Note: I modified the code a bit
Real code versus simulation
http://tryrabbitmq.com/
Performance
“RabbitMQ Hits One Million Messages Per Second on Google Compute Engine”
https://blog.pivotal.io/pivotal/products/rabbitmq-hits-one-million-messages-per-second-on-google-compute-engine
https://cloudplatform.googleblog.com/2014/06/rabbitmq-on-google-compute-engine.html
Using 32 nodes
RabbitMQ is widely used in big industries!
MESSAGE QUEUING
TELEMETRY TRANSPORT
(MQTT)
http://mqtt.org
MQTT Overview
OASIS Standard
ISO/IEC 20922:2016 (Message Queuing Telemetry Transport (MQTT) v3.1.1)
M2M Connectivity Protocol atop TCP/IP
MQTT brokers enable publish/subscribe messaging systems
A publisher publishes a message to a topic, to which many subscribers can subscribe
Simple protocol
Suitable for constrained devices
Protocol Features
Lightweight protocol
Small message size
QoS: at most once, at least once, and exactly once
Few commands/interactions: CONNECT, PUBLISH, SUBSCRIBE, UNSUBSCRIBE, DISCONNECT
Easy to implement
Small-footprint library
Designed for low bandwidth, high latency, data limits, and fragile connections
Suitable for IoT (constrained devices/networks)
How QoS would impact the design of the subscriber?
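One consequence, as a hedged sketch: with at-least-once delivery the same message may be handed to the subscriber more than once, so the subscriber should be idempotent. The example assumes the publisher puts an application-level unique id into each message; the class IdempotentSubscriber is hypothetical and independent of any MQTT library.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical subscriber that tolerates redelivery under "at least once".
class IdempotentSubscriber {
    private final Set<String> seenIds = new HashSet<>();
    private final List<String> processed = new ArrayList<>();

    // Returns true if the message was processed, false if it was a duplicate.
    boolean onMessage(String messageId, String payload) {
        if (!seenIds.add(messageId)) {
            return false; // already handled, ignore the redelivery
        }
        processed.add(payload);
        return true;
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        IdempotentSubscriber subscriber = new IdempotentSubscriber();
        subscriber.onMessage("a1", "temperature=28.5");
        subscriber.onMessage("a1", "temperature=28.5"); // redelivered copy
        subscriber.onMessage("a2", "temperature=29.0");
        System.out.println(subscriber.processed());
    }
}
```

With at-most-once delivery the subscriber must instead tolerate gaps, and with exactly-once delivery the deduplication effort moves into the protocol handshake at the cost of more round trips.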
Model and Implementation
Different programming languages for OS/devices
Including Arduino, Nanode
Mosquitto (http://projects.eclipse.org/projects/technology.mosquitto)
Paho: http://www.eclipse.org/paho/
RabbitMQ
Apache ActiveMQ
Cloud providers:
http://cloudmqtt.com (get a free account to learn MQTT)
Figure: a publisher and a subscriber connected through the broker/server
MESSAGE ROUTING
PATTERNS
Integration
Integration Issues
We need several features implemented by
MOM, consumer, or external systems
Figure: one client sends messages m1, m2, m3, m4 to another client through a queue/topic, with an exchange, router, filter, aggregator, etc. in between
http://www.eaipatterns.com/
Example of supporting technology
Best practices for solving common
problems: Integration Patterns
Also check: http://projects.spring.io/spring-integration/
Content-Based Message Routing:
Camel/EIP
Source: https://camel.apache.org/content-based-router.html
Source: https://camel.apache.org/dynamic-router.html
Content-Based Router:
can be used to decide
the right destination
queue for a given
message based on the
message content
Dynamic Router: can
self-configure based on
processing messages
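The Content-Based Router idea can be sketched in plain Java as an ordered list of predicates over the message content. This is not the Camel API: the class ContentBasedRouter, its methods and the queue names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical router: the first matching rule decides the destination queue.
class ContentBasedRouter {
    private final Map<String, Predicate<String>> rules = new LinkedHashMap<>();
    private final String defaultQueue;

    ContentBasedRouter(String defaultQueue) {
        this.defaultQueue = defaultQueue;
    }

    // Registers a rule; rules are evaluated in registration order.
    void when(Predicate<String> condition, String queue) {
        rules.put(queue, condition);
    }

    // Inspects the message content and returns the destination queue name.
    String route(String message) {
        for (Map.Entry<String, Predicate<String>> rule : rules.entrySet()) {
            if (rule.getValue().test(message)) {
                return rule.getKey();
            }
        }
        return defaultQueue;
    }

    public static void main(String[] args) {
        ContentBasedRouter router = new ContentBasedRouter("queue.other");
        router.when(m -> m.contains(";Major;"), "queue.major");
        router.when(m -> m.contains(";Minor;"), "queue.minor");
        System.out.println(router.route("29869;10/01/2017 00:57:56;;Major;BTS-403"));
        System.out.println(router.route("heartbeat"));
    }
}
```

A Dynamic Router would differ in that the rule table itself is updated at runtime, for example from control messages.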
Content-Based Message Routing:
AMQP
Figs source: https://access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_MRG/1.1/html/Messaging_User_Guide/chap-Messaging_User_Guide-Exchanges.html
Note: defined in AMQP 0-10
But not in AMQP 1.0
Some code example with RabbitMQ
Message Filter/Selector
TextMessage msg = context.createTextMessage();
msg.setText(message);
msg.setIntProperty("ID", count);
if ((count % 2) == 0) {
    msg.setStringProperty("msgType", "EVEN");
} else {
    msg.setStringProperty("msgType", "ODD");
}
// The consumer receives only messages whose msgType property equals 'EVEN'
JMSConsumer consumer = context.createConsumer(dest, "msgType = 'EVEN'");
JMS: selector based on message header and properties
CAMEL/EIP: Message Filter
https://camel.apache.org/message-filter.html
Message Selector or
Message Filter: filter
unneeded messages
TRANSFORMATION
PATTERNS AND TOOLS
Integration
Splitter and Aggregator
Splitter: decompose a
composite message into
different messages
Aggregator: gather all
correlated messages for
a specific purpose then
build a new composite
message
Question: for which scenarios/use cases can we use the above-mentioned patterns?
https://camel.apache.org/splitter.html
https://camel.apache.org/aggregator2.html
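Splitter and Aggregator can be illustrated together in a small sketch. It assumes a hypothetical composite message format "orderId:item1,item2,..." where the order id serves as the correlation key; the class SplitAggregate is not part of Camel.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class SplitAggregate {
    // Parts collected so far, keyed by correlation id (the order id).
    private final Map<String, List<String>> partial = new HashMap<>();

    // Splitter: "o1:a,b" becomes ["o1:a", "o1:b"]; each part keeps the order id.
    static List<String> split(String composite) {
        String[] idAndItems = composite.split(":", 2);
        List<String> parts = new ArrayList<>();
        for (String item : idAndItems[1].split(",")) {
            parts.add(idAndItems[0] + ":" + item);
        }
        return parts;
    }

    // Aggregator: returns the rebuilt composite once all expected parts of
    // one order have arrived, otherwise null.
    String aggregate(String part, int expectedParts) {
        String[] idAndItem = part.split(":", 2);
        List<String> items = partial.computeIfAbsent(idAndItem[0], k -> new ArrayList<>());
        items.add(idAndItem[1]);
        if (items.size() < expectedParts) {
            return null;
        }
        partial.remove(idAndItem[0]);
        return idAndItem[0] + ":" + String.join(",", items);
    }

    public static void main(String[] args) {
        List<String> parts = split("o1:cpu,disk,ram");
        SplitAggregate aggregator = new SplitAggregate();
        for (String part : parts) {
            String result = aggregator.aggregate(part, parts.size());
            System.out.println(part + " -> " + result);
        }
    }
}
```

Completion here is decided by a part count; a timeout is a common alternative completion condition when some parts may never arrive.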
How would you use splitter and
aggregator with a set of
microservices for a request?
Request
Service 1
Service 2
Service n
?
Envelope Wrapper and Normalizer
Envelope wrapper: wrap a
message before sending it
into a messaging system and
unwrap it after the wrapped
message leaves the
messaging system
Normalizer: route all
messages of a given type
to a suitable Message
Translator which transforms
the message to the
common format.
https://camel.apache.org/normalizer.html
http://www.eaipatterns.com/EnvelopeWrapper.html
Content Enricher & Extractor
Content Enricher: obtain
required/missing data then
enrich the message with the
newly obtained data
Content Filter: remove
unimportant data items from a
message or extract only
needed information.
https://camel.apache.org/content-filter.html
https://camel.apache.org/content-enricher.html
Question: is it possible to send the to-be-enriched message to an external
service to enrich it or to send the message to an external extraction service?
Logstash
Codecs: stream filters within inputs or outputs that change data representation
E.g., the multiline codec merges multiple lines into a single event
Source: https://www.elastic.co/guide/en/logstash/current/advanced-pipeline.html
Plug-ins
https://www.elastic.co/guide/en/logstash/current/working-with-plugins.html
Logstash Grok
Grok is for parsing unstructured log data
It combines text patterns into something that matches your
logs.
Grok syntax: %{SYNTAX:SEMANTIC}
Regular and custom patterns
A lot of existing patterns:
https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
Debug Tools: http://grokdebug.herokuapp.com/
Example with NETACT Log
29869;10/01/2017 00:57:56;;Major;PLMN-PLMN/BSC-401441/BCF-137/BTS-403;XYZ01N;ABC08;DEF081;BTS OPERATION DEGRADED;00 00 00 83 11 11;Processing
Simple Grok
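What a Grok expression does for this log line can be approximated with a Java regular expression, where each named group plays the role of the SEMANTIC part in %{SYNTAX:SEMANTIC}. This is a sketch, not Logstash configuration: the field names (alarmId, time, severity, object, text, state) are assumptions, since the slide does not name the fields, and the log line is joined with a single space where the slide wraps.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class NetactLogParser {
    // Eleven semicolon-separated fields; only some get a (hypothetical) name.
    private static final Pattern LINE = Pattern.compile(
        "^(?<alarmId>\\d+);(?<time>[^;]*);[^;]*;(?<severity>[^;]*);(?<object>[^;]*);"
        + "[^;]*;[^;]*;[^;]*;(?<text>[^;]*);[^;]*;(?<state>[^;]*)$");
    private static final String[] FIELDS =
        {"alarmId", "time", "severity", "object", "text", "state"};

    // Returns the named fields, or an empty map if the line does not match.
    static Map<String, String> parse(String line) {
        Map<String, String> event = new LinkedHashMap<>();
        Matcher m = LINE.matcher(line);
        if (m.matches()) {
            for (String field : FIELDS) {
                event.put(field, m.group(field));
            }
        }
        return event;
    }

    public static void main(String[] args) {
        String line = "29869;10/01/2017 00:57:56;;Major;PLMN-PLMN/BSC-401441/BCF-137/BTS-403;"
            + "XYZ01N;ABC08;DEF081;BTS OPERATION DEGRADED;00 00 00 83 11 11;Processing";
        System.out.println(parse(line));
    }
}
```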
Apache Nifi
Originally developed by the NSA
http://nifi.apache.org/
Main concepts:
Processor: components to handle data, such as
download, store, transform, etc.
FlowFile: a unit of data (content plus attributes) moving through the system; a dataflow describes how different components are
composed to create pipelines for data ingestion
Provenance (for data governance): see all usage
records in detail
Apache Nifi
https://nifi.apache.org/docs.html
Example
COMPLEX EVENT
PROCESSING
Processing
Dataflow programming and
streaming processing
Data exchange between tasks
Links in task graphs reflect data flows
Streaming processing
Centralized or distributed (in terms of computing
resources)
Various applications
CEP is just one type of application of streaming
processing
Note: we will go further into advanced
streaming processing in Lecture 5
Centralized versus distributed
processing topology
Two views: streams of events or cloud of events
Centralized processing: usually only queries/patterns are written
Distributed processing: code processing events and topologies need to be written
Figure: an event source or event cloud feeds a single processing element (centralized) versus a network of processing nodes (distributed)
Goals of complex event processing
Group and process events within specific time
and space (size) constraints
Detect special situations
Find correlations among events
Aggregate results
Special case of streaming processing
TIBCO Systems
Source: http://www.tibco.com/blog/2015/10/05/how-to-extend-big-data-
architectures-with-rules-and-visualization/
WSO2 Carbon
CEP/Siddhi
Source:
https://docs.wso2.com/display/CEP420
Apache Flink
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.4/internals/components.html
Common concept in these systems
The way to connect data streams and obtain events
Focusing very much on connector concepts and well-defined event structures (e.g., can be described in XML, JSON, POJO)
Assume that existing systems push the data
The way to specify “analytics”
Statements and how they are glued together to process flows of events
High-level, easy to use
The engine to process analytics requests
Centralized in the view of the user so the user does not have to program complex distributed applications
Underlying it might be complex (for scalability purposes)
The way to push results to external components
Basic concepts
Figure: a stream of events (m1, m2, m4, …) in arrival order, with a window covering part of the stream
If we
• specify a set of conditions for the window and events within the
window
then we can
• get a set of events filtered from the window that match these
conditions
Conditions: can be specified using an SQL-like language or pre-defined functions
Sliding/tumbling window size: time or number of events
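The window-plus-conditions idea can be sketched with a sliding window defined by a number of events. The class SlidingLengthWindow is hypothetical; CEP engines offer the same concept through their query languages, with the condition written as a statement instead of Java code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical sliding window holding the most recent `size` events.
class SlidingLengthWindow {
    private final Deque<Double> window = new ArrayDeque<>();
    private final int size;

    SlidingLengthWindow(int size) {
        this.size = size;
    }

    // Adds an event in arrival order, evicts the oldest event if the window
    // is full, and returns the events in the window above the threshold.
    List<Double> add(double value, double threshold) {
        window.addLast(value);
        if (window.size() > size) {
            window.removeFirst();
        }
        List<Double> matches = new ArrayList<>();
        for (double v : window) {
            if (v > threshold) {
                matches.add(v);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        SlidingLengthWindow window = new SlidingLengthWindow(3);
        double[] stream = {1, 3, 5, 0, 1};
        for (double event : stream) {
            System.out.println(event + " -> " + window.add(event, 2));
        }
    }
}
```

A tumbling (batch) window would instead emit once per full window and then start empty again, so no event belongs to two windows.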
Event Representation, Streams and
Views
Event sources: via MOM, files, different IO
adapters/connectors, etc.
Event representation & views
POJO (Plain Old Java Object), Map, Object-array, XML
SQL-like tables
Event Stream
Events ordered based on their arrival times
Event Sink
A component receiving events via its listener that declares some statements on interesting events
Windows and Times
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.4/concepts/programming-model.html
Window size and slide
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html
Batch/Tumbling Windows
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/windows.html
Flink CEP Patterns
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/libs/cep.html
Flink CEP Patterns
Source: https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/libs/cep.html
Example with Base Transceiver
Station
Data
station_id,datapoint_id,alarm_id,event_time,value,valueThreshold,isActive,storedtime
1161115016,121,308,2017-02-18 18:28:05 UTC,240,240,false,
1161114050,143,312,2017-02-18 18:56:20 UTC,28.5,28,true,
1161115040,141,312,2017-02-18 18:22:03 UTC,56.5,56,true,
1161114008,121,308,2017-02-18 18:34:09 UTC,240,240,false,
1161115040,141,312,2017-02-18 18:20:49 UTC,56,56,false,
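To show what a CEP pattern over this data could look like, here is a plain-Java sketch of one illustrative pattern: for the same station and alarm, an inactive event followed by an active event within a time bound. The pattern itself is an assumption chosen for illustration, not necessarily the one used in the lecture's Flink example, and the class AlarmPattern is hypothetical.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class AlarmPattern {
    private static final DateTimeFormatter TIME =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss 'UTC'");

    // Returns "station_id/alarm_id" for every inactive-then-active pair whose
    // event times are at most `within` apart.
    static List<String> detect(List<String> csvRows, Duration within) {
        List<String[]> events = new ArrayList<>();
        for (String row : csvRows) {
            events.add(row.split(","));
        }
        // The rows are not sorted, so order them by event_time (column 3) first.
        events.sort(Comparator.comparing((String[] e) -> LocalDateTime.parse(e[3], TIME)));

        Map<String, LocalDateTime> lastInactive = new HashMap<>();
        List<String> matches = new ArrayList<>();
        for (String[] e : events) {
            String key = e[0] + "/" + e[2];            // station_id/alarm_id
            LocalDateTime time = LocalDateTime.parse(e[3], TIME);
            boolean active = Boolean.parseBoolean(e[6]); // isActive
            if (!active) {
                lastInactive.put(key, time);
            } else {
                LocalDateTime start = lastInactive.remove(key);
                if (start != null && Duration.between(start, time).compareTo(within) <= 0) {
                    matches.add(key);
                }
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<String> rows = new ArrayList<>();
        rows.add("1161115016,121,308,2017-02-18 18:28:05 UTC,240,240,false,");
        rows.add("1161114050,143,312,2017-02-18 18:56:20 UTC,28.5,28,true,");
        rows.add("1161115040,141,312,2017-02-18 18:22:03 UTC,56.5,56,true,");
        rows.add("1161114008,121,308,2017-02-18 18:34:09 UTC,240,240,false,");
        rows.add("1161115040,141,312,2017-02-18 18:20:49 UTC,56,56,false,");
        System.out.println(detect(rows, Duration.ofMinutes(5)));
    }
}
```

In a CEP engine the sorting, keying and time bound would be expressed declaratively (event time, key-by, "followed by", "within") instead of being hand-coded.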
Simple example
AMQP Connector
Patterns
Output
Monitoring
Results
SQL-like CEP
We can register/view a stream as a table (like
SQL)
Then apply SQL-like statements with windows
for detecting events and patterns
Tools: Esper, WSO2, and certain streaming
databases
Example of WSO2 Siddhi
Source: https://docs.wso2.com/display/CEP420/SiddhiQL+Guide+3.1
SQL-like conditions
@Import('mobifonetrainingopensignal:1.0.0')
define stream inStream (meta_USERPHONE int, meta_TIME long, correlation_lat float, correlation_lon float,
    GSM_BIT_ERROR_RATE float, GSM_SIGNAL_STRENGTH float, LOC_ACCURACY float, LOC_SPEED float);
@Export('OutputSignal:1.0.0')
define stream OutputSignal (avgSignalStrength double, avgBitRateError double);
from inStream#window.lengthBatch(5)
select avg(GSM_SIGNAL_STRENGTH) as avgSignalStrength, avg(GSM_BIT_ERROR_RATE) as avgBitRateError
insert into OutputSignal;
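For readers unfamiliar with SiddhiQL, the behaviour of window.lengthBatch(5) combined with avg(...) can be approximated in plain Java. This is a sketch of the semantics as described above (collect five events, emit one average, start over), not Siddhi code; the class LengthBatchAverage is hypothetical and handles a single attribute.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical tumbling (batch) window over a fixed number of events.
class LengthBatchAverage {
    private final int batchSize;
    private final List<Double> batch = new ArrayList<>();

    LengthBatchAverage(int batchSize) {
        this.batchSize = batchSize;
    }

    // Returns the average when the batch is complete, otherwise null.
    Double add(double value) {
        batch.add(value);
        if (batch.size() < batchSize) {
            return null;
        }
        double sum = 0;
        for (double v : batch) {
            sum += v;
        }
        batch.clear(); // the next batch starts empty
        return sum / batchSize;
    }

    public static void main(String[] args) {
        LengthBatchAverage avgSignalStrength = new LengthBatchAverage(5);
        double[] signalStrengths = {1, 2, 3, 4, 5, 10};
        for (double s : signalStrengths) {
            Double avg = avgSignalStrength.add(s);
            if (avg != null) {
                System.out.println("avgSignalStrength = " + avg);
            }
        }
    }
}
```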
Put things together
A data pipeline of stream receivers, an event processor, and event publishers
Service
Example with WSO2 Carbon CEP
Source Sink
Language + UI specifications
Can also execute the topology as a Storm task (see Storm in the next lecture)
Get a high-level view
Check:
http://de.slideshare.net/alessandro_margara/processing-flows-of-information-debs-2011
BEYOND BASIC MESSAGE
PROCESSING
Partially covered in Lecture 5
Datalake with messaging
Figure source: Pankaj Misra and Tomcy John, Data Lake for Enterprises, Packt Publishing, 2017
Cloud services and big data analytics
Data sources (sensors, files, database, queues, log services)
Messaging systems (e.g., Kafka, AMQP, MQTT)
Storage and Database (S3, InfluxDB, HDFS, Cassandra, MongoDB, Elastic Search, etc.)
Batch data processing systems (e.g., Hadoop, Airflow, Spark)
Stream processing systems (e.g., Apex, Storm, Flink, WSO2, Google Dataflow)
Elastic Cloud Infrastructures (VMs, dockers, OpenStack elastic resource management tools, storage)
Warehouse
Analytics
Operation/Management/Business Services
Data Processing Framework
Batch processing
MapReduce/Hadoop
Scientific workflows
(Near) realtime streaming processing
Flink, Apex, Kafka SQL, Storm
Hybrid data processing
Summingbird, Apache Kylin
Impala, Storm-YARN
Apache Spark
Take a short read: http://www.infoq.com/articles/stream-processing-hadoop
Analytics (Application Level)
Conceptual View
Static data and (near) realtime data
Data Processing Frameworks: Streaming/Online Data Processing, Batch Data Processing, Hybrid Data Processing
Data Analysis: Analytics, Tools, Processes & Models
Decision
Recap
So how can you use messaging techniques for
complex distributed applications/systems?
Reactive patterns
Asynchronous communications
Large-scale integration
Big data
?
Further materials
https://access.redhat.com/site/documentation/en-US/Red_Hat_Enterprise_MRG/1.1/html/Messaging_User_Guide/sect-Messaging_User_Guide-Introduction_to_RHM-The_AMQP_0_10_Model.html
Java Message Service: http://www.oracle.com/technetwork/java/index-jsp-142945.html
Java Message Service specification, version 2.0, available from:
http://jcp.org/en/jsr/detail?id=343
http://kafka.apache.org
https://camel.apache.org/enterprise-integration-patterns.html
http://www.eaipatterns.com
http://docs.oracle.com/javaee/7/tutorial/doc/home.htm
http://docs.oracle.com/cd/E13157_01/wlevs/docs30/epl_guide/index.html
http://www.espertech.com/esper/documentation.php
Miyuru Dayarathna and Toyotaro Suzumura. 2013. A performance analysis of System S, S4, and Esper via two level benchmarking. In Proceedings of the 10th International Conference on Quantitative Evaluation of Systems (QEST'13), Kaustubh Joshi, Markus Siegle, Mariëlle Stoelinga, and Pedro R. D'Argenio (Eds.). Springer-Verlag, Berlin, Heidelberg, 225-240. DOI=10.1007/978-3-642-40196-1_19 http://dx.doi.org/10.1007/978-3-642-40196-1_19
https://code.facebook.com/posts/872547912839369/improving-facebook-s-performance-on-android-with-flatbuffers/
Thanks for your attention
Hong-Linh Truong
Faculty of Informatics, TU Wien
http://www.infosys.tuwien.ac.at/staff/truong