When it comes to choosing a messaging solution, one must ensure that the messaging architecture is:
- Robust
- Scalable
- Supports both point-to-point and publish-subscribe models.
- Efficiently handles high volume of asynchronous requests.
- Allows seamless integration with a SOA framework.
An enterprise messaging architecture that caters for the above can be designed using the following core J2EE design patterns:
- Message Broker
- Service Activator
- Service To Worker
- Web Service endpoint Proxy
Sample Code below using Message Broker, Service Activator and Service To Worker J2EE core design patterns:
-- JMSMessageBroker interface
import java.io.Serializable;
import javax.jms.JMSException;
import javax.naming.NamingException;
public interface JMSMessageBroker {
void sendTextMessageToQueue(String msg) throws NamingException, JMSException;
void sendObjectMessageToQueue(Serializable msg) throws JMSException, NamingException;
void receiveFromQueue();
}
--JMSMessageBrokerImpl class
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
public class JMSMessageBrokerImpl implements JMSMessageBroker {
private QueueConnectionFactory connectionFactory;
private JMSServiceLocator jmsServiceLocator;
private Queue queue;
private QueueConnection queueConnection;
private QueueSession queueSession;
private MessageProducer messageProducer;
private TextMessage textMessage;
private ObjectMessage objectMessage;
private MessageConsumer messageConsumer;
private Message mesg;
private String text;
private Object obj;
public void receiveFromQueue() {
// TODO Auto-generated method stub
try {
connectionFactory = (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
queueConnection = connectionFactory.createQueueConnection();
queue = jmsServiceLocator.getQueue();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
mesg = (TextMessage)messageConsumer.receive();
if(mesg instanceof TextMessage){
text = ((TextMessage)mesg).getText();
}
else if(mesg instanceof ObjectMessage){
obj = ((ObjectMessage)mesg).getObject();
}
}catch(NamingException e){e.printStackTrace();}
catch(JMSException e1){e1.printStackTrace();}
}
public void sendObjectMessageToQueue(Serializable msg) throws JMSException, NamingException {
// TODO Auto-generated method stub
connectionFactory = (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
queueConnection = connectionFactory.createQueueConnection();
queue = jmsServiceLocator.getQueue();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
messageProducer = queueSession.createProducer(queue);
objectMessage = queueSession.createObjectMessage(msg);
messageProducer.send(objectMessage);
}
public void sendTextMessageToQueue(String msg) throws NamingException, JMSException {
// TODO Auto-generated method stub
connectionFactory = (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
queueConnection = connectionFactory.createQueueConnection();
queue = jmsServiceLocator.getQueue();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
messageProducer = queueSession.createProducer(queue);
textMessage = queueSession.createTextMessage(msg);
messageProducer.send(textMessage);
}
}
--JMSTaskManager interface
public interface JMSTaskManager {
void processRequest() throws InterruptedException;
}
--JMSTaskManagerImpl class
import java.io.Serializable;
public class JMSTaskManagerImpl implements JMSTaskManager, Serializable {
private JMSCommandProcessorImpl jmsCommandProcessor;
private Object businessService;
private String action;
private Object[] arguments;
public JMSTaskManagerImpl(Object businessService, String action,
Object[] arguments) {
// TODO Auto-generated constructor stub
this.businessService = businessService;
this.action = action;
this.arguments = arguments;
}
public void processRequest() throws InterruptedException {
// TODO Auto-generated method stub
jmsCommandProcessor = new JMSCommandProcessorImpl();
jmsCommandProcessor.processRequest(businessService,action,arguments);
}
}
--JMSCommandProcessor interface
public interface JMSCommandProcessor {
void processRequest(Object businessService, String action, Object[] arguments) throws InterruptedException;
}
--JMSCommandProcessorImpl class
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;
public class JMSCommandProcessorImpl implements JMSCommandProcessor,Serializable {
private static int seqno;
private Command command;
public JMSCommandProcessorImpl() {
// TODO Auto-generated constructor stub
}
public void processRequest(Object businessService, String action, Object[] arguments) throws InterruptedException {
// TODO Auto-generated method stub
command = new Command(businessService,action,arguments);
Thread task = new Thread(command);
task.setName(businessService.getClass().getName()+seqno++);
task.start();
synchronized(task){
task.wait();
}
}
class Command implements Runnable {
private Object businessService;
private String action;
private Method[] methods;
private Method method;
private Object[] arguments;
Command(Object businessService,String action,Object[] arguments){
this.businessService = businessService;
this.action = action;
this.arguments = arguments;
}
public void run() {
try {
Class cls = this.businessService.getClass();
Object service = cls.newInstance();
methods = cls.getMethods();
for(Method method : methods){
this.method = method;
if(action.equals(method.getName())){
break;
}
}
synchronized(this){
method.invoke(service, arguments);
notify();
}
}catch(InstantiationException e){e.printStackTrace();}
catch(IllegalAccessException e1){e1.printStackTrace();}
catch(InvocationTargetException e3){e3.printStackTrace();}
}
}
}
--JMSMessageListener class
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import com.mockrunner.mock.jms.MockObjectMessage;
public class JMSMessageListener implements MessageListener {
private Serializable JmsTaskManager;
public void onMessage(Message msg) {
// TODO Auto-generated method stub
if(msg instanceof ObjectMessage){
try {
msg.acknowledge();
JmsTaskManager = ((ObjectMessage)msg).getObject();
((JMSTaskManager)JmsTaskManager).processRequest();
}catch(JMSException e){e.printStackTrace();}
catch(InterruptedException e1){e1.printStackTrace();}
}
}
}
Wednesday, 1 July 2009
Subscribe to:
Posts (Atom)