zeromq with nodejs
ØMQ @ NodeJS
ØMQ: Small Pieces for Scalable Software
Fernando D. Alonso
1. Facts about ØMQ
Facts about ØMQ
Low level messaging library
○ It is written in C++.
○ A thin layer between application and transport layers.
○ Message-passing using in-memory queues.
○ Concise API which talks over the ZMTP protocol.
○ Works on top of TCP, IPC, in-memory, and PGM/EPGM communication protocols.
○ Philosophy: horizontal scalability to reduce points of failure.
○ Network topology aware. Point-to-point communication reduces latency.
○ Communication is reduced to a few patterns, which combined are powerful.
○ No need to have a multitenant service.
Facts about ØMQ
A broker is not mandatory
Facts about ØMQ
Message-oriented communication
○ Messages as first-class citizen. No need to fight with framing or buffering.
○ You can send multipart messages.
○ Messages are atomic.
○ Asynchronous messaging. A single background I/O thread does the job for ØMQ sockets.
"A ØMQ message is a discrete unit of data passed between applications or components of the same application. ØMQ messages have no internal structure and from the point of view of ØMQ itself they are considered to be opaque binary data."
Facts about ØMQ
Fast for development
○ Interchangeable transports: e.g., scaling from one IPC server to a bunch of TCP servers.
○ Automatic reconnections.
○ Multicore made easy.
○ Provides zmq_poll to read BSD sockets.
○ It doesn't provide message persistence.
○ It doesn't provide data serialization.
○ It doesn't provide data compression.
○ It doesn't provide message encryption.
○ It doesn't provide security mechanisms. Not so in the next version of the ZMTP protocol! (*)
(*) ZMTP 3.0 draft protocol: http://hintjens.com/blog:39
Facts about ØMQ
Aims for simplicity
Facts about ØMQ
Platforms & Languages
○ Bindings for 30+ languages including NodeJS, Ruby, Python, Java, C, C++, Scala, Erlang, Perl, PHP, .NET.
○ Linux & Windows.
○ Raspberry Pi.
○ Android OS.
2. Install
Install
On Linux from source
sudo apt-get install build-essential libtool autoconf automake uuid-dev
wget http://download.zeromq.org/zeromq-3.2.3.tar.gz
tar xvzf zeromq-3.2.3.tar.gz
cd zeromq-3.2.3
./configure
make
sudo make install
sudo ldconfig
NOTES:
(*) Check http://download.zeromq.org/ for more versions.
(**) Avoid 3.0.x, 3.1.x, 3.2.0, 3.2.1 versions. They lack backwards compatibility.
(***) Check ./configure --help for more options on build.
(****) Use ./configure --with-pgm to build with the PGM extension.
Install
On Linux using packages
Libzmq 3.2:
● Debian Sid: sudo apt-get install libzmq3-dev
Libzmq 2.2:
● Debian Wheezy, Jessie, Sid: sudo apt-get install libzmq-dev
● Debian Squeeze backport: http://packages.debian.org/squeeze-backports/libzmq1
Install
On Mac
brew install zmq
Install
NodeJS bindings
NPM Packages:
● zmq (https://github.com/JustinTulloss/zeromq.node) [RECOMMENDED]
● zmq-stream (https://github.com/Schoonology/zmq-stream)
npm install zmq
node
> var zmq = require('zmq');
Install
Bindings in other languages
Ruby: https://github.com/chuckremes/ffi-rzmq [RECOMMENDED]
Python: https://github.com/zeromq/pyzmq
Erlang: https://github.com/zeromq/erlzmq2
Java JZMQ (uses JNI): https://github.com/zeromq/jzmq
Java JeroMQ: https://github.com/zeromq/jeromq
Scala ZeroMQ: https://github.com/mDialog/scala-zeromq
Scala ZeroMQ (official): https://github.com/valotrading/zeromq-scala-binding
...
3. Sockets
Sockets
Creation
NodeJS ZMQ: var sock = zmq.socket("<SOCKET_TYPE>");
ØMQ C API:  zmq_socket(context, <SOCKET_TYPE>);

Socket type              NodeJS ZMQ   ØMQ C API
Request                  req          ZMQ_REQ
Reply                    rep          ZMQ_REP
Dealer (async Request)   dealer       ZMQ_DEALER
Router (async Reply)     router       ZMQ_ROUTER
Push                     push         ZMQ_PUSH
Pull                     pull         ZMQ_PULL
Publisher                pub          ZMQ_PUB
Subscriber               sub          ZMQ_SUB
XSubscriber              xsub         ZMQ_XSUB
XPublisher               xpub         ZMQ_XPUB

(*) http://api.zeromq.org/3-2:zmq-socket
Sockets
Bind & Connect
NodeJS ZMQ
Bind:    sock.bind("tcp://10.0.0.1:5555", function(err) { .. });
Connect: sock.connect("tcp://10.0.0.1:5555");
Close:   sock.close();
ØMQ C API
Bind:    zmq_bind(socket, "tcp://10.0.0.1:5555");
Connect: zmq_connect(socket, "tcp://10.0.0.1:5555");
Close:   zmq_close(socket);
(*) http://api.zeromq.org/3-2:zmq-bind
(**) http://api.zeromq.org/3-2:zmq-connect
(***) http://api.zeromq.org/3-2:zmq-close
Sockets
Bind & Connect
NodeJS ZMQ
BindSync
sock.bindSync("tcp://10.0.0.1:5555");
// The next line runs only once the socket is bound;
// bindSync throws on error.
Sockets
Transports
TCP
● usage: tcp://<address>:<port>
NodeJS ZMQ
sock.bind("tcp://192.168.1.100:5555");
sock.bind("tcp://eth0:5555"); // binds to the first eth0 interface
sock.bind("tcp://*:5555"); // binds to all interfaces
ØMQ C API
zmq_bind(socket, "tcp://192.168.1.100:5555");
zmq_bind(socket, "tcp://eth0:5555");
zmq_bind(socket, "tcp://*:5555");
(*) http://api.zeromq.org/3-2:zmq-tcp
Sockets
Transports
Inter-Process Communication (IPC)
● usage: ipc://<address>
● Requires RW permissions on the specified path.
NodeJS ZMQ
sock.bind("ipc:///tmp/mysocket");
ØMQ C API
zmq_bind(socket, "ipc:///tmp/mysocket");
(*) http://api.zeromq.org/3-2:zmq-ipc
Sockets
Transports
In-process Communication (in-memory)
● usage: inproc://<address>
● Requires binding before connecting.
● Max. address name length is 256 characters.
NodeJS ZMQ
sock.bind("inproc://queue");
ØMQ C API
zmq_bind(socket, "inproc://queue");
(*) http://api.zeromq.org/3-2:zmq-inproc
(**) Buggy on node-zmq package.
Sockets
Transports
PGM (multicast)
● usage: pgm://<address>;<multicast_address>:<port>
● Requires sudo privileges to access raw IP sockets.
● Needs ØMQ built with PGM extension (./configure --with-pgm).
NodeJS ZMQ
sock.bind("pgm://192.168.1.100;239.192.1.1:3055");
ØMQ C API
zmq_bind(socket, "pgm://192.168.1.100;239.192.1.1:3055");
(*) http://api.zeromq.org/3-2:zmq-pgm
Sockets
Transports
Encapsulated PGM (multicast)
● usage: epgm://<address>;<multicast_address>:<port>
● Needs ØMQ built with PGM extension (./configure --with-pgm).
NodeJS ZMQ
sock.bind("epgm://192.168.1.100;239.192.1.1:3055");
ØMQ C API
zmq_bind(socket, "epgm://192.168.1.100;239.192.1.1:3055");
(*) http://api.zeromq.org/3-2:zmq-pgm
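All the transports above share the <transport>://<address> endpoint shape. As a rough illustration only, the sketch below splits such strings in plain JavaScript. parseEndpoint is a hypothetical helper, not part of the zmq package, and it only handles the shapes shown in these slides.

```javascript
// Toy endpoint parser for "<transport>://<address>" strings.
// parseEndpoint is a hypothetical helper, not part of the zmq package.
function parseEndpoint(endpoint) {
  const match = /^(tcp|ipc|inproc|pgm|epgm):\/\/(.+)$/.exec(endpoint);
  if (!match) throw new Error('Unsupported endpoint: ' + endpoint);
  const result = { transport: match[1], address: match[2] };

  if (result.transport === 'tcp') {
    // tcp://<address>:<port>
    const idx = result.address.lastIndexOf(':');
    result.host = result.address.slice(0, idx);
    result.port = Number(result.address.slice(idx + 1));
  } else if (result.transport === 'pgm' || result.transport === 'epgm') {
    // pgm://<address>;<multicast_address>:<port>
    const parts = result.address.split(';');
    if (parts.length !== 2) throw new Error('Expected <address>;<multicast_address>:<port>');
    const idx = parts[1].lastIndexOf(':');
    result.iface = parts[0];
    result.group = parts[1].slice(0, idx);
    result.port = Number(parts[1].slice(idx + 1));
  }
  return result;
}

console.log(parseEndpoint('tcp://*:5555'));
console.log(parseEndpoint('epgm://192.168.1.100;239.192.1.1:3055'));
```

For ipc and inproc endpoints the helper returns only the transport and the raw address, since those have no port component.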
Sockets
Send & Receive
Send
sock.send("My message");
Receive
sock.on("message", function(msg) {
  console.log(msg.toString());
});
sock.on("message", function(msg) {
  console.log("Received " + msg);
});
// <msg> is a Buffer object.
(*) http://api.zeromq.org/3-2:zmq-send
(**) http://api.zeromq.org/3-2:zmq-recv
Sockets
Multipart messages
Messages are atomic
A ØMQ message is composed of 1 or more message parts.
zmq_send(socket, "Como", 4, ZMQ_SNDMORE);
zmq_send(socket, "andas", 5, 0);
ØMQ Queue
Part 1: 4 Como
Part 2: 5 andas
ØMQ ensures that peers receive either all message parts of a message or none at all.
The total number of message parts is unlimited except by available memory.
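To make the size-prefixed parts in the queue concrete, here is a minimal sketch in plain JavaScript. toFrames and the more flag name are assumptions for illustration; the real wire format (ZMTP) carries more detail than this and is not modeled here.

```javascript
// Toy model of the outgoing queue: each part becomes a size-prefixed frame,
// and every frame except the last is flagged as having more parts to follow.
// toFrames is a hypothetical helper, not part of the zmq package.
function toFrames(parts) {
  return parts.map(function (part, i) {
    const data = Buffer.from(part);
    return { size: data.length, data: data, more: i < parts.length - 1 };
  });
}

const frames = toFrames(['Como', 'andas']);
frames.forEach(function (f) {
  console.log(f.size + ' ' + f.data.toString() + (f.more ? ' (more)' : ''));
});
```

The more flag plays the role of ZMQ_SNDMORE: the receiver only sees the message once the frame without the flag has arrived.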
Sockets
Multipart messages
Send
sock.send("First part", zmq.ZMQ_SNDMORE);
sock.send("Second part");
// Alternatively, pass all parts as an array:
sock.send(["First part", "Second part"]);
Check the outgoing buffer and socket state:
var r = sock.send("Como", zmq.ZMQ_SNDMORE);
console.log(r);
running this will output ->
> { type: 'req', _zmq: { state: 0, onReady: [Function] }, _outgoing: [ [ <Buffer 43 6f 6d 6f>, 2 ] ], _shouldFlush: true, _events: { message: [Function] } }
Sockets
Multipart messages
Receive
sock.on("message", function(first_part, second_part) {
  console.log(first_part.toString());
  console.log(second_part.toString());
});
sock.on("message", function() {
  for (var key in arguments) {
    console.log("Part " + key + ": " + arguments[key]);
  }
});
sock.on("message", function() {
  var messages = Array.prototype.slice.call(arguments);
  messages.forEach(function(msg) {
    console.log("Part:" + msg);
  });
});
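The handler style above can be tried without a network. A minimal sketch, assuming Node's built-in EventEmitter as a stand-in for the socket: fakeSock is hypothetical, and the message is emitted by hand with one Buffer per part, which is the shape the handlers above expect.

```javascript
const EventEmitter = require('events');

// Stand-in for a socket; a real zmq socket would emit "message" itself.
const fakeSock = new EventEmitter();
const received = [];

fakeSock.on('message', function () {
  // Copy the arguments object into a real array and stringify each part.
  const parts = Array.prototype.slice.call(arguments).map(String);
  received.push(parts);
});

// Emit a two-part message by hand.
fakeSock.emit('message', Buffer.from('First part'), Buffer.from('Second part'));
console.log(received);
```

Collecting the parts into an array keeps the handler independent of how many parts a message has.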
Sockets
Batched messaging
"ØMQ batches messages in opportunistic manner. It sends all the messages available at the moment in one go. Latency of subsequent messages will be improved because sending single batch to the card is faster then sending lot of small messages."
sock.send("Msg");
sock.send("Part A", zmq.ZMQ_SNDMORE);
sock.send("Part B");
ØMQ Queue
3 Msg
6 Part A
6 Part B
[Diagram: the Client Thread calls send three times on the ØMQ Socket; the ØMQ I/O Thread waits while the transport is <write busy> and, on <write ready>, sends the queued messages as one batch.]
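The batching idea can be sketched in a few lines of plain JavaScript. This is a toy model under stated assumptions, not how libzmq is implemented: Batcher, writeReady and the write callback are hypothetical names.

```javascript
// Toy model of opportunistic batching. Batcher is hypothetical.
function Batcher(write) {
  this.queue = [];
  this.busy = false;
  this.write = write; // called with an array of messages per batch
}

Batcher.prototype.send = function (msg) {
  this.queue.push(msg);
  if (!this.busy) this.flush();
};

// Send everything queued so far in one go, then mark the writer busy.
Batcher.prototype.flush = function () {
  if (this.queue.length === 0) return;
  this.busy = true;
  this.write(this.queue.splice(0));
};

// Called when the transport can accept data again.
Batcher.prototype.writeReady = function () {
  this.busy = false;
  this.flush();
};

const batches = [];
const batcher = new Batcher(function (batch) { batches.push(batch); });
batcher.send('Msg');     // writer idle: goes out alone
batcher.send('Part A');  // writer busy: queued
batcher.send('Part B');  // writer busy: queued
batcher.writeReady();    // both queued messages leave as one batch
console.log(batches);
```

The point of the model is that nothing waits for a batch to fill up: a message goes out immediately when the writer is idle, and batching only happens for messages that had to wait anyway.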
Patterns
Plugging multiple transports
Example
// Publisher bound to both TCP and IPC
var zmq = require('zmq'),
    pub = zmq.socket('pub');
pub.bindSync('tcp://127.0.0.1:5555');
pub.bindSync('ipc:///tmp/zmq.sock');
setInterval(function() {
  pub.send("I am polyglot!");
}, 1000);
// Subscriber over IPC
sub = zmq.socket('sub');
sub.connect('ipc:///tmp/zmq.sock');
sub.subscribe('');
sub.on('message', function(msg) {
  console.log("Received: " + msg);
});
// Subscriber over TCP
var zmq = require('zmq'),
    sub = zmq.socket('sub');
sub.connect('tcp://127.0.0.1:5555');
sub.subscribe('');
sub.on('message', function(msg) {
  console.log('Received: ' + msg);
});
Sockets
Setting Socket Options
Setsockopt
● usage (NodeJS ZMQ API): sock.setsockopt(<option>, <value>); or sock.<option> = <value>;
Identity:
sock.identity = 'monitor-1';
sock.setsockopt('identity', 'monitor-1');
// value can be any string up to 255 bytes long.
// identity is required on sockets connecting to a Router.
(*) http://api.zeromq.org/3-2:zmq-setsockopt
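The length limit can be checked before setting the option. A minimal sketch: isValidIdentity is a hypothetical helper, and the lower bound of 1 byte and the rule against a leading zero byte are taken from the libzmq ZMQ_IDENTITY documentation rather than from these slides.

```javascript
// Hypothetical helper: checks an identity against the ZMQ_IDENTITY rules
// (1 to 255 bytes, not starting with a zero byte).
function isValidIdentity(identity) {
  const bytes = Buffer.from(identity);
  return bytes.length >= 1 && bytes.length <= 255 && bytes[0] !== 0;
}

console.log(isValidIdentity('monitor-1'));
console.log(isValidIdentity('x'.repeat(256)));
```

The check counts bytes, not characters, so a string with multi-byte characters can exceed the limit with fewer than 255 characters.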
Sockets
Setting Socket Options
Setsockopt
Subscribe:
subscriber.subscribe('MUSIC');
subscriber.setsockopt('subscribe', 'MUSIC');
// sets a message filter for subscriber sockets.
Unsubscribe:
subscriber.unsubscribe('MUSIC');
subscriber.setsockopt('unsubscribe', 'MUSIC');
// removes a message filter from subscriber sockets.
4. Patterns
Patterns
Request / Reply
Synchronous task distribution
Ø REP #1
Ø REP # 2
Ø REQ
< round robin >
...
< synchronous >
Used when each message needs to be matched with a response. Handles only one message at a time. Strict send-recv cycle.
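The strict send-recv cycle can be pictured as a two-state machine. The sketch below is a toy model in plain JavaScript: ReqModel and its method names are hypothetical and it does not touch the zmq package.

```javascript
// Toy state machine for the strict send/recv cycle. ReqModel is hypothetical.
function ReqModel() {
  this.awaitingReply = false;
}

ReqModel.prototype.send = function (msg) {
  if (this.awaitingReply) {
    throw new Error('Cannot send: still waiting for a reply');
  }
  this.awaitingReply = true;
  return msg;
};

ReqModel.prototype.onReply = function (msg) {
  if (!this.awaitingReply) {
    throw new Error('Unexpected reply: no request in flight');
  }
  this.awaitingReply = false;
  return msg;
};

const req = new ReqModel();
req.send('Ping');
try {
  req.send('Ping again'); // rejected: the first request has no reply yet
} catch (e) {
  console.log(e.message);
}
req.onReply('Pong');
req.send('Ping again'); // allowed now
```

In this model, a reply that never arrives leaves the requester stuck in the waiting state, which is why a failed replier is a problem for this pattern.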
Patterns
Request / Reply
Ø REP #1, Ø REP #2, Ø REQ
send
recv (wait for response)
send
recv (don't wait for response)
recv (don't wait for response)
Patterns
Request / Reply
Ø REP #1, Ø REP #2, Ø REQ
send
recv
recv
send
send
-> REP #1 is down
FAILURE
REQ #1 will not recover!
Patterns
Request / Reply
Basic Request / Reply
// Reply side
var zmq = require('zmq');
reply = zmq.socket('rep');
reply.bind('tcp://127.0.0.1:5555', function(err) {
  if (err) throw err;
});
reply.on('message', function(msg) {
  console.log('Received: ' + msg);
  reply.send('Pong!');
});
// Request side
var zmq = require('zmq');
request = zmq.socket('req');
request.connect('tcp://127.0.0.1:5555');
request.send('Ping');
request.on('message', function(msg) {
  console.log('Response: ' + msg);
});
Patterns
Request / Reply
Dealer socket message delivery
Messages have to be multipart, consisting of an empty delimiter header followed by the content body parts.
sock.send(new Buffer([]), zmq.ZMQ_SNDMORE);
sock.send("Body");
Outgoing Queue
0
4 Body
Patterns
Request / Reply
Dealer socket message delivery
Outgoing messages are round-robined among all connected peers. However, sending depends on the availability of the receiver.
sock.send(["", "Msg"]);
sock.send(["", "Bla"]);
sock.send(["", "Msg2"]);
Outgoing Queue
0, 3 Msg (To A)
0, 3 Bla (To B)
0, 4 Msg2 (To A)
[Diagram: Outgoing Queue, Peer Socket A, Peer Socket B; each queued message is sent once its peer goes from <write busy> to <write ready>.]
Patterns
Request / Reply
Dealer socket message delivery
Reconnections are handled automatically: messages are asynchronously delivered. Response time does not block sending more messages.
sock.send(["", "Msg"]);
sock.send(["", "Msg2"]);
Outgoing Queue
0, 3 Msg (To A)
0, 4 Msg2 (To A)
[Diagram: Outgoing Queue, Peer Socket A, Peer Socket B; a peer is marked "-> Up again" and the queued messages are sent.]
Patterns
Request / Reply
Ø REP #1, Ø REP #2, Ø DEALER
enqueue < Msg A >
send < Msg B >
-> REP #1 is down
enqueue < Msg C >
send < Msg A, Msg C >
send < Msg D >
The Dealer socket handles peer reconnection automatically. It sends the messages that were queued for a peer once the connection to that peer is established again.
Patterns
Request / Reply
Dealer example
var dealer = zmq.socket('dealer');
dealer.connect('tcp://127.0.0.1:5001');
dealer.connect('tcp://127.0.0.1:5002');
setInterval(function() {
  dealer.send(["", "Msg"]);
}, 1000);
dealer.on('message', function(h, msg) {
  console.log('Response from: ' + msg);
});
running this 'might' output ->
> Response from: 5001
> Response from: 5001
> Response from: 5002
> Response from: 5001
> Response from: 5001
> Response from: 5002
> ...
Patterns
Request / Reply
Router socket messaging
Messages have to be multipart, consisting of an identity frame, an empty delimiter frame, and then the content body parts. This applies to both incoming and outgoing messages.
sock.send("worker-1", zmq.ZMQ_SNDMORE);
sock.send("", zmq.ZMQ_SNDMORE);
sock.send("Body");
Outgoing Queue
8 worker-1
0
4 Body
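The Router frame layout (identity, empty delimiter, body parts) can be built and taken apart with two small functions. A minimal sketch in plain JavaScript; routerEnvelope and splitEnvelope are hypothetical helpers, not part of the zmq package.

```javascript
// Build the frame list for a Router message: identity, empty delimiter, body.
// routerEnvelope is a hypothetical helper.
function routerEnvelope(identity, bodyParts) {
  return [identity, ''].concat(bodyParts);
}

// Split a received frame list back into identity and body parts.
// splitEnvelope is a hypothetical helper; frames may be strings or Buffers.
function splitEnvelope(frames) {
  if (frames.length < 3 || String(frames[1]) !== '') {
    throw new Error('Expected identity, empty delimiter, then body');
  }
  return { identity: String(frames[0]), body: frames.slice(2).map(String) };
}

const envelope = routerEnvelope('worker-1', ['Body']);
console.log(envelope);
console.log(splitEnvelope(envelope));
```

With the zmq package, such an array could be passed to sock.send in one call, as shown earlier for multipart messages.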
Patterns
Request / Reply
Router is asynchronous
Incoming messages are fair-queued among all connected and available peers.
[Diagram: Peer Socket A and Peer Socket B send to the ØMQ Router socket, which places the messages on its Incoming Queue.]
Patterns
Request / Reply
Router is asynchronous
Receiving depends on the availability of the sender.
[Diagram: Peer Socket A and Peer Socket B send to the ØMQ Router socket, which places the messages on its Incoming Queue.]
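Fair-queuing of incoming messages can be pictured as taking at most one message from each peer per pass. The sketch below is a toy model in plain JavaScript; fairQueue and the peer names are hypothetical and nothing here uses the zmq package.

```javascript
// Toy fair-queue: visit peers in turn and take one message from each
// peer that has something pending. fairQueue is a hypothetical helper.
function fairQueue(peerQueues) {
  const names = Object.keys(peerQueues);
  const pending = {};
  names.forEach(function (n) { pending[n] = peerQueues[n].slice(); });

  const order = [];
  let remaining = true;
  while (remaining) {
    remaining = false;
    names.forEach(function (n) {
      if (pending[n].length > 0) {
        order.push(n + ':' + pending[n].shift());
        remaining = true;
      }
    });
  }
  return order;
}

console.log(fairQueue({ A: ['a1', 'a2', 'a3'], B: ['b1'] }));
```

A busy peer cannot starve a quiet one in this model: peer B's single message is read right after peer A's first, not after all three.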
Patterns
Push / Pull
Unidirectional data distribution
The Push / Pull pattern fits pipelined communication well, i.e., when no response is required for a given message.
[Diagram: Ø nodes connected in a pipeline.]
Patterns
Push / Pull
Binding the Push socket
Outgoing messages are round-robined among all connected and available peers.
push.send("Msg");
push.send("Part A", zmq.ZMQ_SNDMORE);
push.send("Part B");
push.send("Msg2");
Outgoing Queue
3 Msg
6 Part A
6 Part B
4 Msg2
[Diagram: the queued messages are sent in turn to Peer Socket A and Peer Socket B.]
Patterns
Push / Pull
Binding the Push socket
Outgoing messages are round-robined among all connected and available peers.
push.send("Msg");
push.send("Part A", zmq.ZMQ_SNDMORE);
push.send("Part B");
push.send("Msg2");
Outgoing Queue
3 Msg
6 Part A
6 Part B
4 Msg2
[Diagram: Outgoing Queue, Peer Socket A, Peer Socket B; one peer is marked "-> Down" and the queued messages are sent to the peer that is still available.]
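Round-robin over connected and available peers can be sketched as follows. This is a toy model in plain JavaScript under stated assumptions: RoundRobin, the up flag and the inbox arrays are hypothetical, and a multipart message is treated as a single unit that goes to one peer.

```javascript
// Toy round-robin distributor that skips unavailable peers.
// RoundRobin is hypothetical; peers are { name, up, inbox } objects.
function RoundRobin(peers) {
  this.peers = peers;
  this.next = 0;
}

RoundRobin.prototype.send = function (msg) {
  for (let i = 0; i < this.peers.length; i++) {
    const index = (this.next + i) % this.peers.length;
    const peer = this.peers[index];
    if (peer.up) {
      peer.inbox.push(msg);
      this.next = (index + 1) % this.peers.length;
      return peer.name;
    }
  }
  return null; // no peer available in this toy model
};

const peerA = { name: 'A', up: true, inbox: [] };
const peerB = { name: 'B', up: true, inbox: [] };
const rr = new RoundRobin([peerA, peerB]);

const sentTo = [
  rr.send('Msg'),
  rr.send(['Part A', 'Part B']), // both parts travel together
  rr.send('Msg2')
];
peerB.up = false; // Peer Socket B goes down
sentTo.push(rr.send('More'), rr.send('Other'));
console.log(sentTo);
```

While both peers are up the messages alternate; once B is marked down, everything goes to A.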
Patterns
Push / Pull
Binding the Push socket
Disconnections and reconnections are handled automatically.
push.send("More");
push.send("Part 1", zmq.ZMQ_SNDMORE);
push.send("Part 2");
push.send("Other");
Outgoing Queue
4 More
6 Part 1
6 Part 2
5 Other
[Diagram: Outgoing Queue, Peer Socket A, Peer Socket B.]
Patterns
Push / Pull
Parallel task distribution
Ø PULL #1
Ø PULL # 2
Ø PUSH
< round robin >
...
Patterns
Push / Pull
Passive agents / Monitor
Ø PULL
Ø PUSH
...
Ø PUSH
Patterns
Push / Pull
Binding the Pull socket
Push's outgoing messages are round-robined among all connected peers.
push.send("Msg");
push.send("Part A", zmq.ZMQ_SNDMORE);
push.send("Part B");
push.send("Msg2");
Outgoing Queue
3 Msg (To A)
6 Part A, 6 Part B (To B)
4 Msg2 (To A)
[Diagram: Outgoing Queue, Peer Socket A, Peer Socket B.]
Patterns
Push / Pull
Binding the Pull socket
Disconnections and reconnections are handled automatically.
push.send("More");
push.send("Part 1", zmq.ZMQ_SNDMORE);
push.send("Part 2");
push.send("Other");
Outgoing Queue
4 More (To A)
6 Part 1, 6 Part 2 (To A)
5 Other (To B)
[Diagram: Outgoing Queue, Peer Socket A, Peer Socket B; a peer is marked "-> Down".]
Patterns
Push / Pull
Binding the Pull socket
Sending is asynchronous; it depends on the availability of the receiver.
push.send("More");
push.send("Part 1", zmq.ZMQ_SNDMORE);
push.send("Part 2");
push.send("Other");
Outgoing Queue
5 Other (To B)
[Diagram: Outgoing Queue, Peer Socket A, Peer Socket B; a peer is marked "-> Up again".]
Patterns
Push / Pull
Active agents / collector pattern
Ø PULL
Ø PUSH #1
...
Ø PUSH #2
Patterns
Publish / Subscribe
Broadcast data distribution
Ø SUB #1
Ø PUB #1
...
Ø SUB #2
Ø SUB #2
Message distribution
Patterns
Publish / Subscribe
sock.send("Msg");
Outgoing Queue
3 Msg
Outgoing messages are sent to each connected and available peer.
[Diagram: Subscriber A, Subscriber B, Subscriber C.]
Message distribution
Patterns
Publish / Subscribe
sock.send("Msg");
sock.send("Other");
Outgoing Queue
3 Msg
5 Other
Outgoing messages are sent to each connected and available peer.
[Diagram: Subscriber A, Subscriber B, Subscriber C.]
Subscribers can register to a specific topic.
Data filtering
Patterns
Publish / Subscribe
[Diagram: Subscribers A, B, and C connect to the ØMQ Publisher socket and its Outgoing Queue; one subscribes to AR and two subscribe to VE.]
Subscribers can register to a specific topic.
Data filtering
Patterns
Publish / Subscribe
sock.send("AR news");
Outgoing Queue
7 AR news
[Diagram: Subscribers A, B, and C hold subscriptions AR, VE, and VE on the ØMQ Publisher socket; "AR news" is sent only to the AR subscriber.]
Subscribers can register to a specific topic.
Data filtering
Patterns
Publish / Subscribe
sock.send("VE news");
Outgoing Queue
7 VE news
[Diagram: Subscribers A, B, and C hold subscriptions AR, VE, and VE on the ØMQ Publisher socket; "VE news" is sent to the two VE subscribers.]
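Topic filtering is a prefix match on the message. The sketch below models that rule in plain JavaScript; matches is a hypothetical helper, and the model says nothing about where the filtering actually takes place.

```javascript
// Toy model of topic filtering: a message is delivered when it starts with
// any subscribed prefix. An empty prefix matches every message.
// matches is a hypothetical helper, not part of the zmq package.
function matches(subscriptions, message) {
  return subscriptions.some(function (prefix) {
    return message.indexOf(prefix) === 0;
  });
}

console.log(matches(['AR'], 'AR news'));
console.log(matches(['VE'], 'AR news'));
console.log(matches([''], 'anything'));
```

This is why sub.subscribe('') in the earlier examples receives everything, and why a subscriber with no subscriptions receives nothing.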
Patterns
Publish / Subscribe
Data filtering example
// Publisher
pub = zmq.socket('pub');
pub.bindSync("tcp://10.0.0.12:3055");
count = 0;
setInterval(function() {
  pub.send("TEST " + count++);
}, 1000);
// Subscriber
sub = zmq.socket('sub');
sub.connect("tcp://10.0.0.12:3055");
sub.subscribe("TEST");
sub.on("message", function(msg) {
  console.log("Received: " + msg);
});
// older messages won't be received
running this 'might' output ->
> Received: TEST 6
> Received: TEST 7
> Received: TEST 8
> Received: TEST 9
> Received: TEST 10
Patterns
Publish / Subscribe
Multicast example
// Publisher
var zmq = require('zmq'),
    pub = zmq.socket('pub');
pub.bind("epgm://10.0.0.12;239.192.1.1:3055", function(err) {
  if (err) throw err;
});
setInterval(function() {
  pub.send("From pid: " + process.pid);
}, 1000);
// Subscriber
var zmq = require('zmq'),
    sub = zmq.socket('sub');
sub.connect("epgm://10.0.0.13;239.192.1.1:3055");
sub.subscribe("");
sub.on('message', function(msg) {
  console.log("Received " + msg);
});
(*) See running demo at http://youtu.be/NQrH0SATPk0
"XSUB and XPUB are exactly like SUB and PUB except they expose subscriptions as special messages."
Data distribution proxy
Patterns
XSUB / XPUB
[Diagram: Subscribers A, B, and C connect to the XPUB socket of the XSUB/XPUB Proxy; Publishers A, B, and C connect to its XSUB socket. A subscriber sends "subscribe PY" and the proxy forwards the subscription.]
Messages are forwarded by the proxy to the registered subscribers.
Data distribution proxy
Patterns
XSUB / XPUB
[Diagram: Subscribers A, B, and C connect to the XPUB socket of the XSUB/XPUB Proxy; Publishers A, B, and C connect to its XSUB socket. A publisher sends "PY <msg>" and the proxy forwards the message.]
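The subscriptions that a proxy forwards are ordinary frames. Per the libzmq documentation for XPUB/XSUB, such a frame is one byte (1 to subscribe, 0 to unsubscribe) followed by the topic. A minimal sketch in plain JavaScript, with hypothetical helper names:

```javascript
// Build a subscription frame: 1 (subscribe) or 0 (unsubscribe) + topic bytes.
// encodeSubscription is a hypothetical helper, not part of the zmq package.
function encodeSubscription(topic, subscribe) {
  return Buffer.concat([Buffer.from([subscribe ? 1 : 0]), Buffer.from(topic)]);
}

// Read a subscription frame back into its two pieces.
// decodeSubscription is a hypothetical helper.
function decodeSubscription(frame) {
  return { subscribe: frame[0] === 1, topic: frame.slice(1).toString() };
}

const subFrame = encodeSubscription('PY', true);
console.log(subFrame);
console.log(decodeSubscription(subFrame));
```

A proxy does not need to interpret these frames; it only has to pass them from the XPUB side to the XSUB side so that publishers learn what is being asked for.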
5. Useful links
Useful Links
Workshop examples
http://github.com/krakatoa/node_zmq_workshop
Useful Links
The Guide
http://zguide.zeromq.org/page:all
Useful Links
Pieter Hintjens personal blog
http://hintjens.com/
Useful Links
API References
http://api.zeromq.org/3-2:zmq-socket
http://api.zeromq.org/3-2:zmq-setsockopt