Correlating messages over JMS/MQ
I have a multi-threaded client (JMS over MQ) that sends a string in the message body; the service echoes it back to the client, much like an echo service. I have two physical queues, each referenced by a JNDI queue name (testRequest and testResponse): one dedicated to requests and the other to replies.
It runs fine in both synchronous and asynchronous mode in WSAS 2.2.1 and also WSAS 2.3.
I am trying to correlate requests with their corresponding responses. According to the Axis2 documentation, I have to engage the WS-Addressing module and use separate transport connections for correlation.
The only way I am able to correlate is by using temporary queues for replies, NOT a dedicated reply queue. Here are the options that use temporary queues (NOTE: REPLY_PARAM is not set, and the separate listener is not set to true):
//options.setProperty(JMSConstants.REPLY_PARAM, replyURL);
options.setTransportInProtocol(Constants.TRANSPORT_JMS);
//options.setUseSeparateListener(true);
stub._getServiceClient().setOptions(options);
When I uncomment the code that sets REPLY_PARAM and sets the separate listener to true, as shown below:
options.setProperty(JMSConstants.REPLY_PARAM, replyURL);
options.setTransportInProtocol(Constants.TRANSPORT_JMS);
options.setUseSeparateListener(true);
stub._getServiceClient().setOptions(options);
I get an "Error looking up destination with JNDI name" error.
I have attached axis2.xml (which is the same for both of the above cases). Also attached are the JMS logs for the temporary queues as well as for the dedicated reply queue (i.e., when I have a separate listener).
Any input is appreciated.
Thanks,
Prasad
| Attachment | Size |
|---|---|
| axis2.xml | 17.22 KB |
| LogForTempQueuesThatWorks.txt | 636 bytes |
| LogForDedicatedQueThatThrowsException.txt | 4.25 KB |

Adding the client code
Attached is client code.
Issue Resolved
JMSSender (part of axis2-1.401, on which WSAS 2.3 is based) has a bug: it does not pick up response messages selectively. That is, it does not use the WS-Addressing message ID to filter replies, which is the standard way of filtering messages.
If you look at line 226 of org.apache.axis2.transport.jms.JMSSender, you will see that the messageSelector parameter is missing when the MessageConsumer is created:
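To illustrate why an unselective consumer on a shared reply queue mis-correlates, here is a minimal in-memory sketch. It is not JMS and not Axis2 code: ReplyQueueDemo, Reply, receiveAny and receiveMatching are hypothetical names, and a plain Deque stands in for the dedicated reply queue.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;

// Hypothetical stand-in for a shared reply queue; illustration only.
final class ReplyQueueDemo {

    /** A reply as it might sit on the shared queue: correlation id plus body. */
    static final class Reply {
        final String correlationId;
        final String body;

        Reply(String correlationId, String body) {
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    /** Unselective receive: takes whatever reply is at the head of the queue. */
    static Reply receiveAny(Deque<Reply> queue) {
        return queue.pollFirst();
    }

    /** Selective receive: removes and returns the first reply whose id matches, or null. */
    static Reply receiveMatching(Deque<Reply> queue, String correlationId) {
        for (Iterator<Reply> it = queue.iterator(); it.hasNext();) {
            Reply reply = it.next();
            if (reply.correlationId.equals(correlationId)) {
                it.remove();
                return reply;
            }
        }
        return null;
    }

    /** Two replies that arrived in the opposite order to their requests. */
    static Deque<Reply> sampleQueue() {
        Deque<Reply> queue = new ArrayDeque<>();
        queue.addLast(new Reply("msg-2", "echo: second"));
        queue.addLast(new Reply("msg-1", "echo: first"));
        return queue;
    }
}
```

With the sample queue, a thread waiting for "msg-1" that calls receiveAny gets the reply for "msg-2", while receiveMatching with "msg-1" skips past it and returns the right reply. A JMS message selector gives the consumer the second behaviour.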
MessageConsumer consumer = session.createConsumer(replyDest);
So, if you replace the above line with the following lines, your messages will always be correlated (of course, you need to engage the 'addressing' module):
MessageConsumer consumer = null;
String messageID = msgContext.getMessageID();
if (messageID != null) {
    log.info("Setting MessageID: " + messageID + " as a JMS CorrelationId message selector");
    String selectorStr = "JMSCorrelationID = '" + messageID + "'";
    consumer = session.createConsumer(replyDest, selectorStr);
} else {
    consumer = session.createConsumer(replyDest);
}
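One caveat with the selector string: the message ID is concatenated in unescaped. JMS selector string literals are delimited by single quotes, and a quote inside a literal has to be doubled. WS-Addressing message IDs are usually urn:uuid values, so this is unlikely to bite in practice, but a small helper makes the selector safe either way. This is a sketch only; CorrelationSelectors and forCorrelationId are hypothetical names, not part of Axis2.

```java
// Hypothetical helper, not part of Axis2 or the JMS API.
final class CorrelationSelectors {

    private CorrelationSelectors() {
    }

    /**
     * Builds the selector "JMSCorrelationID = '<id>'", doubling any single
     * quote in the id so that it stays inside the string literal.
     */
    static String forCorrelationId(String messageId) {
        if (messageId == null) {
            throw new IllegalArgumentException("messageId must not be null");
        }
        String escaped = messageId.replace("'", "''");
        return "JMSCorrelationID = '" + escaped + "'";
    }
}
```

In the fix, the result would be passed as the second argument, e.g. session.createConsumer(replyDest, CorrelationSelectors.forCorrelationId(messageID)).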
I found the same issue in WSAS 2.2.1; the same fix can be applied there as well.
For those of you who need this fix, please use the attached code.
Thanks,
Prasad Erabelli
Prasad The JMS transport
Prasad
The JMS transport code currently in the Axis2 SVN is now deprecated, and the code currently being developed by the Apache Synapse team in their SVN is scheduled to be moved to a common module, so that it's easier to maintain one codebase and share it. As I remember, correlation on message IDs was fixed in the Synapse version. Anyway, I will keep a tab on it.
thanks
asankha
Apache Synapse throws ClassCastException
Hi Asankha,
I tried the Synapse JAR before embarking on the solution I mentioned above, without much success.
It throws the following exception:
java.lang.ClassCastException: com.ibm.mq.jms.MQConnectionFactory
at org.apache.synapse.transport.jms.JMSSender.sendMessage(JMSSender.java:138)
at org.apache.synapse.transport.base.AbstractTransportSender.invoke(AbstractTransportSender.java:120)
at org.apache.axis2.engine.AxisEngine.send(AxisEngine.java:448)
at org.apache.axis2.description.OutInAxisOperationClient.send(OutInAxisOperation.java:401)
at org.apache.axis2.description.OutInAxisOperationClient.executeImpl(OutInAxisOperation.java:228)
at org.apache.axis2.client.OperationClient.execute(OperationClient.java:163)
at _0.pingserverservice_1.srv.irs.treasury.gov.us.PingServerServiceStub.PingServer(PingServerServiceStub.java:213)
at gov.irs.stir.itma.soap.tests.mq.client.PingMQMultiSyncClient.run(PingMQMultiSyncClient.java:158)
at java.lang.Thread.run(Unknown Source)
So, I ended up fixing the code.
Thanks,
Prasad