Wednesday, 1 July 2009

Enterprise Messaging Architecture Design using JMS

When choosing a messaging solution, one must ensure that the messaging architecture meets the following criteria:


  - Robust
  - Scalable
  - Supports both point-to-point and publish-subscribe models.
  - Efficiently handles a high volume of asynchronous requests.
  - Allows seamless integration with a SOA framework.


An enterprise messaging architecture that caters for the above can be designed using the following core J2EE design patterns:


  - Message Broker
  - Service Activator
  - Service To Worker
  - Web Service endpoint Proxy


The sample code below uses the Message Broker, Service Activator and Service To Worker core J2EE design patterns:


-- JMSMessageBroker interface

import java.io.Serializable;

import javax.jms.JMSException;
import javax.naming.NamingException;

/**
 * Contract for a component that sends messages to, and receives messages
 * from, a JMS queue.
 */
public interface JMSMessageBroker {

    /**
     * Sends a text payload to the queue.
     *
     * @param msg the text to send
     * @throws NamingException declared so implementations can propagate lookup failures
     * @throws JMSException declared so implementations can propagate JMS failures
     */
    void sendTextMessageToQueue(String msg)
            throws NamingException, JMSException;

    /**
     * Sends a serializable object payload to the queue.
     *
     * @param msg the object to send
     * @throws JMSException declared so implementations can propagate JMS failures
     * @throws NamingException declared so implementations can propagate lookup failures
     */
    void sendObjectMessageToQueue(Serializable msg)
            throws JMSException, NamingException;

    /**
     * Receives a message from the queue. The method takes no arguments,
     * returns nothing and declares no checked exceptions.
     */
    void receiveFromQueue();
}

--JMSMessageBrokerImpl class

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

/**
 * Queue-based implementation of {@link JMSMessageBroker}.
 *
 * Every operation opens its own connection and session and closes the
 * connection again before returning, so no JMS resources are kept between
 * calls.
 */
public class JMSMessageBrokerImpl implements JMSMessageBroker {

    // NOTE(review): nothing in this class assigns jmsServiceLocator, so every
    // method fails with a NullPointerException unless the field is populated
    // from outside (e.g. by a container) - confirm how it is wired.
    private JMSServiceLocator jmsServiceLocator;

    /** Text of the most recent TextMessage taken off the queue by receiveFromQueue(). */
    private String text;

    /** Payload of the most recent ObjectMessage taken off the queue by receiveFromQueue(). */
    private Object obj;

    /**
     * Blocks until one message is available on the queue and stores its
     * payload: the text of a TextMessage, or the object of an ObjectMessage.
     * Other message types are ignored. Lookup and JMS failures are printed to
     * standard error, because the interface method declares no checked
     * exceptions.
     */
    public void receiveFromQueue() {
        QueueConnection queueConnection = null;
        try {
            queueConnection = openConnection();
            Queue queue = jmsServiceLocator.getQueue();
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer messageConsumer = queueSession.createConsumer(queue);

            // A new connection does not deliver messages until it is started;
            // without this call receive() would block forever.
            queueConnection.start();

            // No cast here: casting to TextMessage made the ObjectMessage
            // branch below unreachable and threw ClassCastException instead.
            Message mesg = messageConsumer.receive();

            if (mesg instanceof TextMessage) {
                text = ((TextMessage) mesg).getText();
            } else if (mesg instanceof ObjectMessage) {
                obj = ((ObjectMessage) mesg).getObject();
            }
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e1) {
            e1.printStackTrace();
        } finally {
            closeQuietly(queueConnection);
        }
    }

    /**
     * Sends a serializable object to the queue as an ObjectMessage.
     *
     * @param msg the object to send
     * @throws JMSException if creating the JMS resources or sending fails
     * @throws NamingException if a service-locator lookup fails
     */
    public void sendObjectMessageToQueue(Serializable msg) throws JMSException, NamingException {
        QueueConnection queueConnection = openConnection();
        try {
            Queue queue = jmsServiceLocator.getQueue();
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = queueSession.createProducer(queue);

            ObjectMessage objectMessage = queueSession.createObjectMessage(msg);
            messageProducer.send(objectMessage);
        } finally {
            closeQuietly(queueConnection);
        }
    }

