When it comes to choosing a messaging solution, one must ensure that the messaging architecture is:
- Robust
- Scalable
- Supports both point-to-point and publish-subscribe models.
- Efficiently handles high volume of asynchronous requests.
- Allows seamless integration with a SOA framework.
An enterprise messaging architecture that caters for the above can be designed using the following core J2EE design patterns:
- Message Broker
- Service Activator
- Service To Worker
- Web Service endpoint Proxy
Sample Code below using Message Broker, Service Activator and Service To Worker J2EE core design patterns:
-- JMSMessageBroker interface
import java.io.Serializable;
import javax.jms.JMSException;
import javax.naming.NamingException;
public interface JMSMessageBroker {
void sendTextMessageToQueue(String msg) throws NamingException, JMSException;
void sendObjectMessageToQueue(Serializable msg) throws JMSException, NamingException;
void receiveFromQueue();
}
--JMSMessageBrokerImpl class
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.jms.TextMessage;
import javax.naming.NamingException;
public class JMSMessageBrokerImpl implements JMSMessageBroker {
private QueueConnectionFactory connectionFactory;
private JMSServiceLocator jmsServiceLocator;
private Queue queue;
private QueueConnection queueConnection;
private QueueSession queueSession;
private MessageProducer messageProducer;
private TextMessage textMessage;
private ObjectMessage objectMessage;
private MessageConsumer messageConsumer;
private Message mesg;
private String text;
private Object obj;
public void receiveFromQueue() {
// TODO Auto-generated method stub
try {
connectionFactory = (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
queueConnection = connectionFactory.createQueueConnection();
queue = jmsServiceLocator.getQueue();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
messageConsumer = queueSession.createConsumer(queue);
mesg = (TextMessage)messageConsumer.receive();
if(mesg instanceof TextMessage){
text = ((TextMessage)mesg).getText();
}
else if(mesg instanceof ObjectMessage){
obj = ((ObjectMessage)mesg).getObject();
}
}catch(NamingException e){e.printStackTrace();}
catch(JMSException e1){e1.printStackTrace();}
}
public void sendObjectMessageToQueue(Serializable msg) throws JMSException, NamingException {
// TODO Auto-generated method stub
connectionFactory = (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
queueConnection = connectionFactory.createQueueConnection();
queue = jmsServiceLocator.getQueue();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
messageProducer = queueSession.createProducer(queue);
objectMessage = queueSession.createObjectMessage(msg);
messageProducer.send(objectMessage);
}
public void sendTextMessageToQueue(String msg) throws NamingException, JMSException {
// TODO Auto-generated method stub
connectionFactory = (QueueConnectionFactory) jmsServiceLocator.getQueueConnectionFactory();
queueConnection = connectionFactory.createQueueConnection();
queue = jmsServiceLocator.getQueue();
queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
messageProducer = queueSession.createProducer(queue);
textMessage = queueSession.createTextMessage(msg);
messageProducer.send(textMessage);
}
}
--JMSTaskManager interface
public interface JMSTaskManager {
void processRequest() throws InterruptedException;
}
--JMSTaskManagerImpl class
import java.io.Serializable;
public class JMSTaskManagerImpl implements JMSTaskManager, Serializable {
private JMSCommandProcessorImpl jmsCommandProcessor;
private Object businessService;
private String action;
private Object[] arguments;
public JMSTaskManagerImpl(Object businessService, String action,
Object[] arguments) {
// TODO Auto-generated constructor stub
this.businessService = businessService;
this.action = action;
this.arguments = arguments;
}
public void processRequest() throws InterruptedException {
// TODO Auto-generated method stub
jmsCommandProcessor = new JMSCommandProcessorImpl();
jmsCommandProcessor.processRequest(businessService,action,arguments);
}
}
--JMSCommandProcessor interface
public interface JMSCommandProcessor {
void processRequest(Object businessService, String action, Object[] arguments) throws InterruptedException;
}
--JMSCommandProcessorImpl class
import java.io.Serializable;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.Date;
public class JMSCommandProcessorImpl implements JMSCommandProcessor,Serializable {
private static int seqno;
private Command command;
public JMSCommandProcessorImpl() {
// TODO Auto-generated constructor stub
}
public void processRequest(Object businessService, String action, Object[] arguments) throws InterruptedException {
// TODO Auto-generated method stub
command = new Command(businessService,action,arguments);
Thread task = new Thread(command);
task.setName(businessService.getClass().getName()+seqno++);
task.start();
synchronized(task){
task.wait();
}
}
class Command implements Runnable {
private Object businessService;
private String action;
private Method[] methods;
private Method method;
private Object[] arguments;
Command(Object businessService,String action,Object[] arguments){
this.businessService = businessService;
this.action = action;
this.arguments = arguments;
}
public void run() {
try {
Class cls = this.businessService.getClass();
Object service = cls.newInstance();
methods = cls.getMethods();
for(Method method : methods){
this.method = method;
if(action.equals(method.getName())){
break;
}
}
synchronized(this){
method.invoke(service, arguments);
notify();
}
}catch(InstantiationException e){e.printStackTrace();}
catch(IllegalAccessException e1){e1.printStackTrace();}
catch(InvocationTargetException e3){e3.printStackTrace();}
}
}
}
--JMSMessageListener class
import java.io.Serializable;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.ObjectMessage;
import com.mockrunner.mock.jms.MockObjectMessage;
public class JMSMessageListener implements MessageListener {
private Serializable JmsTaskManager;
public void onMessage(Message msg) {
// TODO Auto-generated method stub
if(msg instanceof ObjectMessage){
try {
msg.acknowledge();
JmsTaskManager = ((ObjectMessage)msg).getObject();
((JMSTaskManager)JmsTaskManager).processRequest();
}catch(JMSException e){e.printStackTrace();}
catch(InterruptedException e1){e1.printStackTrace();}
}
}
}
Subscribe to:
Post Comments (Atom)
thanks
ReplyDeleteI agree with you that Robustness and Scalability is must for any messaging architecture.The sample code is really helpful.Although its lengthy so it will take some time to me for understanding but it seems promising.
ReplyDeletepublic key infrastructure