Wednesday, 1 July 2009

Enterprise Messaging Architecture Design using JMS

When choosing a messaging solution, one must ensure that the messaging architecture:


  - Is robust.
  - Is scalable.
  - Supports both point-to-point and publish-subscribe models.
  - Efficiently handles a high volume of asynchronous requests.
  - Allows seamless integration with a SOA framework.


An enterprise messaging architecture that caters for the above can be designed using the following core J2EE design patterns:


  - Message Broker
  - Service Activator
  - Service To Worker
  - Web Service endpoint Proxy


The sample code below uses the Message Broker, Service Activator and Service To Worker core J2EE design patterns:


-- JMSMessageBroker interface

import java.io.Serializable;

import javax.jms.JMSException;
import javax.naming.NamingException;

/**
 * Contract for a component that sends messages to, and receives messages
 * from, a JMS queue.
 */
public interface JMSMessageBroker {

    /**
     * Receives from the queue. The method returns nothing and declares no
     * checked exceptions, so any failure has to be dealt with by the
     * implementation itself.
     */
    void receiveFromQueue();

    /**
     * Sends a serializable object to the queue.
     *
     * @param msg the object to send
     * @throws JMSException    if the JMS provider reports a failure
     * @throws NamingException if a naming lookup fails
     */
    void sendObjectMessageToQueue(Serializable msg)
            throws JMSException, NamingException;

    /**
     * Sends a piece of text to the queue.
     *
     * @param msg the text to send
     * @throws JMSException    if the JMS provider reports a failure
     * @throws NamingException if a naming lookup fails
     */
    void sendTextMessageToQueue(String msg)
            throws JMSException, NamingException;
}

--JMSMessageBrokerImpl class

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;

/**
 * {@link JMSMessageBroker} that talks to a single queue obtained from a
 * {@link JMSServiceLocator}.
 *
 * <p>Every operation opens its own connection and closes it again before
 * returning, so no JMS handles are kept between calls. The only state kept
 * is the service locator and the content of the most recently received
 * message.
 */
public class JMSMessageBrokerImpl implements JMSMessageBroker {

    /** Supplies the connection factory and the queue; must be set before use. */
    private JMSServiceLocator jmsServiceLocator;

    /** Text of the last {@link TextMessage} received, or {@code null}. */
    private String text;

    /** Payload of the last {@link ObjectMessage} received, or {@code null}. */
    private Object obj;

    /**
     * Sets the locator used to look up the connection factory and the queue.
     * Nothing else in this class assigns it, so it has to be provided before
     * any send or receive call.
     *
     * @param jmsServiceLocator the locator to use
     */
    public void setJmsServiceLocator(JMSServiceLocator jmsServiceLocator) {
        this.jmsServiceLocator = jmsServiceLocator;
    }

    /**
     * @return the text stored by the most recent {@link #receiveFromQueue()}
     *         call that received a {@link TextMessage}, or {@code null}
     */
    public String getLastReceivedText() {
        return text;
    }

    /**
     * @return the object stored by the most recent {@link #receiveFromQueue()}
     *         call that received an {@link ObjectMessage}, or {@code null}
     */
    public Object getLastReceivedObject() {
        return obj;
    }

    /**
     * Blocks until one message arrives on the queue and stores its content:
     * the text of a {@link TextMessage} or the payload of an
     * {@link ObjectMessage}. Other message types are ignored.
     *
     * <p>Naming and JMS failures are printed to standard error rather than
     * propagated, because the interface method declares no exceptions.
     */
    public void receiveFromQueue() {
        QueueConnection connection = null;
        try {
            connection = openConnection();
            Queue queue = jmsServiceLocator.getQueue();
            QueueSession session =
                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageConsumer consumer = session.createConsumer(queue);

            // A JMS connection is created stopped; nothing is delivered to
            // consumers until it has been started.
            connection.start();

            // No cast here: the message may legitimately be an ObjectMessage.
            Message received = consumer.receive();

            if (received instanceof TextMessage) {
                text = ((TextMessage) received).getText();
            } else if (received instanceof ObjectMessage) {
                obj = ((ObjectMessage) received).getObject();
            }
        } catch (NamingException e) {
            e.printStackTrace();
        } catch (JMSException e1) {
            e1.printStackTrace();
        } finally {
            closeQuietly(connection);
        }
    }

