JMSPage edited by Claus Ibsen
Comment:
fixed examples
JMS Component
The JMS component allows messages to be sent to (or consumed from) a JMS Queue or Topic. The implementation of the JMS Component uses Spring's JMS support for declarative transactions, using Spring's JmsTemplate for sending and a MessageListenerContainer for consuming.
Maven users will need to add the following dependency to their pom.xml for this component:
<dependency>
  <groupId>org.apache.camel</groupId>
  <artifactId>camel-jms</artifactId>
  <version>x.x.x</version>
  <!-- use the same version as your Camel core version -->
</dependency>
URI format
jms:[queue:|topic:]destinationName[?options]
Where destinationName is a JMS queue or topic name. By default, the destinationName is interpreted as a queue name. For example, to connect to the queue FOO.BAR, use:
jms:FOO.BAR
You can include the optional queue: prefix, if you prefer:
jms:queue:FOO.BAR
To connect to a topic, you must include the topic: prefix. For example, to connect to the topic Stocks.Prices, use:
jms:topic:Stocks.Prices
You append query options to the URI using the following format: ?option=value&option=value&...
Notes
Using ActiveMQ
The JMS component reuses Spring 2's JmsTemplate for sending messages. This is not ideal for use in a non-J2EE container and typically requires some caching in the JMS provider to avoid poor performance. If you intend to use Apache ActiveMQ as your Message Broker - which is a good choice as ActiveMQ rocks
Transactions and Cache Levels
If you are not using XA, then you should consider caching, as it improves performance; for example, set cacheLevelName=CACHE_CONSUMER. Through Camel 2.7.x, the default setting for cacheLevelName is CACHE_CONSUMER, so if you use XA transactions you need to explicitly set cacheLevelName=CACHE_NONE.
So you can say the default setting is conservative. Consider using cacheLevelName=CACHE_CONSUMER if you are using non-XA transactions.
Durable Subscriptions
If you wish to use durable topic subscriptions, you need to specify both clientId and durableSubscriptionName. The value of the clientId must be unique and can only be used by a single JMS connection instance in your entire network. You may prefer to use Virtual Topics instead to avoid this limitation. More background on durable messaging here.
Message Header Mapping
When using message headers, the JMS specification states that header names must be valid Java identifiers. So, by default, Camel ignores any headers that do not match this rule. So try to name your headers as if they are valid Java identifiers. One benefit of doing this is that you can then use your headers inside a JMS Selector (whose SQL92 syntax mandates Java identifier syntax for headers).
A simple strategy for mapping header names is used by default. The strategy is to replace any dots and hyphens in the header name as shown below and to reverse the replacement when the header name is restored from a JMS message sent over the wire. What does this mean? No more losing method names to invoke on a bean component, no more losing the filename header for the File Component, and so on.
The current header name strategy for accepting header names in Camel is as follows:
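As a rough illustration of the dot and hyphen replacement described above, the sketch below encodes a header name into a valid Java identifier and decodes it again. The class and method names are hypothetical, and the replacement tokens _DOT_ and _HYPHEN_ are an assumption about what the default strategy uses; this is not Camel's own implementation.

```java
// Hypothetical helper, not part of Camel: shows a reversible header name mapping.
final class JmsHeaderNames {
    private JmsHeaderNames() {
    }

    /** Encodes a header name so that it contains no dots or hyphens. */
    static String encode(String name) {
        // Assumed tokens: "." becomes "_DOT_" and "-" becomes "_HYPHEN_".
        return name.replace(".", "_DOT_").replace("-", "_HYPHEN_");
    }

    /** Reverses encode() when the header name is read back from a JMS message. */
    static String decode(String name) {
        return name.replace("_DOT_", ".").replace("_HYPHEN_", "-");
    }
}
```

With this sketch, a header named my.header-name would travel over the wire as my_DOT_header_HYPHEN_name and be restored on the consuming side. A name that is already a valid identifier, such as CamelFileName, passes through unchanged.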
Options
You can configure many different properties on the JMS endpoint which map to properties on the JMSConfiguration POJO.
The options are divided into two tables: the first contains the most commonly used options, and the second contains the rest.
Most commonly used options
All the other options
Message Mapping between JMS and Camel
Camel automatically maps messages between javax.jms.Message and org.apache.camel.Message. When sending a JMS message, Camel converts the message body to the following JMS message types:
When receiving a JMS message, Camel converts the JMS message to the following body type:
Disabling auto-mapping of JMS messages
You can use the mapJmsMessage option to disable the auto-mapping above. If disabled, Camel will not try to map the received JMS message, but instead uses it directly as the payload. This allows you to avoid the overhead of mapping and let Camel just pass through the JMS message. For instance, it even allows you to route javax.jms.ObjectMessage JMS messages with classes you do not have on the classpath.
Using a custom MessageConverter
You can use the messageConverter option to do the mapping yourself in a Spring org.springframework.jms.support.converter.MessageConverter class. For example, in the route below we use a custom message converter when sending a message to the JMS order queue:
from("file://inbox/order").to("jms:queue:order?messageConverter=#myMessageConverter");
You can also use a custom message converter when consuming from a JMS destination.
Controlling the mapping strategy selected
You can use the jmsMessageType option on the endpoint URL to force a specific message type for all messages.
from("file://inbox/order").to("jms:queue:order?jmsMessageType=Text");
You can also specify the message type to use for each message by setting the header with the key CamelJmsMessageType. For example:
from("file://inbox/order").setHeader("CamelJmsMessageType", constant(JmsMessageType.Text)).to("jms:queue:order");
The possible values are defined in the enum class, org.apache.camel.component.jms.JmsMessageType.
Message format when sending
The exchange that is sent over the JMS wire must conform to the JMS Message spec. For the exchange.in.header the following rules apply for the header keys:
For the exchange.in.header, the following rules apply for the header values:
Camel will log with category org.apache.camel.component.jms.JmsBinding at DEBUG level if it drops a given header value. For example:
2008-07-09 06:43:04,046 [main ] DEBUG JmsBinding
- Ignoring non primitive header: order of class: org.apache.camel.component.jms.issues.DummyOrder with value: DummyOrder{orderId=333, itemId=4444, quantity=2}
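The dropping behaviour seen in the log line above can be sketched in plain Java. This is a simplified stand-in, not Camel's JmsBinding: the class name is hypothetical, and the set of accepted types (String, Number, Boolean, Character) is an assumption made for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Camel: keeps only header values that fit simple JMS property types.
final class HeaderValueFilter {
    private HeaderValueFilter() {
    }

    /** Assumed rule for this sketch: strings and primitive wrappers may be sent. */
    static boolean isSendable(Object value) {
        return value instanceof String
                || value instanceof Number
                || value instanceof Boolean
                || value instanceof Character;
    }

    /** Returns a copy of the headers without the values that cannot be sent. */
    static Map<String, Object> filter(Map<String, Object> headers) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (isSendable(entry.getValue())) {
                kept.put(entry.getKey(), entry.getValue());
            } else {
                // Mirrors the spirit of the DEBUG message: the header is skipped, not an error.
                System.out.println("Ignoring non primitive header: " + entry.getKey());
            }
        }
        return kept;
    }
}
```

In the logged example, a header holding a DummyOrder object would be skipped in this way, while a numeric orderId header would be kept.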
Message format when receiving
Camel adds the following properties to the Exchange when it receives a message:
Camel adds the following JMS properties to the In message headers when it receives a JMS message:
As all the above information is standard JMS, you can check the JMS documentation for further details.
About using Camel to send and receive messages and JMSReplyTo
The JMS component is complex and you have to pay close attention to how it works in some cases. So this is a short summary of some of the areas/pitfalls to look for. When Camel sends a message using its JmsProducer, it checks the following conditions:
All this can be a tad complex to understand and configure to support your use case.
JmsProducer
The JmsProducer behaves as follows, depending on configuration:
JmsConsumer
The JmsConsumer behaves as follows, depending on configuration:
So pay attention to the message exchange pattern set on your exchanges. If you send a message to a JMS destination in the middle of your route, you can specify the exchange pattern to use; see more at Request Reply.
from("activemq:queue:in")
    .to("bean:validateOrder")
    .to(ExchangePattern.InOnly, "activemq:topic:order")
    .to("bean:handleOrder");
Reuse endpoint and send to different destinations computed at runtime
If you need to send messages to a lot of different JMS destinations, it makes sense to reuse a JMS endpoint and specify the real destination in a message header. This allows Camel to reuse the same endpoint, but send to different destinations. This greatly reduces the number of endpoints created and economizes on memory and thread resources. You can specify the destination in the following headers:
For example, the following route shows how you can compute a destination at run time and use it to override the destination appearing in the JMS URL:
from("file://inbox")
    .to("bean:computeDestination")
    .to("activemq:queue:dummy");
The queue name, dummy, is just a placeholder. It must be provided as part of the JMS endpoint URL, but it will be ignored in this example. In the computeDestination bean, specify the real destination by setting the CamelJmsDestinationName header as follows:
public void setJmsHeader(Exchange exchange) {
    String id = ....
    exchange.getIn().setHeader("CamelJmsDestinationName", "order:" + id);
}
Then Camel will read this header and use it as the destination instead of the one configured on the endpoint. So, in this example Camel sends the message to activemq:queue:order:2, assuming the id value was 2. If both the CamelJmsDestination and the CamelJmsDestinationName headers are set, CamelJmsDestination takes priority.
Configuring different JMS providers
You can configure your JMS provider in Spring XML as follows:
<camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
  <jmxAgent id="agent" disabled="true"/>
</camelContext>
<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent">
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="vm://localhost?broker.persistent=false&amp;broker.useJmx=false"/>
    </bean>
  </property>
</bean>
Basically, you can configure as many JMS component instances as you wish and give them a unique name using the id attribute. The preceding example configures an activemq component. You could do the same to configure MQSeries, TibCo, BEA, Sonic and so on. Once you have a named JMS component, you can then refer to endpoints within that component using URIs. For example, for the component name activemq, you can refer to destinations using the URI format activemq:[queue:|topic:]destinationName.
You can use the same approach for all other JMS providers. This works by the SpringCamelContext lazily fetching components from the Spring context for the scheme name you use for Endpoint URIs and having the Component resolve the endpoint URIs.
Using JNDI to find the ConnectionFactory
If you are using a J2EE container, you might need to look up JNDI to find the JMS ConnectionFactory rather than use the usual <bean> mechanism in Spring. You can do this using Spring's factory bean or the new Spring XML namespace. For example:
<bean id="weblogic" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="myConnectionFactory"/>
</bean>
<jee:jndi-lookup id="myConnectionFactory" jndi-name="jms/connectionFactory"/>
See The jee schema in the Spring reference documentation for more details about JNDI lookup.
Concurrent Consuming
A common requirement with JMS is to consume messages concurrently in multiple threads in order to make an application more responsive. You can set the concurrentConsumers option to specify the number of threads servicing the JMS endpoint, as follows:
from("jms:SomeQueue?concurrentConsumers=20").
bean(MyClass.class);
You can configure this option in one of the following ways:
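To make the idea behind concurrentConsumers more concrete, here is a plain-Java analogy, assuming an in-memory queue in place of a JMS destination. It is only a sketch of several threads servicing one queue; the class name is hypothetical and it does not show how Camel or Spring's listener container actually manage their consumers.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical analogy, not part of Camel: N worker threads drain one shared queue.
final class ConcurrentConsumersSketch {
    private ConcurrentConsumersSketch() {
    }

    /** Drains the queue with the given number of threads and returns how many messages were handled. */
    static int consume(BlockingQueue<String> queue, int concurrentConsumers) {
        AtomicInteger handled = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(concurrentConsumers);
        for (int i = 0; i < concurrentConsumers; i++) {
            pool.execute(() -> {
                // poll() returns null once the queue is empty, which ends this worker.
                while (queue.poll() != null) {
                    handled.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag and return what was handled so far.
            Thread.currentThread().interrupt();
        }
        return handled.get();
    }
}
```

Each message is taken by exactly one worker, which is the same guarantee a queue gives to competing consumers. Message ordering across workers is not preserved, which is worth keeping in mind before raising the consumer count.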
Request-reply over JMS
Camel supports Request Reply over JMS. In essence, the MEP of the Exchange should be InOut when you send a message to a JMS queue. Camel automatically sets up a consumer that listens on the reply queue, so you do not need to do anything.
from(xxx)
.inOut().to("activemq:queue:foo")
.threads(5)
.to(yyy)
.to(zzz);
In this route we instruct Camel to route replies asynchronously using a thread pool with 5 threads.
Request-reply over JMS and using a shared fixed reply queue
If you use a fixed reply queue when doing Request Reply over JMS, as shown in the example below, then pay attention.
from(xxx)
.inOut().to("activemq:queue:foo?replyTo=bar")
.to(yyy)
In this example the fixed reply queue named "bar" is used. By default Camel assumes the queue is shared when using fixed reply queues, and therefore it uses a JMSSelector to pick up only the expected reply messages (e.g. based on the JMSCorrelationID), which means it is not as fast as temporary queues. See the next section for exclusive fixed reply queues. You can speed up how often Camel polls for reply messages using the receiveTimeout option. By default it is 1000 millis. So to make it faster you can set it to 250 millis to poll 4 times per second, as shown:
from(xxx)
.inOut().to("activemq:queue:foo?replyTo=bar&receiveTimeout=250")
.to(yyy)
Note that this causes Camel to poll the message broker more frequently, and thus requires more network traffic.
Request-reply over JMS and using an exclusive fixed reply queue
Available as of Camel 2.9
In the previous example, Camel assumes that the fixed reply queue named "bar" is shared, and thus uses a JMSSelector to consume only the reply messages it expects. However, there is a drawback to this, as JMS selectors are slower. Also, the consumer on the reply queue is slow to update with new JMS selector ids. In fact, it only updates when the receiveTimeout option times out, which by default is 1 second. So in theory the reply messages could take up to about 1 second to be detected. On the other hand, if the fixed reply queue is exclusive to the Camel reply consumer, we can avoid using JMS selectors and thus be more performant; in fact, as fast as using temporary queues. So from Camel 2.9 onwards there is a replyToType option, which you can configure to Exclusive:
from(xxx)
.inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
.to(yyy)
Mind that the queue must be exclusive to each and every endpoint. So if you have two routes, each needs a unique reply queue, as shown in the next example:
from(xxx)
    .inOut().to("activemq:queue:foo?replyTo=bar&replyToType=Exclusive")
    .to(yyy)
from(aaa)
    .inOut().to("activemq:queue:order?replyTo=order.reply&replyToType=Exclusive")
    .to(bbb)
The same applies if you run in a clustered environment: each node in the cluster must use a unique reply queue name, as otherwise a node may pick up messages that were intended as replies for another node. For clustered environments it is recommended to use shared reply queues instead.
Synchronizing clocks between senders and receivers
When doing messaging between systems, it is desirable that the systems have synchronized clocks. For example, when sending a JMS message you can set a time to live value on the message. The receiver can then inspect this value, determine whether the message has already expired, and drop it instead of consuming and processing it. However, this requires that both sender and receiver have synchronized clocks. If you are using ActiveMQ, you can use the timestamp plugin to synchronize clocks.
About time to live
First read the section above about synchronized clocks. When you do request/reply (InOut) over JMS with Camel, Camel uses a timeout on the sender side, which defaults to 20 seconds and is set by the requestTimeout option. You can control this by setting a higher or lower value. However, the time to live value is still set on the JMS message being sent, which requires the clocks to be synchronized between the systems. If they are not, you may want to disable the time to live value being set. This is possible using the disableTimeToLive option from Camel 2.8 onwards. If you set disableTimeToLive=true, Camel does not set any time to live value when sending JMS messages, but the request timeout is still active.
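To see why unsynchronized clocks matter, the sketch below models the expiry check in plain Java. It assumes the usual JMS convention that a message's expiration is the send time plus the time to live, with 0 meaning the message never expires. The class name and the sample values are hypothetical, and the 20 second time to live is chosen only to match the default requestTimeout mentioned above.

```java
// Hypothetical model, not part of Camel: expiry as seen by sender and receiver clocks.
final class TimeToLiveSketch {
    private TimeToLiveSketch() {
    }

    /** Expiration computed on the sender's clock; a time to live of 0 means "never expires". */
    static long expiration(long senderClockMillis, long timeToLiveMillis) {
        return timeToLiveMillis == 0 ? 0 : senderClockMillis + timeToLiveMillis;
    }

    /** The receiver compares the expiration against its own clock. */
    static boolean isExpired(long expirationMillis, long receiverClockMillis) {
        return expirationMillis != 0 && receiverClockMillis >= expirationMillis;
    }
}
```

Suppose a message is sent with a 20000 millis time to live and arrives 100 millis later. A receiver whose clock agrees with the sender keeps the message, but a receiver whose clock runs 30 seconds ahead sees it as already expired and drops it. With no time to live set, which is the effect of disableTimeToLive=true, the expiration is 0 and the clock difference no longer matters.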
For example, if you do request/reply over JMS and have disabled time to live, Camel will still use a timeout of 20 seconds (the requestTimeout option). That option can of course also be configured. So the two options requestTimeout and disableTimeToLive give you fine-grained control when doing request/reply. When you do fire and forget (InOnly) over JMS with Camel, Camel by default does not set any time to live value on the message. You can configure a value by using the timeToLive option. For example, to indicate 5 seconds, set timeToLive=5000. The option disableTimeToLive can be used to force disabling the time to live, also for InOnly messaging. The requestTimeout option is not used for InOnly messaging.
Enabling Transacted Consumption
A common requirement is to consume from a queue in a transaction and then process the message using the Camel route. To do this, just ensure that you set the following properties on the component/endpoint:
See the Transactional Client EIP pattern for further details.
Using JMSReplyTo for late replies
When using Camel as a JMS listener, it sets an Exchange property with the value of the ReplyTo javax.jms.Destination object, having the key ReplyTo. You can obtain this Destination as follows:
Destination replyDestination = exchange.getIn().getHeader(JmsConstants.JMS_REPLY_DESTINATION, Destination.class);
And then later use it to send a reply using regular JMS or Camel.
// we need to pass in the JMS component, and in this sample we use ActiveMQ
JmsEndpoint endpoint = JmsEndpoint.newInstance(replyDestination, activeMQComponent);
// now we have the endpoint we can use regular Camel API to send a message to it
template.sendBody(endpoint, "Here is the late reply.");
A different solution to sending a reply is to provide the replyDestination object in the same Exchange property when sending. Camel will then pick up this property and use it for the real destination. The endpoint URI must include a dummy destination, however. For example:
// we pretend to send it to some non existing dummy queue
template.send("activemq:queue:dummy", new Processor() {
    public void process(Exchange exchange) throws Exception {
        // and here we override the destination with the ReplyTo destination object so the message is sent to there instead of dummy
        exchange.getIn().setHeader(JmsConstants.JMS_DESTINATION, replyDestination);
        exchange.getIn().setBody("Here is the late reply.");
    }
});
Using a request timeout
In the sample below we send a Request Reply style message Exchange (we use the requestBody method = InOut) to the slow queue for further processing in Camel and we wait for a return reply:
// send an in-out with a timeout of 5 sec
Object out = template.requestBody("activemq:queue:slow?requestTimeout=5000", "Hello World");
Samples
JMS is used in many examples for other components as well. But we provide a few samples below to get started.
Receiving from JMS
In the following sample we configure a route that receives JMS messages and routes the message to a POJO:
from("jms:queue:foo").
    to("bean:myBusinessLogic");
You can of course use any of the EIP patterns so the route can be context based. For example, here's how to filter an order topic for the big spenders:
from("jms:topic:OrdersTopic").
    filter().method("myBean", "isGoldCustomer").
    to("jms:queue:BigSpendersQueue");
Sending to a JMS
In the sample below we poll a file folder and send the file content to a JMS topic. As we want the content of the file as a TextMessage instead of a BytesMessage, we need to convert the body to a String:
from("file://orders").
    convertBodyTo(String.class).
    to("jms:topic:OrdersTopic");
Using Annotations
Camel also has annotations so you can use POJO Consuming and POJO Producing.
Spring DSL sample
The preceding examples use the Java DSL. Camel also supports Spring XML DSL. Here is the big spender sample using Spring DSL:
<route>
  <from uri="jms:topic:OrdersTopic"/>
  <filter>
    <method bean="myBean" method="isGoldCustomer"/>
    <to uri="jms:queue:BigSpendersQueue"/>
  </filter>
</route>
Other samples
JMS appears in many of the examples for other components and EIP patterns, as well as in this Camel documentation. So feel free to browse the documentation. If you have time, check out this tutorial that uses JMS but focuses on how well Spring Remoting and Camel work together: Tutorial-JmsRemoting.
Using JMS as a Dead Letter Queue storing Exchange
Normally, when using JMS as the transport, it only transfers the body and headers as the payload. If you want to use JMS with a Dead Letter Channel, using a JMS queue as the Dead Letter Queue, then normally the caused Exception is not stored in the JMS message. You can, however, use the transferExchange option on the JMS dead letter queue to instruct Camel to store the entire Exchange in the queue as a javax.jms.ObjectMessage that holds a org.apache.camel.impl.DefaultExchangeHolder. This allows you to consume from the Dead Letter Queue and retrieve the caused exception from the Exchange property with the key Exchange.EXCEPTION_CAUGHT. The demo below illustrates this:
// setup error handler to use JMS as queue and store the entire Exchange
errorHandler(deadLetterChannel("jms:queue:dead?transferExchange=true"));
Then you can consume from the JMS queue and analyze the problem:
from("jms:queue:dead").to("bean:myErrorAnalyzer");
// and in our bean
String body = exchange.getIn().getBody(String.class);
Exception cause = exchange.getProperty(Exchange.EXCEPTION_CAUGHT, Exception.class);
// the cause message is
String problem = cause.getMessage();
Using JMS as a Dead Letter Channel storing error only
You can use JMS to store the cause error message or to store a custom body, which you can initialize yourself. The following example uses the Message Translator EIP to do a transformation on the failed exchange before it is moved to the JMS dead letter queue:
// we send it to a seda dead queue first
errorHandler(deadLetterChannel("seda:dead"));
// and on the seda dead queue we can do the custom transformation before it is sent to the JMS queue
from("seda:dead").transform(exceptionMessage()).to("jms:queue:dead");
Here we only store the original cause error message in the transform. You can, however, use any Expression to send whatever you like. For example, you can invoke a method on a Bean or use a custom processor.
Sending an InOnly message and keeping the JMSReplyTo header
When sending to a JMS destination using camel-jms, the producer uses the MEP to detect whether it is InOnly or InOut messaging. However, there can be times when you want to send an InOnly message but keep the JMSReplyTo header. To do so you have to instruct Camel to keep it, otherwise the JMSReplyTo header will be dropped. For example, to send an InOnly message to the foo queue, but with a JMSReplyTo of the bar queue, you can do as follows:
template.send("activemq:queue:foo?preserveMessageQos=true", new Processor() {
public void process(Exchange exchange) throws Exception {
exchange.getIn().setBody("World");
exchange.getIn().setHeader("JMSReplyTo", "bar");
}
});
Notice we use preserveMessageQos=true to instruct Camel to keep the JMSReplyTo header.
Setting JMS provider options on the destination
Some JMS providers, like IBM's WebSphere MQ, need options to be set on the JMS destination. For example, you may need to specify the targetClient option. Since targetClient is a WebSphere MQ option and not a Camel URI option, you need to set that on the JMS destination name like so:
...
.setHeader("CamelJmsDestinationName", constant("queue:///MY_QUEUE?targetClient=1"))
.to("wmq:queue:MY_QUEUE?useMessageIDAsCorrelationID=true");
Some versions of WMQ won't accept this option on the destination name and you will get an exception like:
A workaround is to use a custom DestinationResolver:
JmsComponent wmq = new JmsComponent(connectionFactory);
wmq.setDestinationResolver(new DestinationResolver() {
    public Destination resolveDestinationName(Session session, String destinationName, boolean pubSubDomain) throws JMSException {
        MQQueueSession wmqSession = (MQQueueSession) session;
        return wmqSession.createQueue("queue:///" + destinationName + "?targetClient=1");
    }
});
See Also
