|
We have quite a few routes which end with a simple fire-and-forget send
to a JMS queue. In general, this works fine.

But there is one more thing we would like to do afterwards, and that's capture and log the JMS message ID in some audit table. For the logging we can use a custom bean, but the problem is: how can we get hold of the message ID, which is assigned by the JMS provider (in this case WebSphere MQ)?

Looking at the code of JmsProducer.setMessageId(), the MQ message ID is only available in a request-reply configuration, and not in the "InOnly" case.

Is there some "easy" way to capture the ID from the external JMS provider? Or is the only alternative to use self-generated message IDs?

(Camel 2.6.0 on WebSphere AS 6.1 and WebSphere MQ)

Many thanks in advance!

Tung |
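On the self-generated ID alternative mentioned above: a minimal sketch, assuming the audit table only needs some unique key chosen by the application rather than the ID assigned by the provider. The class name `AuditIdExample`, the header name `auditMessageId`, and the plain `Map` standing in for the message headers are all hypothetical; nothing here is Camel or MQ API.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

public class AuditIdExample {
    // Hypothetical header name; not a Camel or JMS constant.
    static final String AUDIT_ID_HEADER = "auditMessageId";

    // Generates an application-side ID and stores it in the headers before
    // the send, so it can be logged without asking the JMS provider for anything.
    static String assignAuditId(Map<String, Object> headers) {
        String id = UUID.randomUUID().toString();
        headers.put(AUDIT_ID_HEADER, id);
        return id;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        String id = assignAuditId(headers);
        System.out.println("audit id: " + id);
    }
}
```

The trade-off is that such an ID is not the MQ message ID, so it only helps with tracing if the remote side also sees and logs it.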
|
On Thu, Jan 5, 2012 at 3:21 PM, wing-tung Leung <[hidden email]> wrote:
> But there is one more thing we would like to do afterwards, and that's
> capture and log the JMS message ID in some audit table.

I finally patched the Camel JmsProducer class, with a minor modification of the "InOnly" behavior. In this case, the JMS message that was actually sent, containing the filled-in "JMSMessageID", is copied to the "out" of the exchange.

This enables processors further down the line to capture the message ID for logging.

The patch is attached. Any comments or ideas to improve this further? Any chance that something like this could end up in the trunk? I applied it to version 2.6.0, but it also applies to the current trunk without conflicts.

Regards,

Tung |
|
Hi

Thanks for the patch.

A possible issue is that the JMSMessageID is only available once the JMS client has really sent the message, and some clients can be in an async mode, where they may take a little while before sending. So in that case I would assume Camel will have to wait for that to happen.

Unless the Spring Sent callback happens as a sort of pre-construct step, and the JMS client is able to determine the JMSMessageID before actually sending it.

Anyway, it's probably a bit different from JMS vendor to vendor, so a one-size-fits-all solution may not be possible.

Also, for a JMS InOnly message, you most likely do not want to have the message on the Camel Exchange changed into something else such as a javax.jms.Message, as you did a fire and forget. So maybe the patch should just enrich the message by adding a header with the JMSMessageID value.

And I think there should be an option so people can turn this on|off, based on their needs.

BTW: What is your use case? Do you need the actual JMSMessageID for track and trace of some sort?

On Fri, Jan 6, 2012 at 2:18 PM, wing-tung Leung <[hidden email]> wrote:
> On Thu, Jan 5, 2012 at 3:21 PM, wing-tung Leung <[hidden email]> wrote:
>> But there is one more thing we would like to do afterwards, and that's
>> capture and log the JMS message ID in some audit table.
>
> I finally patched the Camel JmsProducer class, with a minor
> modification of the "InOnly" behavior. In this case, the actually sent
> JMS message, containing the filled in "JMSMessageID", is copied to the
> "out" from the exchange.
>
> This enables processors further down the line to capture the message
> ID for logging.
>
> Attached the patch. Any comments/ideas to improve this further? Any
> idea if such idea could end up in the trunk? I applied it to version
> 2.6.0, but it can be applied to the current trunk without conflicts as
> well.
>
> Regards,
>
> Tung

--
Claus Ibsen
-----------------
FuseSource
Email: [hidden email]
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/ |
|
On Wed, Jan 18, 2012 at 4:53 PM, Claus Ibsen <[hidden email]> wrote:
> A possible issue is that the JMSMessageID is only available once the
> JMS client has really sent the message, and some clients can be in an
> async mode, where they may take a little while before sending. So in
> that case I would assume Camel will have to wait for that to happen.

Sorry to respond so late, I lost track of it in the huge pile of Camel mailings .. ;-)

I did not take into account that the sending could run asynchronously for some JMS clients. You are right that retrieving the JMSMessageID in that case could block the process a little.

> Unless the Spring Sent callback happens as a sort of pre-construct
> step, and the JMS client is able to determine the JMSMessageID before
> actually sending it.

I'm afraid this is not clearly specified in the JMS spec, so we don't have any guarantee. :-(

> Anyway, it's probably a bit different from JMS vendor to vendor, so a
> one-size-fits-all solution may not be possible.

Indeed.

> Also, for a JMS InOnly message, you most likely do not want to have the
> message on the Camel Exchange changed into something else such as a
> javax.jms.Message, as you did a fire and forget. So maybe the patch
> should just enrich the message by adding a header with the
> JMSMessageID value.

Sounds reasonable. The code was written a while ago and I'm not sure, but IIRC I copied the full message to the output because that made it easier to log both the JMSMessageID and the text message body. But the body could of course be captured earlier, so having only the JMSMessageID as a header value should probably work fine as well.

When I have more time to revisit the code, I will check whether I can refactor both the patch and the route definition according to your suggestion.

> And I think there should be an option so people can turn this on|off, based on their needs.

Probably not hard to implement, based on some configuration option or exchange property. If you have any preference, just let me know. I will try to integrate it into the patch as well, but probably not very soon.

> BTW: What is your use case? Do you need the actual JMSMessageID
> for track and trace of some sort?

Yes, merely for track and trace. Our application fires messages to a remote system via IBM MQ, and for audit and troubleshooting reasons we log the message IDs of all our outgoing messages.

We were actually surprised Camel did not provide this out of the box, but maybe we just have strange requirements .. ;-) |
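The two suggestions agreed on above (add only a header instead of replacing the message, and make it switchable) can be sketched without any JMS dependency. This is only an illustration under stated assumptions, not the patch itself: `SentMessage` and `FakeProvider` are hypothetical stand-ins for a JMS message and provider, the `Map` stands in for the Camel message headers, and the flag name `enrichWithMessageId` is invented for this example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

public class MessageIdHeaderSketch {
    /** Hypothetical stand-in for a JMS message: the ID stays null until sent. */
    static class SentMessage {
        String messageId;
        final String body;
        SentMessage(String body) { this.body = body; }
    }

    /** Hypothetical stand-in for the provider, which assigns the ID during send(). */
    static class FakeProvider {
        private final AtomicLong counter = new AtomicLong();
        void send(SentMessage m) { m.messageId = "ID:" + counter.incrementAndGet(); }
    }

    static final String HEADER = "JMSMessageID";

    // Fire-and-forget send. The existing headers and body are left alone;
    // only one header is added, and only when the (hypothetical) flag is on.
    static void sendInOnly(FakeProvider provider, String body,
                           Map<String, Object> headers, boolean enrichWithMessageId) {
        SentMessage m = new SentMessage(body);
        provider.send(m);
        // A real client in async mode might not have assigned the ID yet,
        // hence the null check.
        if (enrichWithMessageId && m.messageId != null) {
            headers.put(HEADER, m.messageId);
        }
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        sendInOnly(new FakeProvider(), "hello", headers, true);
        System.out.println(headers.get(HEADER));
    }
}
```

A bean further down the route could then read the header for the audit log. Later Camel releases document an `includeSentJMSMessageID` option on the JMS endpoint for the InOnly case; whether it is available depends on the Camel version, so check the JMS component documentation for the release in use.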
|
Hi there,

So did you guys find a way to implement this? I tried the patch, but I get the error below. I am also pasting my configuration and Java code. Please help.

java.lang.UnsupportedOperationException: JMSCC0029: A destination must be specified when sending from this producer.
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)[:1.6.0_32]
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:39)[:1.6.0_32]
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:27)[:1.6.0_32]
    at java.lang.reflect.Constructor.newInstance(Constructor.java:513)[:1.6.0_32]
    at com.ibm.msg.client.commonservices.j2se.NLSServices.createException(NLSServices.java:313)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
    at com.ibm.msg.client.commonservices.nls.NLSServices.createException(NLSServices.java:390)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
    at com.ibm.msg.client.jms.internal.JmsErrorUtils.createException(JmsErrorUtils.java:104)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.checkNotUnidentifiedProducer(JmsMessageProducerImpl.java:1058)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send_(JmsMessageProducerImpl.java:725)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:406)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
    at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:299)[com.ibm.mqjms.jar:7.0.1.0 - k000-L090724]
    at org.springframework.jms.core.JmsTemplate.doSend(JmsTemplate.java:592)[spring-jms-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSend(JmsConfiguration.java:274)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.doSendToDestination(JmsConfiguration.java:213)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.access$1(JmsConfiguration.java:203)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate$2.doInJms(JmsConfiguration.java:179)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.springframework.jms.core.JmsTemplate.execute(JmsTemplate.java:466)[spring-jms-3.0.7.RELEASE.jar:3.0.7.RELEASE]
    at org.apache.camel.component.jms.JmsConfiguration$CamelJmsTemplate.send(JmsConfiguration.java:177)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.apache.camel.component.jms.JmsProducer.doSend(JmsProducer.java:375)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.apache.camel.component.jms.JmsProducer.processInOnly(JmsProducer.java:313)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.apache.camel.component.jms.JmsProducer.process(JmsProducer.java:111)[file:/C:/workspace/OceanviewEAI/bin/:]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:115)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:285)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:81)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:333)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:223)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:304)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:45)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:50)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:71)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.impl.DefaultScheduledPollConsumer.poll(DefaultScheduledPollConsumer.java:64)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.impl.ScheduledPollConsumer.doRun(ScheduledPollConsumer.java:138)[camel-core-2.9.2.jar:2.9.2]
    at org.apache.camel.impl.ScheduledPollConsumer.run(ScheduledPollConsumer.java:90)[camel-core-2.9.2.jar:2.9.2]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
    at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317)[:1.6.0_32]
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150)[:1.6.0_32]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98)[:1.6.0_32]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180)[:1.6.0_32]
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:204)[:1.6.0_32]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
    at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]

route
=================================================
<route>
    <from uri="bean:readyOutgoingMessageSql?method=getRawMessage"/>
    <to uri="bean:readyOutgoingMessageSql?method=getMessageFormat"/>
    <process ref="readyOutgoingMessageProcessor"/>
    <choice>
        <when>
            <simple>${in.header.mqDefId} == '1'</simple>
            <to uri="dtccPol-JMS:queue:LQ_SPO"/>
            <to uri="bean:printer?method=printFileNames" />
        </when>
        <when>
            <simple>${in.header.mqDefId} == '2'</simple>
            <to uri="dtccDox-JMS:queue:LQ_DOX"/>
            <to uri="bean:printer?method=printFileNames" />
        </when>
        <otherwise>
            <to uri="bean:printer?method=printFileNames" />
        </otherwise>
    </choice>
</route>

Processor
=======================================
import java.util.Map;

import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.log4j.Logger; // assumed: Logger.getLogger(Class) matches log4j 1.x

public class OutgoingMessageProcessor implements Processor {

    private static Logger log = Logger.getLogger(OutgoingMessageProcessor.class);

    @Override
    public void process(Exchange ex) throws Exception {
        if (ex.getIn().getBody() != null) {
            Map<String, Object> m = ex.getProperties();
            WrappedMessageModel wm = (WrappedMessageModel) ex.getIn().getBody();
            if (wm.getMessage() != null && wm.getMessageFormat() != null) {
                // Force fire-and-forget and expose the routing key used by the <choice>.
                ex.setPattern(ExchangePattern.InOnly);
                ex.getIn().setHeader("mqDefId", wm.getMessageFormat().getMqDefId());
                ex.getIn().setBody(wm.getMessage().getMessage());
            }
        }
    }
}

where getMessage() returns byte[]

JMS Configuration
===========================================
<bean id="dtccDoxConnectionFactory" class="com.ibm.mq.jms.MQXAQueueConnectionFactory">
    <property name="hostName" value="127.0.0.1" />
    <property name="port" value="1414" />
    <property name="queueManager" value="booradley" />
    <property name="channel" value="booradley" />
    <property name="transportType" value="1" />
</bean>

<bean id="atomikosDtccDoxConnectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean"
      init-method="init" destroy-method="close">
    <property name="xaConnectionFactory"><ref bean="dtccDoxConnectionFactory"/></property>
    <property name="uniqueResourceName" value="DTCC_DOX_JMS"/>
    <property name="poolSize" value="50"/>
</bean>

<bean id="dtccDoxJmsConfig" class="org.apache.camel.component.jms.JmsConfiguration">
    <property name="connectionFactory" ref="atomikosDtccDoxConnectionFactory" />
    <property name="transactionManager" ref="JtaTransactionManager"/>
</bean>

<bean id="dtccDox-JMS" class="org.apache.camel.component.jms.JmsComponent">
    <property name="configuration" ref="dtccDoxJmsConfig" />
</bean> |