    /**
     * Sends a string to the queue as a TextMessage.
     *
     * @param msg the text to send
     * @throws NamingException if a service-locator lookup fails
     * @throws JMSException if creating the JMS resources or sending fails
     */
    public void sendTextMessageToQueue(String msg) throws NamingException, JMSException {
        QueueConnection queueConnection = openConnection();
        try {
            Queue queue = jmsServiceLocator.getQueue();
            QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer messageProducer = queueSession.createProducer(queue);

            TextMessage textMessage = queueSession.createTextMessage(msg);
            messageProducer.send(textMessage);
        } finally {
            closeQuietly(queueConnection);
        }
    }

    /** Obtains the connection factory from the service locator and opens a new queue connection. */
    private QueueConnection openConnection() throws NamingException, JMSException {
        QueueConnectionFactory connectionFactory =
                (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
        return connectionFactory.createQueueConnection();
    }

    /** Closes the connection if one was opened; a failure to close is printed and not rethrown. */
    private static void closeQuietly(QueueConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            // Closing the connection also closes the sessions, producers and
            // consumers that were created from it.
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

--JMSTaskManager interface

/**
 * A unit of work that can be asked to process itself.
 */
public interface JMSTaskManager {

    /**
     * Processes the request represented by this task.
     *
     * @throws InterruptedException declared so implementations that block can propagate interruption
     */
    void processRequest()
            throws InterruptedException;
}

--JMSTaskManagerImpl class

import java.io.Serializable;

/**
 * Serializable task that carries a business service, the name of the action
 * to run on it and the arguments for that action, and hands all three to a
 * {@link JMSCommandProcessorImpl} when asked to process itself.
 */
public class JMSTaskManagerImpl implements JMSTaskManager, Serializable {

    // Field declarations are left exactly as they were so the default
    // serialized form of this class does not change.
    private JMSCommandProcessorImpl jmsCommandProcessor;
    private Object businessService;
    private String action;
    private Object[] arguments;

    /**
     * Captures the request to be executed later.
     *
     * @param businessService the service object the action belongs to
     * @param action the name of the action to run
     * @param arguments the arguments to pass to the action
     */
    public JMSTaskManagerImpl(Object businessService, String action, Object[] arguments) {
        this.arguments = arguments;
        this.action = action;
        this.businessService = businessService;
    }

    /**
     * Creates a fresh command processor, remembers it in this task, and
     * delegates the captured request to it.
     *
     * @throws InterruptedException if the command processor's call is interrupted
     */
    public void processRequest() throws InterruptedException {
        JMSCommandProcessorImpl processor = new JMSCommandProcessorImpl();
        this.jmsCommandProcessor = processor;
        processor.processRequest(this.businessService, this.action, this.arguments);
    }
}

--JMSCommandProcessor interface

/**
 * Executes a named action against a business service.
 */
public interface JMSCommandProcessor {

    /**
     * Processes one request.
     *
     * @param businessService the service object the action belongs to
     * @param action the name of the action to run
     * @param arguments the arguments to pass to the action
     * @throws InterruptedException declared so implementations that block can propagate interruption
     */
    void processRequest(
            Object businessService,
            String action,
            Object[] arguments) throws InterruptedException;
}

--JMSCommandProcessorImpl class

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;

/**
 * Runs a named public method of a business service on a dedicated worker
 * thread and blocks the caller until that thread has finished.
 */
public class JMSCommandProcessorImpl implements JMSCommandProcessor,Serializable {

    /** Suffix appended to worker-thread names; incremented once per request. */
    private static int seqno;

    // NOTE(review): Command is a non-serializable inner class, so an instance
    // of this class cannot be serialized once processRequest() has assigned
    // this field - confirm whether that matters to callers.
    private Command command;

    public JMSCommandProcessorImpl() {
    }

    /**
     * Starts a worker thread that invokes the method named {@code action} and
     * waits for the worker to terminate.
     *
     * @param businessService object whose class supplies the method; the
     *        worker invokes the method on a new instance of that class
     * @param action name of the public method to invoke
     * @param arguments arguments passed to the method; may be {@code null}
     *        for a method without parameters
     * @throws InterruptedException if the calling thread is interrupted while
     *         waiting for the worker
     */
    public void processRequest(Object businessService, String action, Object[] arguments) throws InterruptedException {
        command = new Command(businessService, action, arguments);
        Thread task = new Thread(command);
        task.setName(businessService.getClass().getName() + seqno++);
        task.start();

        // join() returns immediately when the worker has already finished.
        // The previous bare wait() on the Thread object could block forever in
        // that case, and the worker's notify() was issued on a different
        // monitor (the Command), so it never woke this caller.
        task.join();
    }

    /** Reflective invocation of one action; executed on the worker thread. */
    class Command implements Runnable {

        private Object businessService;
        private String action;
        private Object[] arguments;

        Command(Object businessService,String action,Object[] arguments){
            this.businessService = businessService;
            this.action = action;
            this.arguments = arguments;
        }

        /**
         * Instantiates the service class through its no-argument constructor
         * and invokes the requested method on that new instance. Reflection
         * failures are printed to standard error.
         */
        public void run() {
            try {
                Class<?> cls = businessService.getClass();
                // As before, the action runs on a new instance, not on the
                // businessService object that was passed in.
                Object service = cls.newInstance();
                Method target = findMethod(cls);
                target.invoke(service, arguments);
            } catch (NoSuchMethodException e) {
                e.printStackTrace();
            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e1) {
                e1.printStackTrace();
            } catch (InvocationTargetException e3) {
                e3.printStackTrace();
            }
        }

        /**
         * Finds the first public method whose name equals the action and whose
         * parameter count equals the number of supplied arguments.
         *
         * @throws NoSuchMethodException if no such method exists; previously
         *         the last method returned by getMethods() was invoked instead
         */
        private Method findMethod(Class<?> cls) throws NoSuchMethodException {
            int argCount = (arguments == null) ? 0 : arguments.length;
            for (Method candidate : cls.getMethods()) {
                if (candidate.getName().equals(action)
                        && candidate.getParameterTypes().length == argCount) {
                    return candidate;
                }
            }
            throw new NoSuchMethodException(
                    cls.getName() + "." + action + " with " + argCount + " argument(s)");
        }
    }

}

--JMSMessageListener class

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import com.mockrunner.mock.jms.MockObjectMessage;

/**
 * Message listener that treats the payload of each incoming ObjectMessage as
 * a {@link JMSTaskManager} and runs it. Messages of any other type are
 * ignored.
 */
public class JMSMessageListener implements MessageListener {

    /**
     * Acknowledges the message, extracts its payload and runs it as a task.
     * A payload that is not a JMSTaskManager is reported on standard error and
     * skipped instead of failing with a ClassCastException.
     *
     * @param msg the delivered message
     */
    public void onMessage(Message msg) {
        if (!(msg instanceof ObjectMessage)) {
            return;
        }

        try {
            // NOTE(review): the message is acknowledged before the task runs,
            // so a task failure cannot lead to redelivery in a
            // client-acknowledge session - confirm this is intended.
            msg.acknowledge();

            // NOTE(review): getObject() deserializes whatever the sender put
            // on the queue; only consume from queues with trusted producers.
            // The payload is a local so one message's task is never visible
            // to another invocation of this listener.
            Serializable payload = ((ObjectMessage) msg).getObject();

            if (!(payload instanceof JMSTaskManager)) {
                System.err.println("Ignoring ObjectMessage whose payload is not a JMSTaskManager: "
                        + (payload == null ? "null" : payload.getClass().getName()));
                return;
            }

            ((JMSTaskManager) payload).processRequest();
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e1) {
            // Restore the interrupt flag so the thread that called the
            // listener can still observe the interruption.
            Thread.currentThread().interrupt();
            e1.printStackTrace();
        }
    }

}

2 comments:

  1. I agree with you that robustness and scalability are a must for any messaging architecture. The sample code is really helpful. Although it is lengthy, so it will take me some time to understand, it seems promising.
    public key infrastructure

    ReplyDelete