    /**
     * Sends {@code msg} to the queue wrapped in an {@link ObjectMessage}.
     *
     * @param msg the serializable payload
     * @throws JMSException    if creating the JMS objects or sending fails
     * @throws NamingException if the locator lookup fails
     */
    public void sendObjectMessageToQueue(Serializable msg) throws JMSException, NamingException {
        QueueConnection connection = openConnection();
        try {
            Queue queue = jmsServiceLocator.getQueue();
            QueueSession session =
                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createObjectMessage(msg));
        } finally {
            closeQuietly(connection);
        }
    }

    /**
     * Sends {@code msg} to the queue wrapped in a {@link TextMessage}.
     *
     * @param msg the text to send
     * @throws NamingException if the locator lookup fails
     * @throws JMSException    if creating the JMS objects or sending fails
     */
    public void sendTextMessageToQueue(String msg) throws NamingException, JMSException {
        QueueConnection connection = openConnection();
        try {
            Queue queue = jmsServiceLocator.getQueue();
            QueueSession session =
                    connection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            MessageProducer producer = session.createProducer(queue);
            producer.send(session.createTextMessage(msg));
        } finally {
            closeQuietly(connection);
        }
    }

    /** Looks up the connection factory and creates a new queue connection. */
    private QueueConnection openConnection() throws NamingException, JMSException {
        QueueConnectionFactory factory =
                (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
        return factory.createQueueConnection();
    }

    /**
     * Closes {@code connection} if it was opened. A failure while closing is
     * printed instead of thrown so it cannot mask the outcome of the
     * operation that used the connection.
     */
    private void closeQuietly(QueueConnection connection) {
        if (connection == null) {
            return;
        }
        try {
            // Closing the connection also releases its sessions, producers
            // and consumers.
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

--JMSTaskManager interface

/**
 * A unit of work that can be asked to process its request.
 */
public interface JMSTaskManager {

    /**
     * Processes the request held by this task manager.
     *
     * @throws InterruptedException if the calling thread is interrupted
     *                              while the request is being processed
     */
    void processRequest()
            throws InterruptedException;
}

--JMSTaskManagerImpl class

import java.io.Serializable;

/**
 * Serializable {@link JMSTaskManager} that bundles a business service, the
 * name of the action to run on it and the arguments for that action, and
 * hands all three to a {@link JMSCommandProcessorImpl} when asked to process.
 */
public class JMSTaskManagerImpl implements JMSTaskManager, Serializable {

    /** Processor created by the most recent {@link #processRequest()} call. */
    private JMSCommandProcessorImpl jmsCommandProcessor;

    /** Object passed to the processor as the business service. */
    private Object businessService;

    /** Action name passed to the processor. */
    private String action;

    /** Arguments passed to the processor. */
    private Object[] arguments;

    /**
     * Stores the three values that are later forwarded to the processor.
     *
     * @param businessService the business service object
     * @param action          the action name
     * @param arguments       the arguments for the action
     */
    public JMSTaskManagerImpl(Object businessService, String action, Object[] arguments) {
        this.arguments = arguments;
        this.action = action;
        this.businessService = businessService;
    }

    /**
     * Creates a new {@link JMSCommandProcessorImpl}, remembers it, and
     * delegates the stored request to it.
     *
     * @throws InterruptedException if the processor's call is interrupted
     */
    public void processRequest() throws InterruptedException {
        final JMSCommandProcessorImpl processor = new JMSCommandProcessorImpl();
        this.jmsCommandProcessor = processor;
        processor.processRequest(this.businessService, this.action, this.arguments);
    }

}

--JMSCommandProcessor interface

/**
 * Executes a named action for a business service.
 */
public interface JMSCommandProcessor {

    /**
     * Processes a single request.
     *
     * @param businessService the business service the request is for
     * @param action          the name of the action to run
     * @param arguments       the arguments for the action
     * @throws InterruptedException if the calling thread is interrupted
     *                              while the request is being processed
     */
    void processRequest(
            Object businessService,
            String action,
            Object[] arguments)
            throws InterruptedException;
}

--JMSCommandProcessorImpl class

import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;

/**
 * {@link JMSCommandProcessor} that runs each request on a freshly created
 * thread and blocks the caller until that thread has finished.
 *
 * <p>The request is executed reflectively: a new instance of the business
 * service's class is created through its no-argument constructor and the
 * public method named by the action is invoked on that new instance. The
 * business service object passed in is used only to determine the class.
 */
public class JMSCommandProcessorImpl implements JMSCommandProcessor,Serializable {

    /** Suffix for worker thread names; guarded by {@link #nextSequenceNumber()}. */
    private static int seqno;

    public JMSCommandProcessorImpl() {
    }

    /**
     * Runs the request on a new thread and waits for that thread to end.
     *
     * @param businessService object whose class provides the action method
     * @param action          name of the public method to invoke
     * @param arguments       arguments for the method; may be {@code null}
     *                        for a method without parameters
     * @throws InterruptedException if the calling thread is interrupted
     *                              while waiting for the worker thread
     */
    public void processRequest(Object businessService, String action, Object[] arguments) throws InterruptedException {
        // Kept local so that concurrent calls do not share per-request state.
        Command command = new Command(businessService, action, arguments);
        Thread task = new Thread(command);
        task.setName(businessService.getClass().getName() + nextSequenceNumber());
        task.start();

        // join() returns immediately when the worker has already finished.
        // Waiting on the thread's monitor instead can miss the wakeup and
        // block forever if the worker ends before the wait begins.
        task.join();
    }

    /** Returns the next thread-name suffix; synchronized so concurrent callers get distinct values. */
    private static synchronized int nextSequenceNumber() {
        return seqno++;
    }

    /**
     * Reflective invocation of one action. Failures are printed to standard
     * error because {@link Runnable#run()} cannot throw checked exceptions.
     */
    class Command implements Runnable {

        private Object businessService;
        private String action;
        private Object[] arguments;

        Command(Object businessService,String action,Object[] arguments){
            this.businessService = businessService;
            this.action = action;
            this.arguments = arguments;
        }

        public void run() {
            try {
                Class<?> cls = this.businessService.getClass();
                Object service = cls.newInstance();

                Method target = findActionMethod(cls);
                if (target == null) {
                    // Without this check an unrelated method would be invoked
                    // when nothing matches the requested action.
                    new NoSuchMethodException(
                            cls.getName() + "." + action + " with "
                            + argumentCount() + " argument(s)").printStackTrace();
                    return;
                }
                target.invoke(service, arguments);

            } catch (InstantiationException e) {
                e.printStackTrace();
            } catch (IllegalAccessException e1) {
                e1.printStackTrace();
            } catch (InvocationTargetException e3) {
                e3.printStackTrace();
            }
        }

        /**
         * Finds the first public method whose name equals the action and
         * whose parameter count equals the number of supplied arguments.
         *
         * @return the matching method, or {@code null} if there is none
         */
        private Method findActionMethod(Class<?> cls) {
            int wanted = argumentCount();
            for (Method candidate : cls.getMethods()) {
                if (action.equals(candidate.getName())
                        && candidate.getParameterTypes().length == wanted) {
                    return candidate;
                }
            }
            return null;
        }

        /** Number of supplied arguments, treating {@code null} as none. */
        private int argumentCount() {
            return arguments == null ? 0 : arguments.length;
        }
    }

}

--JMSMessageListener class

import java.io.Serializable;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;

import com.mockrunner.mock.jms.MockObjectMessage;

/**
 * JMS listener that expects each {@link ObjectMessage} to carry a
 * {@link JMSTaskManager} and runs it. Messages of any other type are ignored.
 */
public class JMSMessageListener implements MessageListener {

    /**
     * Acknowledges an {@link ObjectMessage}, extracts its payload and, when
     * the payload is a {@link JMSTaskManager}, calls
     * {@link JMSTaskManager#processRequest()} on it.
     *
     * <p>JMS failures and interruptions are printed to standard error; no
     * exception is allowed to escape from this method for those cases.
     *
     * @param msg the delivered message
     */
    public void onMessage(Message msg) {
        if (!(msg instanceof ObjectMessage)) {
            return;
        }

        try {
            // NOTE(review): the message is acknowledged before it is
            // processed, so a failure below does not cause redelivery.
            // Confirm that this is the intended delivery guarantee.
            msg.acknowledge();

            // Local rather than a field: nothing needs it after this call.
            Serializable payload = ((ObjectMessage) msg).getObject();

            if (payload instanceof JMSTaskManager) {
                ((JMSTaskManager) payload).processRequest();
            } else {
                // An unexpected or null payload is reported instead of
                // letting a ClassCastException or NullPointerException
                // escape from the listener.
                System.err.println("Ignoring ObjectMessage whose payload is not a JMSTaskManager: "
                        + (payload == null ? "null" : payload.getClass().getName()));
            }
        } catch (JMSException e) {
            e.printStackTrace();
        } catch (InterruptedException e1) {
            // Restore the flag so the thread that delivered the message can
            // still observe the interruption.
            Thread.currentThread().interrupt();
            e1.printStackTrace();
        }
    }

}