lsublsub.org/ist/13.mom.pdf · ejemplo point-to-point: synchronous receiver i n i t i a l c o n t e...
TRANSCRIPT
MOM
LSUB
GSYC
3 de abril de 2013
(cc) 2013 Laboratorio de Sistemas,Algunos derechos reservados. Este trabajo se entrega bajo la licencia Creative Commons Reconocimiento -
NoComercial - SinObraDerivada (by-nc-nd). Para obtener la licencia completa, veasehttp://creativecommons.org/licenses/. Tambien puede solicitarse a Creative Commons, 559 Nathan Abbott Way,
Stanford, California 94305, USA.
Las imagenes de terceros conservan su licencia original.
RPC
• MOM: Message Oriented Middleware. JMS (Java MessageService) es el MOM de Java.
• Permite comunicacion debilmente acoplada, fiable, sıncrona yasıncrona.
• Es una especificacion de API que forma parte de Java EE, haymultiples implementaciones.
• Usaremos GlassFish como servidor de aplicaciones Java EE.
JMS
¿Cuando se usa una comunicacion debilmente acoplada?
• No se quiere depender de las definiciones del resto decomponentes.
• No se quiere depender de la ejecucion de los otroscomponentes: los mensajes pueden quedar almacenados en elsistema y ser atendidos mas tarde.
Point-to-Point
• Un consumidor por mensaje
• Colas de mensajes persistentes.
• El receptor confirma la recepcion.
Imagen: (c) Oracle
Publish/Subscribe
• Multiples consumidores por mensaje
• Un consumidor se subscribe a un tema (topic).
• El receptor debe estar activo para recibir.
Imagen: (c) Oracle
JMS
Tipos de mensajes:
• Text: Strings.
• Object: Un objeto Serializable.
• Bytes: Un array de bytes.
• Map: Un diccionario.
• Stream: Un stream de valores primitivos.
JMS
Una aplicacion JMS se compone de:
• Clientes: componentes que envıan y reciben mensajes. Segarantiza que el mensaje se entrega justo una vez.
• Mensajes: objetos de comunicacion entre componentes.
• Proveedor JMS: el sistema que implementa el API de JMS. Laplataforma Java EE proporciona un proveedor de JMS. Haydistintas implementaciones.
• Los objetos administrados (administered objects) necesariospara interactuar con el proveedor. Se administran medianteuna herramienta (p. ej. asadmin).
Arquitectura
Imagen: (c) Oracle
Administered Objects
La herramienta de configuracion de Java EE nos permiteadministrar dos tipos de objetos:
• ConnectionFactory: crea conexiones con el proveedor deJMS.
• Queue connection factory: para Point-To-Point.• Topic connection factory: para Publish/Subscribe.• Connection factory: para ambas.
• Destination: representa emisores y destinatarios.• Queue: para Point-To-Point.• Topic: para Publish/Subscribe.
Administracion: GlassFish
• Crear un dominio (una o mas instancias del servidor de apliaciones) que tieneasociados estos puertos TCP entre otros:
• Un puerto para el servidor de aplicaciones (8080 por omision).• Un puerto de administracion (4848 por omision).• Un puerto de JMS (7070 por omision).• Un puerto de IIOP para RMI/CORBA (3700 por omision).
• Arrancar el dominio y la base de datos con la herramienta asadmin:
• start-domain nombre-de-dominio• start-database
• Arrancar un navegador: http://localhost:4848
• Crear los objetos administrados de JMS (JMS Resources):
• Nombre JNDI.• Tipo de recurso.
• Configurar el JMS Default Host con la direccion de la maquina que ejecuta elbroker de JMS (en este caso la maquina en la que ejecuta GlassFish).
Administracion: GlassFish
Administracion: GlassFish
Administracion: GlassFish
JNDI
Los recursos se registran en un espacio de nombres de JNDI (JavaNaming and Directory Interface). Para resolver los nombres:
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;QueueConnect ionFactory q F a c t o r y =( QueueConnect ionFactory ) j n d i . l o o k u p ( ” F a c t o r i a 1 ” ) ;// Lookup queue
Queue queue = ( Queue ) j n d i . l o o k u p ( ” Cola1 ” ) ;
Modelo
• Conexion: objeto que representa la conexion con el proveedorde JMS.
• Sesion: contexto para enviar y recibir mensajes desde un unicothread. La sesion crea mensajes, productores y consumidorespara su thread (no se deben usar desde otros threads).
Modelo
Imagen: (c) Oracle
Interfaces
Conexion Point-to-Point
• El metodo createQueueConnection de la factorıa crea unaconexion con el proveedor de JMS.
• Dentro de una conexion podemos crear una o mas sesionespara enviar, recibir, etc.
• El metodo close() cierra la conexion y todo lo que dependede ella (sesiones, etc.).
QueueConnect ion c o n n e c t i o n = f a c t o r y . c r e a t e Q u e u e C o n n e c t i o n ( ) ;QueueSess ion s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , QueueSess ion .AUTO ACKNOWLEDGE) ;
Sesion Point-to-Point
• El primer argumento de createQueueSession inica si lasesion es transaccional.
• Si no es transaccional, hay que indicar un modo deasentimiento del mensaje:
• AUTO ACKNOWLEDGE: se hace automaticamente.• CLIENT ACKNOWLEDGE: el receptor debe invocar el metodoacknowledge() manualmente. Si una sesion termina sinconfirmar la recepcion de un mensaje, el mensaje vuelve aestar disponible en la cola. La confirmacion de un mensajeconfirma todos los mensajes anteriores recibidos en la sesion.
Sesion Transaccional
• Forman parte de la transaccion todos los mensajes enviados yrecibidos en la sesion entre dos invocaciones decommit()/rollback(), potencialmente de distintas colas.
• Transaction Commit: todos los mensajes producidos estanenviados y todos los mensajes consumidos entan asentidos.
• Transaction Rollback: todos los mensajes producidos se handestruido y todos los mensajes consumidos se han devuelto.
Sesion Transaccional
Envıo:
• Despues de enviar, los mensajes no estan disponibles en la colahasta que no se invoque el metodo commit() de la sesion.
• Si se invoca rollback() en su lugar, los mensajes de latransaccion no llegan nunca a estar disponibles en la cola.
Sesion Transaccional
Recepcion:
• Los mensajes recibidos en la transaccion no se eliminan de lacola mientras se van recibiendo; se eliminan de la cola cuandoel receptor invoca commit(), despues de recibirlos.
• Si se invoca rollback() en su lugar, todos los mensajes de latransaccion vuelven a estar disponibles en la cola.
Sesion
La sesion nos permite crear objetos de tipo:
• QueueReceiver para recibir mensajes. Aunque es posibletener dos sesiones distintas con QueueReceivers para lamisma cola, el estandar de JMS no define como se repartenlos mensajes. Solo un consumidor recibe el mensaje.
• QueueSender para enviar mensajes.
• QueueBrowser para inspeccionar mensajes en la cola sinsacarlos.
• TemporaryQueue para crear una cola temporal que solosobrevive a la conexion en la que se crea.
Ejemplo Point-To-Point: Sender
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;QueueConnect ionFactory f a c t o r y =
( QueueConnect ionFactory ) j n d i . l o o k u p ( ” F a c t o r i a 1 ” ) ;Queue queue = ( Queue ) j n d i . l o o k u p ( ” Cola1 ” ) ;
QueueConnect ion c o n n e c t i o n = f a c t o r y . c r e a t e Q u e u e C o n n e c t i o n ( ) ;QueueSess ion s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , QueueSess ion .AUTO ACKNOWLEDGE) ;QueueSender s e n d e r = s e s s i o n . c r e a t e S e n d e r ( queue ) ;
f o r ( i n t i = 0 ; i < 1 0 ; i ++){TextMessage msg = s e s s i o n . c r e a t e T e x t M e s s a g e ( ) ;msg . s e t T e x t ( ” Message ” + i + ” to Cola1 ! ” ) ;s e n d e r . send ( msg ) ;System . e r r . p r i n t ( ” . ” ) ;Thread . s l e e p ( 1 0 0 0 ) ;
}c o n n e c t i o n . c l o s e ( ) ; // c l o s e s the connec t i on , the s e s s i o n and the s ende r
Ejemplo Point-To-Point: Synchronous Receiver
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;QueueConnect ionFactory f a c t o r y =
( QueueConnect ionFactory ) j n d i . l o o k u p ( ” F a c t o r i a 1 ” ) ;Queue queue = ( Queue ) j n d i . l o o k u p ( ” Cola1 ” ) ;
QueueConnect ion c o n n e c t i o n = f a c t o r y . c r e a t e Q u e u e C o n n e c t i o n ( ) ;QueueSess ion s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , QueueSess ion .AUTO ACKNOWLEDGE) ;Q u e u e R e c e i v e r r e c e i v e r = s e s s i o n . c r e a t e R e c e i v e r ( queue ) ;
c o n n e c t i o n . s t a r t ( ) ;System . e r r . p r i n t l n ( ” L i s t e n i n g . . . ” ) ;f o r ( ; ; ) {
Message msg = r e c e i v e r . r e c e i v e ( ) ;i f ( msg == n u l l ){
System . out . p r i n t l n ( ”no more messages ! ” ) ;break ;
}i f ( msg i n s t a n c e o f TextMessage ){
TextMessage m = ( TextMessage ) msg ;System . out . p r i n t l n ( ” Message r e c e i v e d : ” + m. getText ( ) ) ;
}}c o n n e c t i o n . c l o s e ( ) ; // c l o s e s the connec t i on , the s e s s i o n and the r e c e i v e r
Ejemplo Point-To-Point: Asynchronous Receiver
p r i v a t e c l a s s A s y n c R e c e i v e r implements Runnable , M e s s a g e L i s t e n e r{p r i v a t e QueueConnect ion c o n n e c t i o n ;p r i v a t e Queue queue ;p u b l i c A s y n c R e c e i v e r ( QueueConnect ion con , Queue queue ){
t h i s . c o n n e c t i o n = con ;t h i s . queue = queue ;
}@ O v e r r i d ep u b l i c v o i d run (){
t r y {QueueSess ion s e s s i o n =
c o n n e c t i o n . c r e a t e Q u e u e S e s s i o n ( f a l s e , QueueSess ion .AUTO ACKNOWLEDGE) ;Q u e u e R e c e i v e r r e c e i v e r = s e s s i o n . c r e a t e R e c e i v e r ( queue ) ;r e c e i v e r . s e t M e s s a g e L i s t e n e r ( t h i s ) ;System . out . p r i n t l n ( Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” l i s t e n i n g ! ” ) ;// The th r ead w i l l be do ing i t s j ob : doJob ( ) .// When a message comes , the c a l l b a c k onMessage ( )// i s i nvoked by a s ep a r a t e d th r ead => beware o f r a c e c o n d i t i o n s !doJob ( ) ;
}catch ( E x c e p t i o n e ){ . . . }}@ O v e r r i d ep u b l i c v o i d onMessage ( Message msg ) {
t r y {TextMessage m = ( TextMessage ) msg ;System . out . p r i n t l n ( ” L i s t e n e r , Thread ” +
Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” message r e c e i v e d : ” + m. getText ( ) ) ;} catch ( JMSException e ) { . . . }
}}
Publish/Subscribe
• La conexion y la sesion se crea de forma similar, pero usandolas interfaces de Topic en lugar de las de Queue.
• El emisor se crea con el metodo createPublisher() de lasesion. Recibe como parametro el Topic al que se quiereenviar.
• El receptor se crea con el metodo createSubscriber().
• Para recepcion asıncrona, debemos instanciar un objetoConsumer mediante createConsumer().
Ejemplo Pub/Sub: Sender
t r y {
I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;T o p i c C o n n e c t i o n F a c t o r y f a c t o r y =
( T o p i c C o n n e c t i o n F a c t o r y ) j n d i . l o o k u p ( ” F a c t o r i a 2 ” ) ;Topic t o p i c = ( Topic ) j n d i . l o o k u p ( ” Topic1 ” ) ;
T o p i c C o n n e c t i o n c o n n e c t i o n = f a c t o r y . c r e a t e T o p i c C o n n e c t i o n ( ) ;T o p i c S e s s i o n s e s s i o n =
c o n n e c t i o n . c r e a t e T o p i c S e s s i o n ( f a l s e , T o p i c S e s s i o n .AUTO ACKNOWLEDGE) ;T o p i c P u b l i s h e r p u b l i s h e r = s e s s i o n . c r e a t e P u b l i s h e r ( t o p i c ) ;
f o r ( i n t i = 0 ; i < 1 0 ; i ++){TextMessage msg = s e s s i o n . c r e a t e T e x t M e s s a g e ( ) ;msg . s e t T e x t ( ” Message ” + i + ” to Cola1 ! ” ) ;p u b l i s h e r . p u b l i s h ( msg ) ;System . e r r . p r i n t ( ” . ” ) ;Thread . s l e e p ( 1 0 0 0 ) ;
}c o n n e c t i o n . c l o s e ( ) ;
}catch ( E x c e p t i o n e ){e . p r i n t S t a c k T r a c e ( ) ;
}
Ejemplo Pub/Sub: Synchronous Receiver
t r y {I n i t i a l C o n t e x t j n d i = new I n i t i a l C o n t e x t ( ) ;T o p i c C o n n e c t i o n F a c t o r y f a c t o r y =
( T o p i c C o n n e c t i o n F a c t o r y ) j n d i . l o o k u p ( ” F a c t o r i a 2 ” ) ;Topic t o p i c = ( Topic ) j n d i . l o o k u p ( ” Topic1 ” ) ;
T o p i c S e s s i o n s e s s i o n =c o n n e c t i o n . c r e a t e T o p i c S e s s i o n ( f a l s e , T o p i c S e s s i o n .AUTO ACKNOWLEDGE) ;
MessageConsumer consumer = s e s s i o n . c reateConsumer ( t o p i c ) ;
System . out . p r i n t l n ( ” Thread ” + Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” l i s t e n i n g ! ” ) ;f o r ( ; ; ) {
f o r ( ; ; ) {Message msg = consumer . r e c e i v e ( ) ;i f ( msg == n u l l ){
System . out . p r i n t l n ( ”no more messages ! ” ) ;break ;
}i f ( msg i n s t a n c e o f TextMessage ){
TextMessage m = ( TextMessage ) msg ;System . out . p r i n t l n ( ”Consumer , Thread ” +
Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) +” message r e c e i v e d : ” + m. getText ( ) ) ;
}}
}catch ( E x c e p t i o n e ){e . p r i n t S t a c k T r a c e ( ) ;
}
Ejemplo Pub/Sub: Asynchronous Receiver
p r i v a t e s t a t i c c l a s s A s y n c R e c e i v e r implements Runnable , M e s s a g e L i s t e n e r{p r i v a t e T o p i c C o n n e c t i o n c o n n e c t i o n ;p r i v a t e Topic t o p i c ;p u b l i c A s y n c R e c e i v e r ( T o p i c C o n n e c t i o n con , Topic t o p i c ){
t h i s . c o n n e c t i o n = con ;t h i s . t o p i c = t o p i c ;
}@ O v e r r i d ep u b l i c v o i d run (){
t r y {T o p i c S e s s i o n s e s s i o n =
c o n n e c t i o n . c r e a t e T o p i c S e s s i o n ( f a l s e , T o p i c S e s s i o n .AUTO ACKNOWLEDGE) ;T o p i c S u b s c r i b e r s u b s c r i b e r = s e s s i o n . c r e a t e S u b s c r i b e r ( t o p i c ) ;s u b s c r i b e r . s e t M e s s a g e L i s t e n e r ( t h i s ) ;System . out . p r i n t l n ( ” Thread ” + Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” s u b s c r i b e d ! ” ) ;// The th r ead w i l l be do ing i t s j ob : doJob ( ) .// When a message comes , the c a l l b a c k onMessage ( )// i s i nvoked by a s ep a r a t e d th r ead => beware o f r a c e c o n d i t i o n s !doJob ( ) ;
}catch ( E x c e p t i o n e ){ . . . }}@ O v e r r i d ep u b l i c v o i d onMessage ( Message msg ) {
t r y {TextMessage m = ( TextMessage ) msg ;System . out . p r i n t l n ( ” S u b s c r i b e r , Thread ” +
Thread . c u r r e n t T h r e a d ( ) . g e t I d ( ) + ” message r e c e i v e d : ” + m. getText ( ) ) ;} catch ( JMSException e ) { . . . }
}}
Ejecucion
En la maquina en la que ejecuta GlassFish:
# $GLASSFISH i s t h e path to t h e G l a s s F i s h d i r e c t o r y :j a v a −cp jmsexample . j a r : $GLASSFISH/ g l a s s f i s h / l i b / gf−c l i e n t . j a r
org . l s u b . jmsexample . P2PRece iver
Ejecucion
En otras maquinas:
• No hace falta ejecutar GlassFish en ellas.
• Es necesario tener todas las clases en el classpath. Podemosinstalar GlassFish o copiar todos los JAR necesarios a mano.
• Hay que indicar la maquina en la que ejecuta el proveedor deJMS.
# $GLASSFISH i s t h e path to t h e G l a s s F i s h d i r e c t o r y :j a v a −Dorg . omg .CORBA. O R B I n i t i a l H o s t=omac . l s u b . org
−cp jmsexample . j a r : $GLASSFISH/ g l a s s f i s h / l i b / gf−c l i e n t . j a rorg . l s u b . jmsexample . P2PRece iver
asadmin
Otros comandos utiles:
• flush-jmsdest --desttype queue cola : drena la colaindicada.
• flush-jmsdest --desttype topic topic : drena el topic.
• stop-domain domain : para la ejecucion del dominio.
• delete-domain domain : borra el dominio.
• create-domain domain : crea un dominio.
• ...