[CONF] Apache Camel: JMS (page edited)

Dhiraj Bokde (Confluence)

JMS has been edited by willem jiang (Nov 12, 2008).

Change summary:

CAMEL-246 allowed the username and password to be specified on the JMSComponent

Content:

JMS Component

The JMS component allows messages to be sent to a JMS Queue or Topic; or messages to be consumed from a JMS Queue or Topic. The implementation of the JMS Component uses Spring's JMS support for declarative transactions, using Spring's JmsTemplate for sending and a MessageListenerContainer for consuming.

URI format

jms:[topic:]destinationName?properties

So for example to send to queue FOO.BAR you would use

jms:FOO.BAR

You can be completely specific if you wish via

jms:queue:FOO.BAR

If you want to send to a topic called Stocks.Prices then you would use

jms:topic:Stocks.Prices

Using Temporary Destinations

As of 1.4.0 of Camel you can use temporary queues using the following URL format

jms:temp:queue:foo

or temporary topics as

jms:temp:topic:bar

Here foo and bar, the text after the prefix jms:temp:queue: or jms:temp:topic:, are the names of the destinations. This enables multiple routes, processors or beans to refer to the same temporary destination. For example, you can create three temporary destinations and use them as inputs or outputs in routes by referring to them by name.
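
The URI forms shown so far can be told apart mechanically. The following is a minimal sketch, assuming a hypothetical helper class named JmsUriSketch that is not part of Camel; it only illustrates how the optional temp:, queue: and topic: prefixes and the trailing ?properties part relate to the destination name, and it does not reflect how Camel itself parses endpoint URIs.

```java
/** Hypothetical helper (not a Camel class): splits a jms: URI into its parts. */
final class JmsUriSketch {
    final boolean temporary; // true for jms:temp:... destinations
    final boolean topic;     // true for topics, false for queues
    final String name;       // destination name, e.g. FOO.BAR

    private JmsUriSketch(boolean temporary, boolean topic, String name) {
        this.temporary = temporary;
        this.topic = topic;
        this.name = name;
    }

    static JmsUriSketch parse(String uri) {
        if (uri == null || !uri.startsWith("jms:")) {
            throw new IllegalArgumentException("not a jms: URI: " + uri);
        }
        String rest = uri.substring("jms:".length());
        int query = rest.indexOf('?');
        if (query >= 0) {
            rest = rest.substring(0, query); // ignore the ?properties part
        }
        boolean temporary = rest.startsWith("temp:");
        if (temporary) {
            rest = rest.substring("temp:".length());
        }
        boolean topic = false;
        if (rest.startsWith("topic:")) {
            topic = true;
            rest = rest.substring("topic:".length());
        } else if (rest.startsWith("queue:")) {
            rest = rest.substring("queue:".length());
        } else if (temporary) {
            // The temporary forms in the text always name queue: or topic: explicitly.
            throw new IllegalArgumentException("temp: must be followed by queue: or topic:");
        }
        if (rest.isEmpty()) {
            throw new IllegalArgumentException("missing destination name: " + uri);
        }
        return new JmsUriSketch(temporary, topic, rest);
    }

    public static void main(String[] args) {
        String[] uris = {"jms:FOO.BAR", "jms:queue:FOO.BAR", "jms:topic:Stocks.Prices",
                         "jms:temp:queue:foo", "jms:temp:topic:bar"};
        for (String uri : uris) {
            JmsUriSketch d = parse(uri);
            System.out.println(uri + " -> temporary=" + d.temporary
                    + ", topic=" + d.topic + ", name=" + d.name);
        }
    }
}
```

Without a prefix the destination is treated as a queue, which matches the jms:FOO.BAR example.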

Notes

If you are using ActiveMQ

Note that the JMS component reuses Spring 2's JmsTemplate for sending messages. This is not ideal for use in a non-J2EE container and typically requires some caching JMS provider to avoid performance being lousy.

So if you intend to use Apache ActiveMQ as your message broker (which is a good choice, as ActiveMQ rocks), then we recommend that you either

  • use the ActiveMQ component which is already configured to use ActiveMQ efficiently
  • use the PooledConnectionFactory in ActiveMQ

If you wish to use durable topic subscriptions, you need to specify both clientId and durableSubscriptionName. Note that the value of the clientId must be unique and can only be used by a single JMS connection instance in your entire network. You may prefer to use Virtual Topics instead to avoid this limitation. More background on durable messaging here.

When using message headers, the JMS specification states that header names must be valid Java identifiers, so by default Camel ignores any headers that do not match this rule. Try to name your headers as if they were valid Java identifiers. One added bonus of this is that you can then use your headers inside a JMS Selector, which uses SQL92 syntax and mandates Java identifier syntax for headers.

From Camel 1.4 a simple strategy for mapping header names is used by default. The strategy is to replace any dots in the header name with underscores, and vice versa when the header name is restored from the JMS message that was sent over the wire. What does this mean? No more losing method names to invoke on a bean component, no more losing the filename header for the File Component, etc.
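
The mapping just described can be sketched with JDK classes only. This is a minimal illustration, assuming a hypothetical class named HeaderNameSketch; it is not Camel's actual implementation, and the method names are invented for the example.

```java
import java.util.Optional;

/** Hypothetical sketch of the dot/underscore header-name mapping (not Camel code). */
final class HeaderNameSketch {

    /** Checks the name character by character with the JDK's identifier tests. */
    static boolean isValidJavaIdentifier(String name) {
        if (name == null || name.isEmpty()
                || !Character.isJavaIdentifierStart(name.charAt(0))) {
            return false;
        }
        for (int i = 1; i < name.length(); i++) {
            if (!Character.isJavaIdentifierPart(name.charAt(i))) {
                return false;
            }
        }
        return true;
    }

    /** Returns the name to put on the wire, or empty if the header would be dropped. */
    static Optional<String> toJmsName(String headerName) {
        String candidate = headerName.replace('.', '_');
        return isValidJavaIdentifier(candidate) ? Optional.of(candidate) : Optional.empty();
    }

    /** Restores the dotted form. In this sketch the reverse mapping is lossy:
     *  an underscore that was present in the original name also becomes a dot. */
    static String fromJmsName(String jmsName) {
        return jmsName.replace('_', '.');
    }

    public static void main(String[] args) {
        System.out.println(toJmsName("org.apache.camel.MethodName"));
        System.out.println(toJmsName("my-header"));
        System.out.println(fromJmsName("org_apache_camel_MethodName"));
    }
}
```

A name such as my-header still fails the identifier test after the replacement, so in this sketch it is dropped rather than sent.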

Current header name strategy used for accepting header names in Camel:

  • replace all dots with underscores (e.g. org.apache.camel.MethodName => org_apache_camel_MethodName)
  • test if the name is a valid Java identifier using the JDK core classes
  • if the test succeeds, the header is added and sent over the wire; if not, it is dropped (logged at DEBUG level)

For Consuming Messages cacheLevelName settings are vital!

If you are using Spring before 2.5.1 and Camel before 1.3.0 then you might want to set the cacheLevelName to be CACHE_CONSUMER for maximum performance.

Due to a bug in earlier Spring versions that caused a lack of transactional integrity, Camel versions before 1.3.0, and Camel versions from 1.3.0 onwards when used with Spring versions earlier than 2.5.1, default to CACHE_CONNECTION. See the JIRAs CAMEL-163 and CAMEL-294.

Also, if you are using XA or running in a J2EE container, you may want to set the cacheLevelName to CACHE_NONE; we have seen that when using JBoss with TibCo EMS and JTA/XA you must disable caching.

Properties

You can configure lots of different properties on the JMS endpoint which map to properties on the JMSConfiguration POJO. Notice: Many of these properties map to properties on Spring JMS that Camel uses for sending and receiving messages, so you can get more information about them by consulting the Spring documentation.

Property | Default Value | Description
acceptMessagesWhileStopping | false | Should the consumer accept messages while it is stopping
acknowledgementModeName | "AUTO_ACKNOWLEDGE" | The JMS acknowledgement name, which is one of: TRANSACTED, CLIENT_ACKNOWLEDGE, AUTO_ACKNOWLEDGE, DUPS_OK_ACKNOWLEDGE
acknowledgementMode | -1 | The JMS acknowledgement mode defined as an Integer. Allows you to set vendor-specific extensions to the acknowledgement mode. For the regular modes prefer to use the acknowledgementModeName instead.
alwaysCopyMessage | false | If true then Camel will always make a JMS message copy of the message when it is passed to the producer for sending. Copying the message is needed in some situations, such as when a replyToDestinationSelectorName is set (Camel sets alwaysCopyMessage to true automatically if a replyToDestinationSelectorName is set)
autoStartup | true | Should the consumer container auto-startup
cacheLevelName | "CACHE_CONSUMER" | Sets the cache level by name for the underlying JMS resources. Possible values are: CACHE_AUTO, CACHE_CONNECTION, CACHE_CONSUMER, CACHE_NONE and CACHE_SESSION. See the Spring documentation.
cacheLevel | -1 | Sets the cache level by id for the underlying JMS resources
clientId | null | Sets the JMS client ID to use. Note that this value, if specified, must be unique and can only be used by a single JMS connection instance. It is typically only required for durable topic subscriptions. You may prefer to use Virtual Topics instead
consumerType | Default | The consumer type to use, either: Simple, Default or ServerSessionPool. The consumer type determines which Spring JMS listener should be used. Default will use org.springframework.jms.listener.DefaultMessageListenerContainer. Simple will use org.springframework.jms.listener.SimpleMessageListenerContainer and ServerSessionPool will use org.springframework.jms.listener.serversession.ServerSessionMessageListenerContainer. If option useVersion102=true then Camel will of course use the JMS 1.0.2 Spring classes instead. ServerSessionPool is @deprecated and will be removed in Camel 2.0.
concurrentConsumers | 1 | Specifies the default number of concurrent consumers
connectionFactory | null | The default JMS connection factory to use for the listenerConnectionFactory and templateConnectionFactory if neither is specified
deliveryPersistent | true | Is persistent delivery used by default?
disableReplyTo | false | Do you want to ignore the JMSReplyTo header and so treat messages as InOnly by default and not send a reply back?
durableSubscriptionName | null | The durable subscriber name for specifying durable topic subscriptions
eagerLoadingOfProperties | false | Enables eager loading of JMS properties as soon as a message is received. This is generally inefficient, as the JMS properties may not be required, but it can sometimes catch issues with the underlying JMS provider and the use of JMS properties early. Can be used for testing purposes to ensure JMS properties can be understood and handled correctly.
exceptionListener | null | The JMS Exception Listener used to be notified of any underlying JMS exceptions
explicitQosEnabled | false | Set if the deliveryMode, priority or timeToLive should be used when sending messages
exposeListenerSession | true | Set if the listener session should be exposed when consuming messages
idleTaskExecutionLimit | 1 | Specify the limit for idle executions of a receive task, not having received any message within its execution. If this limit is reached, the task will shut down and leave receiving to other executing tasks (in case of dynamic scheduling; see the "maxConcurrentConsumers" setting).
jmsOperations | null | Allows you to use your own implementation of the org.springframework.jms.core.JmsOperations interface. Camel uses JmsTemplate as default. Can be used for testing purposes, but is not used much, as stated in the Spring API docs.
listenerConnectionFactory | null | The JMS connection factory used for consuming messages
maxConcurrentConsumers | 1 | Specifies the maximum number of concurrent consumers
maxMessagesPerTask | 1 | The number of messages per task
messageConverter | null | The Spring Message Converter
messageIdEnabled | true | When sending, should message IDs be added
messageTimestampEnabled | true | Should timestamps be enabled by default on sending messages
password | null | The password which is set for the connector factory
priority | -1 | Values of > 1 specify the message priority when sending, if the explicitQosEnabled property is specified (with 0 as the lowest priority and 9 as the highest)
preserveMessageQos | false | Set to true if you want to send the message using the QoS settings specified on the message, instead of the QoS settings on the JMS endpoint
pubSubNoLocal | false | Set whether to inhibit the delivery of messages published by its own connection
selector | null | Sets the JMS Selector, which is an SQL 92 predicate used to filter messages at the message broker. You may have to encode special characters such as = as %3D
receiveTimeout | none | The timeout when receiving messages
recoveryInterval | none | The recovery interval
replyToTempDestinationAffinity | endpoint | Defines the sharing strategy for the temporary replyTo destination created by the component. Possible values are: component, endpoint or producer. component = a single temp queue is shared among all producers for a given component instance. endpoint = a single temp queue is shared among all producers for a given endpoint instance. producer = a single temp queue is created per producer.
replyToDestination | null | Provides an explicit reply to destination which overrides any incoming value of Message.getJMSReplyTo()
replyToDestinationSelectorName | null | Sets the JMS Selector using the fixed name to be used so you can filter out your own replies from the others, when using a shared queue (i.e. if you are not using a temporary reply queue).
replyToDeliveryPersistent | true | Is persistent delivery used by default for reply?
requestTimeout | 20000 | The timeout when sending messages
serverSessionFactory | null | @deprecated - will be removed in Camel 2.0. The JMS ServerSessionFactory if you wish to use ServerSessionFactory for consumption
subscriptionDurable | false | Enabled by default if you specify a durableSubscriptionName and a clientId
taskExecutor | null | Allows you to specify a custom task executor for consuming messages
templateConnectionFactory | null | The JMS connection factory used for sending messages
timeToLive | null | Is a time to live specified when sending messages
transacted | false | Is transacted mode used for sending/receiving messages?
transactionManager | null | The Spring transaction manager to use
transactionName | null | The name of the transaction to use
transactionTimeout | null | The timeout value of the transaction if using transacted mode
username | null | The username which is set for the connector factory
useMessageIDAsCorrelationID | false | Should JMSMessageID be used as JMSCorrelationID for InOut messages. Camel will by default use a GUID
useVersion102 | false | Should the old JMS API be used
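
Properties are usually passed as query parameters on the endpoint URI, and values such as selectors may need encoding. The following is a minimal sketch, assuming a hypothetical helper named EndpointUriSketch (not a Camel class), that appends properties to a base URI and percent-encodes the values with the JDK's URLEncoder. The queue name and the selector expression are made up for illustration.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper (not a Camel class): builds an endpoint URI with encoded properties. */
final class EndpointUriSketch {

    static String build(String baseUri, Map<String, String> properties) {
        if (properties.isEmpty()) {
            return baseUri;
        }
        StringJoiner query = new StringJoiner("&", "?", "");
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            // URLEncoder applies form encoding: '=' becomes %3D and a space becomes '+'.
            String value = URLEncoder.encode(entry.getValue(), StandardCharsets.UTF_8);
            query.add(entry.getKey() + "=" + value);
        }
        return baseUri + query;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new LinkedHashMap<>(); // keeps insertion order
        properties.put("concurrentConsumers", "5");
        properties.put("selector", "region='EU'");
        System.out.println(build("jms:queue:orders", properties));
    }
}
```

Whether a given Camel version decodes every form-encoded character the same way is an assumption worth checking against your version; the %3D encoding of = is the case mentioned in the selector description.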

Message Mapping between JMS and Camel

Camel will automatically map messages between javax.jms.Message and org.apache.camel.Message.

When sending a JMS message Camel will convert the body to the following JMS message:

Body Type | JMS Message | Comment
String | javax.jms.TextMessage |
org.w3c.dom.Node | javax.jms.TextMessage | The DOM will be converted to String
Map | javax.jms.MapMessage |
java.io.Serializable | javax.jms.ObjectMessage |
byte[] | javax.jms.BytesMessage |
java.io.File | javax.jms.BytesMessage |
java.io.Reader | javax.jms.BytesMessage |
java.io.InputStream | javax.jms.BytesMessage |
java.nio.ByteBuffer | javax.jms.BytesMessage |
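
The sending-side table can be read as a type dispatch. Below is a minimal sketch, assuming a hypothetical class named BodyMappingSketch that returns the JMS message type name for a body. The order of the checks is an assumption of this sketch: String, File and most Map implementations are also Serializable, so the more specific types are tested first. What happens to bodies outside the table is not stated in the text, so the sketch returns an empty result for them.

```java
import java.io.File;
import java.io.InputStream;
import java.io.Reader;
import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import org.w3c.dom.Node;

/** Hypothetical sketch (not Camel code) of the body type to JMS message type table. */
final class BodyMappingSketch {

    static Optional<String> jmsMessageTypeFor(Object body) {
        if (body instanceof String || body instanceof Node) {
            return Optional.of("javax.jms.TextMessage");
        }
        if (body instanceof Map) {
            return Optional.of("javax.jms.MapMessage");
        }
        if (body instanceof byte[] || body instanceof File || body instanceof Reader
                || body instanceof InputStream || body instanceof ByteBuffer) {
            return Optional.of("javax.jms.BytesMessage");
        }
        if (body instanceof Serializable) {
            return Optional.of("javax.jms.ObjectMessage"); // checked last, see lead-in
        }
        return Optional.empty(); // not covered by the table
    }

    public static void main(String[] args) {
        System.out.println(jmsMessageTypeFor("hello"));
        System.out.println(jmsMessageTypeFor(new byte[] {1, 2, 3}));
        System.out.println(jmsMessageTypeFor(new HashMap<String, Object>()));
        System.out.println(jmsMessageTypeFor(Integer.valueOf(7)));
    }
}
```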

When receiving a JMS message Camel will convert the JMS message to the following body type:

JMS Message | Body Type | Comment
javax.jms.TextMessage | String |
javax.jms.BytesMessage | byte[] |
javax.jms.MapMessage | Map<String, Object> |
javax.jms.ObjectMessage | Object |

Message format when sending

The exchange that is sent over the JMS wire must conform to the JMS Message spec.

For the exchange.in.header the following rules apply for the keys:

  • Keys starting with JMS or JMSX are reserved. All user keys with these prefixes will be dropped.
  • exchange.in.headers keys must be literals and must all be valid Java identifiers (do not use dots in the key name).
  • From Camel 1.4 onwards Camel automatically replaces all dots with underscores in key names, and vice versa when Camel consumes JMS messages.

For the exchange.in.header the following rules apply for the values:

  • The values must be primitives or their wrapper objects (such as Integer, Long, Character). String, CharSequence, Date, BigDecimal and BigInteger values are all converted to their toString() representation. All other types are dropped.

Camel will log with category org.apache.camel.component.jms.JmsBinding at DEBUG level if it drops a given header value. Example:

2008-07-09 06:43:04,046 [main           ] DEBUG JmsBinding  
  - Ignoring non primitive header: order of class: org.apache.camel.component.jms.issues.DummyOrder with value: DummyOrder{orderId=333, itemId=4444, quantity=2}
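
The value rules can be sketched as a small filter. This is an illustration only, assuming a hypothetical class named HeaderValueSketch; it is not the JmsBinding implementation, and it does not log anything when a value is dropped.

```java
import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Date;
import java.util.Optional;

/** Hypothetical sketch (not Camel code) of the header value rules. */
final class HeaderValueSketch {

    /** Returns the value to send, or empty if the header would be dropped. */
    static Optional<Object> toJmsValue(Object value) {
        // Wrapper objects of the Java primitives are passed through unchanged.
        if (value instanceof Boolean || value instanceof Byte || value instanceof Short
                || value instanceof Integer || value instanceof Long
                || value instanceof Float || value instanceof Double
                || value instanceof Character) {
            return Optional.of(value);
        }
        // String is itself a CharSequence, so it is covered by this branch.
        if (value instanceof CharSequence || value instanceof Date
                || value instanceof BigDecimal || value instanceof BigInteger) {
            return Optional.of(value.toString());
        }
        return Optional.empty(); // everything else (including null) is dropped
    }

    public static void main(String[] args) {
        System.out.println(toJmsValue(42));
        System.out.println(toJmsValue(new BigDecimal("1.50")));
        System.out.println(toJmsValue(new Object()).isPresent());
    }
}
```

A custom object such as the DummyOrder in the log line above falls through to the last branch and is dropped.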

Message format when receiving

Camel will add the following properties to the Exchange when it receives a message:

Property | Type | Description
org.apache.camel.jms.replyDestination | javax.jms.Destination | The reply destination

Camel will add the following JMS properties to the IN Message headers when it receives a JMS message:

Header | Type | Description
JMSCorrelationID | String | The JMS correlation id
JMSDeliveryMode | int | The JMS delivery mode
JMSDestination | javax.jms.Destination | The JMS destination
JMSExpiration | long | The JMS expiration
JMSMessageID | String | The JMS unique message id
JMSPriority | int | The JMS priority (with 0 as the lowest priority and 9 as the highest)
JMSRedelivered | boolean | Is the JMS message redelivered
JMSReplyTo | javax.jms.Destination | The JMS reply to destination
JMSTimestamp | long | The JMS timestamp
JMSType | String | The JMS type
JMSXGroupID | String | The JMS group id

As all the above information is standard JMS you can check the JMS documentation for further details.

Configuring different JMS providers

You can configure your JMS provider inside the Spring XML as follows...

<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
</camelContext>

<bean id="activemq" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory">
    <bean class="org.apache.activemq.ActiveMQConnectionFactory">
      <property name="brokerURL" value="vm://localhost?broker.persistent=false"/>
    </bean>
  </property>
</bean>

Basically you can configure as many JMS component instances as you wish and give them a unique name via the id attribute. The above example configures an 'activemq' component. You could do the same to configure MQSeries, TibCo, BEA, Sonic etc.

Once you have a named JMS component you can then refer to endpoints within that component using URIs. For example, for the component name 'activemq' you can refer to destinations as activemq:[queue:|topic:]destinationName. You can use the same approach for working with all other JMS providers.

This works by the SpringCamelContext lazily fetching components from the spring context for the scheme name you use for Endpoint URIs and having the Component resolve the endpoint URIs.
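
The lookup by scheme name can be pictured with a small in-memory model. The following is a rough sketch, assuming a hypothetical class named SchemeLookupSketch in which a map of factories stands in for the Spring context and a function stands in for a component; none of these names are Camel or Spring APIs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

/** Hypothetical model (not Camel code): resolve a component lazily by URI scheme. */
final class SchemeLookupSketch {
    // Stand-in for bean definitions in a Spring context, keyed by bean id.
    private final Map<String, Supplier<Function<String, String>>> beanDefinitions = new HashMap<>();
    // Components that have already been fetched, keyed by scheme.
    private final Map<String, Function<String, String>> components = new HashMap<>();

    void defineBean(String id, Supplier<Function<String, String>> factory) {
        beanDefinitions.put(id, factory);
    }

    String resolveEndpoint(String uri) {
        int colon = uri.indexOf(':');
        if (colon <= 0) {
            throw new IllegalArgumentException("no scheme in URI: " + uri);
        }
        String scheme = uri.substring(0, colon);
        String remainder = uri.substring(colon + 1);
        Function<String, String> component = components.get(scheme);
        if (component == null) {
            // First use of this scheme: fetch the bean whose id matches it.
            Supplier<Function<String, String>> factory = beanDefinitions.get(scheme);
            if (factory == null) {
                throw new IllegalArgumentException("no component bean with id " + scheme);
            }
            component = factory.get();
            components.put(scheme, component);
        }
        // The component itself resolves the rest of the URI.
        return component.apply(remainder);
    }

    public static void main(String[] args) {
        SchemeLookupSketch context = new SchemeLookupSketch();
        context.defineBean("activemq", () -> rest -> "activemq endpoint for " + rest);
        System.out.println(context.resolveEndpoint("activemq:queue:foo"));
    }
}
```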

Using JNDI to find the ConnectionFactory

If you are using a J2EE container you might want to look up your ConnectionFactory in JNDI rather than use the usual <bean> mechanism in Spring. You can do this using Spring's factory bean or the new Spring XML namespace, e.g.

<bean id="weblogic" class="org.apache.camel.component.jms.JmsComponent">
  <property name="connectionFactory" ref="myConnectionFactory"/>
</bean>

<jee:jndi-lookup id="myConnectionFactory" jndi-name="jms/connectionFactory"/>

Concurrent Consuming

A common requirement with JMS is to consume messages concurrently in many threads to achieve high throughput. To do this, use the concurrentConsumers property:

from("jms:SomeQueue?concurrentConsumers=20").bean(MyClass.class);

You can configure the properties on the JmsComponent if you wish or on specific endpoints via the URI or by configuring the JmsEndpoint directly.
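
To make the idea of concurrent consumers concrete, here is a minimal in-memory sketch using only java.util.concurrent. It assumes a hypothetical class named ConcurrentConsumersSketch and a BlockingQueue standing in for the JMS queue; it shows the competing-consumers pattern in general and is not how Spring's listener containers are implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical model (not Camel code): several threads competing for one queue. */
final class ConcurrentConsumersSketch {

    /** Drains the queue with the given number of worker threads; returns messages handled. */
    static int drain(BlockingQueue<String> queue, int concurrentConsumers, Consumer<String> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(concurrentConsumers);
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < concurrentConsumers; i++) {
            pool.execute(() -> {
                String message;
                // poll() hands each message to exactly one of the competing workers.
                while ((message = queue.poll()) != null) {
                    handler.accept(message);
                    processed.incrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for consumers", e);
        }
        return processed.get();
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 100; i++) {
            queue.add("message-" + i);
        }
        int handled = drain(queue, 20, message -> { /* business logic would go here */ });
        System.out.println("handled " + handled + " messages");
    }
}
```

Because several threads call the handler at once, the handler must be thread safe; the same consideration applies to a bean invoked from a route with concurrentConsumers set above 1.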

Enabling Transacted Consumption

A common requirement is to consume from a queue in a transaction then process the message using the Camel route. To do this just ensure you set the following properties on the component/endpoint

  • transacted = true
  • transactionManager = a Transaction Manager - typically the JmsTransactionManager

See also the Transactional Client EIP pattern for further details.
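
The effect of transacted consumption can be pictured with an in-memory model: the message only leaves the queue for good if processing succeeds. The sketch below assumes a hypothetical class named TransactedConsumeSketch with a Deque standing in for the queue; a real broker and the Spring transaction manager handle commit, rollback and redelivery themselves, so this is a model of the semantics and not of the API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

/** Hypothetical model (not Camel or Spring code) of consume-in-a-transaction semantics. */
final class TransactedConsumeSketch {

    /** Takes one message and runs the route; returns true if the work was committed. */
    static boolean consumeOne(Deque<String> queue, Consumer<String> route) {
        String message = queue.pollFirst();
        if (message == null) {
            return false; // nothing to consume
        }
        try {
            route.accept(message);
            return true;               // commit: the message stays removed
        } catch (RuntimeException e) {
            queue.addFirst(message);   // rollback: the message is available again
            return false;
        }
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>();
        queue.add("order-1");
        boolean first = consumeOne(queue, m -> { throw new IllegalStateException("processing failed"); });
        System.out.println("committed=" + first + ", still queued=" + queue);
        boolean second = consumeOne(queue, m -> System.out.println("processed " + m));
        System.out.println("committed=" + second + ", still queued=" + queue);
    }
}
```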

Using request timeout

In the sample below we send a request-reply style message (we use the requestBody method, which is InOut) to the slow queue for further processing in Camel, and we wait for the reply.

// send an in-out message with a timeout of 5 seconds
Object out = template.requestBody("activemq:queue:slow?requestTimeout=5000", "Hello World");
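
Waiting for a reply with an upper bound is a general pattern that can be shown with the JDK alone. The sketch below assumes a hypothetical class named RequestTimeoutSketch in which a Future stands in for the pending reply; how Camel itself reports an expired requestTimeout is not described in this page, so the empty result used here is purely a choice of the sketch.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical model (not Camel code): wait for a reply, but only for a limited time. */
final class RequestTimeoutSketch {

    /** Returns the reply, or empty if none arrived within the timeout. */
    static Optional<String> awaitReply(Future<String> reply, long timeoutMillis) {
        try {
            return Optional.ofNullable(reply.get(timeoutMillis, TimeUnit.MILLISECONDS));
        } catch (TimeoutException e) {
            reply.cancel(true);        // stop waiting for a reply that is too late
            return Optional.empty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return Optional.empty();
        } catch (ExecutionException e) {
            throw new IllegalStateException("reply failed", e.getCause());
        }
    }

    public static void main(String[] args) {
        // A reply that is already available.
        System.out.println(awaitReply(CompletableFuture.completedFuture("Bye World"), 5000));
        // A reply that never arrives: the call gives up after the timeout.
        System.out.println(awaitReply(new CompletableFuture<String>(), 100));
    }
}
```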

Samples

JMS is used in many examples for other components as well. But we provide a few samples below to get started.

Receiving from JMS

In this sample we configure a route that receives JMS messages and routes the message to a POJO

from("jms:queue:foo").to("bean:myBusinessLogic");

You can of course use any of the EIP patterns, so the route can be context based, such as filtering an order topic for the big spenders:

from("jms:topic:OrdersTopic").
  filter().method("myBean", "isGoldCustomer").
    to("jms:queue:BigSpendersQueue");

Sending to a JMS

In the sample below we poll a file folder and send the file content to a JMS topic. As we want the content of the file as a TextMessage instead of a BytesMessage we need to convert the body to a String.

from("file://orders").convertBodyTo(String.class).to("jms:topic:OrdersTopic");

Using Annotations

Camel also has annotations so you can use POJO Consuming and POJO Producing.

Spring DSL sample

The samples above use the Java DSL. Camel also supports the Spring XML DSL. Here is the big spender sample using the Spring DSL:

<route>
  <from uri="jms:topic:OrdersTopic"/>
  <filter>
    <method bean="myBean" method="isGoldCustomer"/>
    <to uri="jms:queue:BigSpendersQueue"/>
  </filter>
</route>

Other samples

JMS is used a lot in other samples for other components and EIP patterns in this Camel documentation, so feel free to browse it. If you have time, check out the Tutorial-JmsRemoting tutorial, which uses JMS but focuses on how well Spring Remoting and Camel work together.

See Also