Putting several services on the same queue

Christian Schneider
Hi,

I am building web service adapters for a legacy system. The system can be
accessed through a Java API but does not support multi-threaded access. So
my problem is that I want to offer several services in one process and still
make sure only one service is called at a time. To make this scalable I will
then run this process several times, on the same machine and on different
machines.

I would also like to make management of this system as easy as possible.
Ideally this process would be a kind of application server
where I can install and uninstall services while it is running.
They should still listen on one queue and work single-threaded. I
imagined doing this with an OSGi server. The problem is that each bundle
will then probably have its own CXF, so they are not single-threaded
anymore.

I have already asked this question on the CXF list and got an idea of
how a solution could look.

The idea is to publish the services with a local endpoint only. I could
then create a route in Camel that listens on JMS. This route would
figure out which local endpoint the request should go to and invoke it.
Then it forwards the reply back to JMS. It could find the correct
endpoint either by analysing the namespace of the XML or by using
WS-Addressing. The only thing is I do not know exactly how to implement this.

I think I will have to use the JMS component and the CXF component for
the ends of the route. But how do I do the routing? I could imagine a
content-based router that figures out which CXF endpoint to forward to
via XPath.

from("jms:myqueue").choice()
.when(xpath("//namespace=service1namespace")).to("cxf:mybeanname:myEndpointName")
.when(xpath("//namespace=service2namespace")).to("cxf:mybeanname2:myEndpointName2")
.otherwise().to("jms:deadletterqueue")

The xpath expression is not correct. I will have to look up how to do
this. Does the rest sound ok to you?
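
One way the namespace test could be written, as a minimal sketch that uses only the JDK's XPath support and no Camel at all. It assumes the requests are SOAP envelopes and that each service can be identified by the namespace of the first element inside the SOAP Body. The class name NamespaceRouter, the endpoint labels and the example namespaces are made up for illustration.

```java
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

final class NamespaceRouter {
    // XPath 1.0: namespace URI of the first element inside the SOAP Body.
    // local-name() is used so that no prefix bindings have to be registered.
    static final String PAYLOAD_NAMESPACE =
        "namespace-uri(/*[local-name()='Envelope']/*[local-name()='Body']/*[1])";

    private final Map<String, String> endpointByNamespace = new HashMap<String, String>();
    private final String fallbackEndpoint;

    NamespaceRouter(String fallbackEndpoint) {
        this.fallbackEndpoint = fallbackEndpoint;
    }

    // Associates a payload namespace with a (hypothetical) endpoint label.
    NamespaceRouter register(String namespace, String endpoint) {
        endpointByNamespace.put(namespace, endpoint);
        return this;
    }

    // Returns the payload namespace, or "" if the Body has no child element.
    static String payloadNamespace(String soapXml) {
        try {
            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
            dbf.setNamespaceAware(true); // needed for namespace-uri() and local-name()
            Document doc = dbf.newDocumentBuilder()
                .parse(new InputSource(new StringReader(soapXml)));
            return (String) XPathFactory.newInstance().newXPath()
                .evaluate(PAYLOAD_NAMESPACE, doc, XPathConstants.STRING);
        } catch (Exception e) {
            throw new IllegalArgumentException("Not a parseable SOAP message", e);
        }
    }

    // Picks the registered endpoint, or the fallback for unknown namespaces.
    String route(String soapXml) {
        String endpoint = endpointByNamespace.get(payloadNamespace(soapXml));
        return endpoint != null ? endpoint : fallbackEndpoint;
    }
}
```

The expression itself is plain XPath 1.0, so it is not tied to this helper class. The fallback plays the role of the otherwise() branch in the route above.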

Will a configuration like this work in OSGi, where each service is a
separate bundle?

Best regards

Christian
Re: Putting several services on the same queue

Willem.Jiang
Administrator
Hi Christian,

I think Camel can do most of the content-based routing work for you.
Here are some things I should point out:
1. You need to specify the reply destination of the JMS endpoint, or you
won't get the response message from the CXF endpoint.
2. If you publish the CXF service through the local transport,
the service is only accessible to clients in the same JVM.

I don't know much about OSGi, but as for threading, the CXF client runs
on the calling thread. If the JMS consumer in the Camel context is run
by a single thread, the CXF call is single-threaded too.
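
The threading point can be illustrated without Camel or CXF. The following is a minimal sketch, not the mechanism Camel uses: it funnels every call through one worker thread with a plain JDK executor, so calls into the legacy API can never overlap, whichever thread asks for them. The class name SerializedInvoker is made up for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

final class SerializedInvoker {
    // One worker thread: submitted calls run strictly one after another.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Runs the call on the worker thread and blocks until it has finished.
    <T> T invoke(Callable<T> call) {
        try {
            return worker.submit(call).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for the call", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Call failed", e.getCause());
        }
    }

    // Stops the worker thread so the JVM can exit.
    void shutdown() {
        worker.shutdown();
    }
}
```

A single-threaded consumer gives the same guarantee more simply, because the service is then invoked directly on the one consuming thread. An explicit funnel like this would only be needed if several threads could reach the legacy API.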

Willem

Re: Putting several services on the same queue

Christian Schneider
Hi Willem,

the entry point to my route should be a listener on a queue. I imagined
achieving this by using either the JMS component or the CXF component.
The client calling my route will set the reply-to property of the
JMS message. So I guess this property will have to be forwarded through
the whole route, so that when the exchange arrives back at the start the
component knows where to send the reply.

Does the JMS or CXF component support the reply-to property, and will it
be forwarded through the whole route?

I want to publish my services with the local transport as they should not be
reachable directly from the outside. The only access point should be the
Camel route, which will of course be reachable from the outside.
So as far as I know, using the local transport for the CXF endpoints
should be OK.

Best regards,

Christian



Re: Putting several services on the same queue

Willem.Jiang
Administrator
Hi Christian,

The default exchange pattern of the camel-cxf component is InOut, but
the camel-jms consumer just receives messages from the message queue.
If you want to send the response message back, you have to put the message
on the response queue, like this:

from("jms:myqueue").choice()
.when(xpath("//namespace=service1namespace")).to("cxf:mybeanname:myEndpointName").to("jms:responsequeue")
.when(xpath("//namespace=service2namespace")).to("cxf:mybeanname2:myEndpointName2").to("jms:responsequeue")
.otherwise().to("jms:deadletterqueue")

BTW, if you always want to let CXF take care of the service publishing
work, you could use the Camel transport, which will be more effective than
the local transport.
Here are the example [1] and the documentation [2]:

[1]http://cwiki.apache.org/CAMEL/cxf-example.html
[2]http://cwiki.apache.org/CAMEL/camel-transport-for-cxf.html

Willem

Re: Putting several services on the same queue

Christian Schneider
Hi Willem,

thanks for the help. I will try the Camel transport for CXF.

I have checked the EndpointMessageListener code and I think the
JmsConsumer can work with InOut exchanges. If I understand the code
correctly, it reads the reply-to header and sends the reply back to
the queue supplied in the incoming message. So I guess this should
work. I will try to create a running example.

I will try the following route:
from("jms:myqueue").choice()
.when(xpath("//namespace=service1namespace")).to("cxf:local:myEndpointName")
.when(xpath("//namespace=service2namespace")).to("cxf:local:myEndpointName2")
.otherwise().to("jms:deadletterqueue")

Best regards

Christian


----
Some code from EndpointMessageListener to show the InOut handling
----
public void onMessage(final Message message) {
    try {
        if (LOG.isDebugEnabled()) {
            LOG.debug(endpoint + " receiving JMS message: " + message);
        }
        Destination replyDestination = getReplyToDestination(message);
        final JmsExchange exchange = createExchange(message, replyDestination);
        if (eagerLoadingOfProperties) {
            exchange.getIn().getHeaders();
        }
        processor.process(exchange);

        final JmsMessage out = exchange.getOut(false);
        if (out != null && !disableReplyTo) {
            sendReply(replyDestination, message, exchange, out);
        }
    } catch (Exception e) {
        throw new RuntimeCamelException(e);
    }
}

protected Destination getReplyToDestination(Message message) throws JMSException {
    // lets send a response back if we can
    Destination destination = replyToDestination;
    if (destination == null) {
        destination = message.getJMSReplyTo();
    }
    return destination;
}


Re: Putting several services on the same queue

Christian Schneider
Hi,

I have set up an example for routing from a JMS endpoint to a CXF
endpoint using the Camel transport in CXF.
To test the service I am using a normal CXF client on the JMS queue.

When I run my test I get an exception. It seems the unmarshal method
tries to convert the incoming message to either TextMessage,
BytesMessage or ObjectMessage.
As the class ActiveMQMessage is none of these, it does not work. Any idea
why this happens?

I have attached the exception, my log, the Java code and the CXF config.

java.lang.ClassCastException: org.apache.activemq.command.ActiveMQMessage cannot be cast to javax.jms.ObjectMessage
    at org.apache.cxf.transport.jms.JMSTransportBase.unmarshal(JMSTransportBase.java:125)
    at org.apache.cxf.transport.jms.JMSConduit.receive(JMSConduit.java:162)
    at org.apache.cxf.transport.jms.JMSConduit.access$100(JMSConduit.java:55)
    at org.apache.cxf.transport.jms.JMSConduit$JMSOutputStream.handleResponse(JMSConduit.java:366)
    at org.apache.cxf.transport.jms.JMSConduit$JMSOutputStream.doClose(JMSConduit.java:258)
    at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:156)
    at org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47)
    at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:159)
    at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66)
    at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62)
    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:221)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:276)
    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:222)
    at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73)
    at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:177)
    at $Proxy36.echo(Unknown Source)
    at JmsToCXF.run(JmsToCXF.java:125)
    at JmsToCXF.main(JmsToCXF.java:136)
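
The kind of failure in this trace can be reproduced without a broker. The following is a minimal sketch, assuming (as the trace suggests, but without having checked the CXF source) that the unmarshal step tests for text and bytes messages and casts everything else to ObjectMessage. The interfaces are stand-ins that only mirror the type relationships; they are not the real javax.jms types, and the class and method names are made up for illustration.

```java
final class UnmarshalSketch {
    // Stand-ins for the javax.jms message hierarchy. These are NOT the real
    // JMS interfaces; they only mirror how the types relate to each other.
    interface Message { }
    interface TextMessage extends Message { String getText(); }
    interface BytesMessage extends Message { byte[] getBytes(); }
    interface ObjectMessage extends Message { Object getObject(); }

    // Assumed shape of the failing logic: anything that is neither text nor
    // bytes is cast to ObjectMessage without a check.
    static Object unmarshalUnchecked(Message message) {
        if (message instanceof TextMessage) {
            return ((TextMessage) message).getText();
        } else if (message instanceof BytesMessage) {
            return ((BytesMessage) message).getBytes();
        }
        // Throws ClassCastException for a message of any other type.
        return ((ObjectMessage) message).getObject();
    }

    // Variant that checks every type and reports unsupported ones clearly.
    static Object unmarshalChecked(Message message) {
        if (message instanceof TextMessage) {
            return ((TextMessage) message).getText();
        } else if (message instanceof BytesMessage) {
            return ((BytesMessage) message).getBytes();
        } else if (message instanceof ObjectMessage) {
            return ((ObjectMessage) message).getObject();
        }
        throw new IllegalArgumentException(
            "Unsupported message type: " + message.getClass().getName());
    }
}
```

Under that assumption, the exception means the reply arrived as a generic message with no text, bytes or object body. The sketch does not show why the reply has that type.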


-----



import java.util.Arrays;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.cxf.HelloService;
import org.apache.camel.component.cxf.HelloServiceImpl;
import org.apache.camel.component.cxf.transport.CamelTransportFactory;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.cxf.Bus;
import org.apache.cxf.BusFactory;
import org.apache.cxf.bus.spring.SpringBusFactory;
import org.apache.cxf.endpoint.ServerImpl;
import org.apache.cxf.interceptor.LoggingInInterceptor;
import org.apache.cxf.interceptor.LoggingOutInterceptor;
import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
import org.apache.cxf.transport.ConduitInitiatorManager;
import org.apache.cxf.transport.DestinationFactoryManager;

/**
 * Example that routes requests from a JMS queue to a CXF service published
 * on the Camel transport, and tests the route with a CXF client that sends
 * its request over JMS.
 *
 * @version $Revision: 529902 $
 */
public final class JmsToCXF {

    private JmsToCXF() {
    }

    /**
     * Creates a Camel context with a JMS component and a route from the
     * test queue to the direct endpoint the CXF service listens on.
     */
    private CamelContext createCamel() throws Exception {
        CamelContext context = new DefaultCamelContext();
        ConnectionFactory connectionFactory =
            new ActiveMQConnectionFactory("tcp://localhost:61616");
        context.addComponent("test-jms",
            JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
        context.addRoutes(new RouteBuilder() {
            public void configure() {
                from("test-jms:queue:test")
                    .to("direct:test");
            }
        });
        return context;
    }

    /**
     * Creates the CXF bus and registers the Camel transport on it.
     */
    public void createCXFBus(CamelContext context) {
        // set up the Camel transport for the bus
        Bus bus = new SpringBusFactory().createBus("cxf.xml");
        CamelTransportFactory camelTransportFactory = new CamelTransportFactory();
        camelTransportFactory.setCamelContext(context);
        camelTransportFactory.setBus(bus);
        // register the conduit initiator
        ConduitInitiatorManager cim = bus.getExtension(ConduitInitiatorManager.class);
        cim.registerConduitInitiator(CamelTransportFactory.TRANSPORT_ID, camelTransportFactory);
        // register the destination factory
        DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
        dfm.registerDestinationFactory(CamelTransportFactory.TRANSPORT_ID, camelTransportFactory);
        // make this bus the default bus
        BusFactory.setDefaultBus(bus);
        String[] trIds = new String[] {CamelTransportFactory.TRANSPORT_ID};
        camelTransportFactory.setTransportIds(Arrays.asList(trIds));
    }

    /**
     * Publishes the HelloService on the Camel transport at direct:test.
     */
    public ServerImpl startService() {
        JaxWsServerFactoryBean svrBean = new JaxWsServerFactoryBean();
        svrBean.setAddress("camel://direct:test");
        svrBean.setServiceClass(HelloService.class);
        svrBean.setServiceBean(new HelloServiceImpl());
        svrBean.getInInterceptors().add(new LoggingInInterceptor());
        svrBean.getOutInterceptors().add(new LoggingOutInterceptor());
        ServerImpl server = (ServerImpl) svrBean.create();
        server.start();
        return server;
    }

    /**
     * Creates a client that calls the service over the JMS queue to
     * test the route. The JMS settings come from the conduit in cxf.xml.
     */
    public HelloService createClient() {
        JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
        factory.setServiceClass(HelloService.class);
        factory.setAddress("jms://");
        return (HelloService) factory.create();
    }

    public void run() throws Exception {
        // Create the Camel context with a JMS component
        CamelContext context = createCamel();

        // Create the CXF bus and integrate it with Camel
        createCXFBus(context);
        ServerImpl server = startService();
        HelloService client = createClient();

        context.start();

        // This is the call that fails with the ClassCastException
        String reply = client.echo("hallo");

        context.stop();
        server.stop();
        //broker.stop();
        System.exit(0);
    }

    public static void main(String[] args) throws Exception {
        new JmsToCXF().run();
    }
}


-------------------


    <jms:conduit name="{http://cxf.component.camel.apache.org/}HelloServicePort.jms-conduit">
        <jms:clientConfig clientReceiveTimeout="11000" messageTimeToLive="10000"/>
        <jms:address destinationStyle="queue"
            jndiConnectionFactoryName="ConnectionFactory"
            jndiDestinationName="dynamicQueues/test"
            connectionUserName="testUser"
            connectionPassword="testPassword">
            <jms:JMSNamingProperty name="java.naming.factory.initial"
                value="org.apache.activemq.jndi.ActiveMQInitialContextFactory" />
            <jms:JMSNamingProperty name="java.naming.provider.url"
                value="tcp://localhost:61616" />
        </jms:address>
    </jms:conduit>

------------------------------

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

    <modelVersion>4.0.0</modelVersion>
    <groupId>eu.cschneider</groupId>
    <artifactId>cameltest</artifactId>
    <version>1.0.0-SNAPSHOT</version>
    <repositories>
        <repository>
            <id>apache.incubating.releases</id>
            <name>Apache Incubating Release Distribution Repository</name>
            <url>http://people.apache.org/repo/m2-incubating-repository</url>
        </repository>
    </repositories>
    <properties>
        <spring.version>2.5.4</spring.version>
        <cxf.version>2.1</cxf.version>
        <camel-version>1.3.0</camel-version>
    </properties>

    <dependencies>
        <!-- Spring is directly included to override the version 2.0.4 cxf brings -->
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-beans</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>${spring.version}</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-web</artifactId>
            <version>${spring.version}</version>
        </dependency>

        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>1.2.14</version>
        </dependency>
        <!-- Depending on your requirements you may need more or less
            modules from cxf. To make the start easier there are
            some more listed than you need for this example. -->
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-core</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-frontend-simple</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-frontend-jaxws</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-databinding-aegis</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-transports-local</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-transports-http</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-transports-http-jetty</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-transports-jms</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-rt-management</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-common-utilities</artifactId>
            <version>${cxf.version}</version>
        </dependency>
        <!--
            <dependency>
            <groupId>org.mortbay.jetty</groupId>
            <artifactId>jetty</artifactId>
            <version>6.1.5</version>
            </dependency>
        -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>3.8.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.tibco</groupId>
            <artifactId>tibjms</artifactId>
            <version>4.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-core</artifactId>
            <version>5.1.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-core</artifactId>
            <version>${camel-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-jms</artifactId>
            <version>${camel-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.camel</groupId>
            <artifactId>camel-cxf</artifactId>
            <version>${camel-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.cxf</groupId>
            <artifactId>cxf-tools-common</artifactId>
            <version>${cxf.version}</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>1.5</source>
                    <target>1.5</target>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>



Willem Jiang schrieb:

> Hi Christian,
>
> The default exchange pattern of the camel-cxf component is InOut, but
> the camel-jms consumer just receives the message from the message queue.
> If you want to send the response message back, you have to put the
> message on the response queue, like this:
>
> from("jms:myqueue").choice()
>     .when(xpath("//namespace=service1namespace")).to("cxf:mybeanname:myEndpointName").to("jms:responsequeue")
>     .when(xpath("//namespace=service2namespace")).to("cxf:mybeanname2:myEndpointName2").to("jms:responsequeue")
>     .otherwise().to("jms:deadletterqueue")
>
> BTW, if you always want to let CXF take care of publishing the
> service, you could use the Camel transport, which will be more
> efficient than the local transport.
> Here are the example [1] and the documentation [2]:
>
> [1] http://cwiki.apache.org/CAMEL/cxf-example.html
> [2] http://cwiki.apache.org/CAMEL/camel-transport-for-cxf.html
>
> Willem
>
> Christian Schneider wrote:

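For the namespace-based routing discussed above, here is a minimal sketch in plain Java (JDK only, no Camel) of how the payload namespace could be read from a SOAP message and mapped to an endpoint. The class name NamespaceDispatcher, the urn:example:* namespaces and the endpoint URIs are assumptions made for illustration; they are not part of the original setup.

```java
import java.io.StringReader;
import java.util.LinkedHashMap;
import java.util.Map;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.xpath.XPathFactory;
import org.w3c.dom.Document;
import org.xml.sax.InputSource;

/** Hypothetical helper: picks a target endpoint from the namespace of the SOAP payload. */
class NamespaceDispatcher {

    // Namespace URI of the first element inside the SOAP Body,
    // independent of the prefixes used in the message.
    static final String PAYLOAD_NAMESPACE =
        "namespace-uri(/*[local-name()='Envelope']/*[local-name()='Body']/*[1])";

    private final Map<String, String> endpoints = new LinkedHashMap<String, String>();
    private final String fallback;

    NamespaceDispatcher(String fallback) {
        this.fallback = fallback;
    }

    NamespaceDispatcher register(String namespace, String endpointUri) {
        endpoints.put(namespace, endpointUri);
        return this;
    }

    /** Returns the payload namespace, or an empty string if there is no SOAP Body payload. */
    static String payloadNamespace(String xml) {
        try {
            DocumentBuilderFactory dbf = DocumentBuilderFactory.newInstance();
            dbf.setNamespaceAware(true); // required for namespace-uri() to see the namespaces
            Document doc = dbf.newDocumentBuilder().parse(new InputSource(new StringReader(xml)));
            return XPathFactory.newInstance().newXPath().evaluate(PAYLOAD_NAMESPACE, doc);
        } catch (Exception e) {
            throw new IllegalArgumentException("Could not read payload namespace", e);
        }
    }

    /** Returns the endpoint registered for the payload namespace, else the fallback. */
    String endpointFor(String xml) {
        String target = endpoints.get(payloadNamespace(xml));
        return target != null ? target : fallback;
    }

    public static void main(String[] args) {
        String soap = "<soap:Envelope xmlns:soap=\"http://schemas.xmlsoap.org/soap/envelope/\">"
            + "<soap:Body><ns1:echo xmlns:ns1=\"urn:example:service1\"/></soap:Body>"
            + "</soap:Envelope>";
        NamespaceDispatcher dispatcher = new NamespaceDispatcher("jms:deadletterqueue")
            .register("urn:example:service1", "direct:service1")
            .register("urn:example:service2", "direct:service2");
        System.out.println(dispatcher.endpointFor(soap));
    }
}
```

The same XPath expression, compared against a namespace string, might also work as the predicate in the when(xpath(...)) clauses of the route; that has not been checked against Camel 1.3.
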

Re: Putting several services on the same queue

Willem.Jiang
Administrator
Hi Christian,

Could you also use the Camel transport instead of the JMS transport in the CXF client?
The CXF JMS transport does not handle a plain ActiveMQMessage.

Willem
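
The stack trace quoted below is consistent with an unmarshal step that handles text and bytes messages and then casts everything else to ObjectMessage. Here is a minimal sketch of that failure mode and of a defensive alternative. It uses hypothetical stand-in interfaces (Msg, TextMsg, BytesMsg, ObjectMsg), not the real javax.jms types, and it is not the actual CXF source.

```java
/** Hypothetical stand-ins for the JMS message interfaces; not the real javax.jms API. */
interface Msg {}
interface TextMsg extends Msg { String getText(); }
interface BytesMsg extends Msg { byte[] getBytes(); }
interface ObjectMsg extends Msg { Object getObject(); }

class UnmarshalSketch {

    // Assumed failure mode: anything that is neither text nor bytes is
    // cast to an object message without checking its type first.
    static Object unmarshalUnchecked(Msg message) {
        if (message instanceof TextMsg) {
            return ((TextMsg) message).getText();
        }
        if (message instanceof BytesMsg) {
            return ((BytesMsg) message).getBytes();
        }
        return ((ObjectMsg) message).getObject(); // ClassCastException for a plain Msg
    }

    // Defensive variant: check every supported type and report the
    // unsupported one by name instead of failing on a cast.
    static Object unmarshalChecked(Msg message) {
        if (message instanceof TextMsg) {
            return ((TextMsg) message).getText();
        }
        if (message instanceof BytesMsg) {
            return ((BytesMsg) message).getBytes();
        }
        if (message instanceof ObjectMsg) {
            return ((ObjectMsg) message).getObject();
        }
        throw new IllegalArgumentException(
            "Unsupported message type: " + message.getClass().getName());
    }
}
```

Either way, a reply that arrives as a generic message cannot be read by this kind of code, so the sender has to produce one of the supported message types.
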
Christian Schneider wrote:

> Hi,
>
> I have set up an example for routing from a jms endpoint to a cxf
> endpoint using the camel transport in cxf.
> To test the service I am using a normal cxf client on the jms queue.
>
> When I run my test I get an exception. It seems the unmarshal method
> tries to convert the incoming message to either TextMessage,
> BytesMessage or ObjectMessage.
> As the class ActiveMqMessage is none of these it does not work. Any
> idea why this happens?
>
> I have attached the exception, my log, the java code and the cxf config.
>
> java.lang.ClassCastException: org.apache.activemq.command.ActiveMQMessage cannot be cast to javax.jms.ObjectMessage
>    at org.apache.cxf.transport.jms.JMSTransportBase.unmarshal(JMSTransportBase.java:125)
>    at org.apache.cxf.transport.jms.JMSConduit.receive(JMSConduit.java:162)
>    at org.apache.cxf.transport.jms.JMSConduit.access$100(JMSConduit.java:55)
>    at org.apache.cxf.transport.jms.JMSConduit$JMSOutputStream.handleResponse(JMSConduit.java:366)
>    at org.apache.cxf.transport.jms.JMSConduit$JMSOutputStream.doClose(JMSConduit.java:258)
>    at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:156)
>    at org.apache.cxf.io.CacheAndWriteOutputStream.postClose(CacheAndWriteOutputStream.java:47)
>    at org.apache.cxf.io.CachedOutputStream.close(CachedOutputStream.java:159)
>    at org.apache.cxf.transport.AbstractConduit.close(AbstractConduit.java:66)
>    at org.apache.cxf.interceptor.MessageSenderInterceptor$MessageSenderEndingInterceptor.handleMessage(MessageSenderInterceptor.java:62)
>    at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:221)
>    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:276)
>    at org.apache.cxf.endpoint.ClientImpl.invoke(ClientImpl.java:222)
>    at org.apache.cxf.frontend.ClientProxy.invokeSync(ClientProxy.java:73)
>    at org.apache.cxf.jaxws.JaxWsClientProxy.invoke(JaxWsClientProxy.java:177)
>    at $Proxy36.echo(Unknown Source)
>    at JmsToCXF.run(JmsToCXF.java:125)
>    at JmsToCXF.main(JmsToCXF.java:136)
>
>
> -----
>
>
>
> import java.util.Arrays;
>
> import javax.jms.ConnectionFactory;
>
> import org.apache.activemq.ActiveMQConnectionFactory;
> import org.apache.activemq.broker.BrokerService;
> import org.apache.camel.CamelContext;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.cxf.HelloService;
> import org.apache.camel.component.cxf.HelloServiceImpl;
> import org.apache.camel.component.cxf.transport.CamelTransportFactory;
> import org.apache.camel.component.jms.JmsComponent;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.apache.cxf.Bus;
> import org.apache.cxf.BusFactory;
> import org.apache.cxf.bus.spring.SpringBusFactory;
> import org.apache.cxf.endpoint.ServerImpl;
> import org.apache.cxf.interceptor.LoggingInInterceptor;
> import org.apache.cxf.interceptor.LoggingOutInterceptor;
> import org.apache.cxf.jaxws.JaxWsProxyFactoryBean;
> import org.apache.cxf.jaxws.JaxWsServerFactoryBean;
> import org.apache.cxf.transport.ConduitInitiatorManager;
> import org.apache.cxf.transport.DestinationFactoryManager;
>
> /**
>  * Example that routes requests from a JMS queue to a CXF service
>  * published over the Camel transport, and tests the route with a
>  * CXF client that calls the service over the same JMS queue.
>  */
> public final class JmsToCXF {
>
>     private JmsToCXF() {
>     }
>
>     /**
>      * Creates a Camel context with a JMS component and a route from
>      * the JMS queue to the direct endpoint the service listens on.
>      */
>     private CamelContext createCamel() throws Exception {
>         CamelContext context = new DefaultCamelContext();
>         ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
>         context.addComponent("test-jms", JmsComponent.jmsComponentAutoAcknowledge(connectionFactory));
>         context.addRoutes(new RouteBuilder() {
>             public void configure() {
>                 from("test-jms:queue:test")
>                     .to("direct:test");
>             }
>         });
>         return context;
>     }
>
>     public void createCXFBus(CamelContext context) {
>         // set up the camel transport for the bus
>         Bus bus = new SpringBusFactory().createBus("cxf.xml");
>         CamelTransportFactory camelTransportFactory = new CamelTransportFactory();
>         camelTransportFactory.setCamelContext(context);
>         camelTransportFactory.setBus(bus);
>         // register the conduit initiator
>         ConduitInitiatorManager cim = bus.getExtension(ConduitInitiatorManager.class);
>         cim.registerConduitInitiator(CamelTransportFactory.TRANSPORT_ID, camelTransportFactory);
>         // register the destination factory
>         DestinationFactoryManager dfm = bus.getExtension(DestinationFactoryManager.class);
>         dfm.registerDestinationFactory(CamelTransportFactory.TRANSPORT_ID, camelTransportFactory);
>         // set this bus as the default bus
>         BusFactory.setDefaultBus(bus);
>         String[] trIds = new String[] {CamelTransportFactory.TRANSPORT_ID};
>         camelTransportFactory.setTransportIds(Arrays.asList(trIds));
>     }
>
>     public ServerImpl startService() {
>         JaxWsServerFactoryBean svrBean = new JaxWsServerFactoryBean();
>         svrBean.setAddress("camel://direct:test");
>         svrBean.setServiceClass(HelloService.class);
>         svrBean.setServiceBean(new HelloServiceImpl());
>         svrBean.getInInterceptors().add(new LoggingInInterceptor());
>         svrBean.getOutInterceptors().add(new LoggingOutInterceptor());
>         ServerImpl server = (ServerImpl) svrBean.create();
>         server.start();
>         return server;
>     }
>
>     /**
>      * Creates a client that calls the service over the JMS queue to
>      * test the route.
>      */
>     public HelloService createClient() {
>         JaxWsProxyFactoryBean factory = new JaxWsProxyFactoryBean();
>         factory.setServiceClass(HelloService.class);
>         factory.setAddress("jms://");
>         return (HelloService) factory.create();
>     }
>
>     public void run() throws Exception {
>         // create the camel context with a jms component
>         CamelContext context = createCamel();
>         // create the cxf bus and integrate it with camel
>         createCXFBus(context);
>         ServerImpl server = startService();
>         HelloService client = createClient();
>
>         context.start();
>
>         String reply = client.echo("hallo");
>
>         context.stop();
>         server.stop();
>         //broker.stop();
>         System.exit(0);
>     }
>
>     public static void main(String[] args) throws Exception {
>         new JmsToCXF().run();
>     }
> }
>
>
> -------------------
>
>
>    <jms:conduit name="{http://cxf.component.camel.apache.org/}HelloServicePort.jms-conduit">
>        <jms:clientConfig clientReceiveTimeout="11000" messageTimeToLive="10000"/>
>        <jms:address destinationStyle="queue"
>            jndiConnectionFactoryName="ConnectionFactory"
>            jndiDestinationName="dynamicQueues/test"
>            connectionUserName="testUser"
>            connectionPassword="testPassword">
>            <jms:JMSNamingProperty name="java.naming.factory.initial"
>                value="org.apache.activemq.jndi.ActiveMQInitialContextFactory" />
>            <jms:JMSNamingProperty name="java.naming.provider.url"
>                value="tcp://localhost:61616" />
>        </jms:address>
>    </jms:conduit>
>
> ------------------------------
>
> <?xml version="1.0" encoding="UTF-8"?>
> <project xmlns="http://maven.apache.org/POM/4.0.0"
>    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
>    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
>
>    <modelVersion>4.0.0</modelVersion>
>    <groupId>eu.cschneider</groupId>
>    <artifactId>cameltest</artifactId>
>    <version>1.0.0-SNAPSHOT</version>
>    <repositories>
>        <repository>
>            <id>apache.incubating.releases</id>
>            <name>Apache Incubating Release Distribution Repository</name>
>            <url>http://people.apache.org/repo/m2-incubating-repository</url>
>        </repository>
>    </repositories>
>    <properties>
>        <spring.version>2.5.4</spring.version>
>        <cxf.version>2.1</cxf.version>
>        <camel-version>1.3.0</camel-version>
>    </properties>
>
>    <dependencies>
>        <!-- Spring is included directly to override version 2.0.4, which CXF brings in -->
>        <dependency>
>            <groupId>org.springframework</groupId>
>            <artifactId>spring-core</artifactId>
>            <version>${spring.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.springframework</groupId>
>            <artifactId>spring-beans</artifactId>
>            <version>${spring.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.springframework</groupId>
>            <artifactId>spring-context</artifactId>
>            <version>${spring.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.springframework</groupId>
>            <artifactId>spring-web</artifactId>
>            <version>${spring.version}</version>
>        </dependency>
>
>        <dependency>
>            <groupId>log4j</groupId>
>            <artifactId>log4j</artifactId>
>            <version>1.2.14</version>
>        </dependency>
>        <!-- Depending on your requirements you may need more or fewer CXF modules.
>            To make getting started easier, more are listed here than this example needs. -->
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-core</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-frontend-simple</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-frontend-jaxws</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-databinding-aegis</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-local</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-http</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-http-jetty</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-jms</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-management</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-common-utilities</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <!--
>            <dependency>
>            <groupId>org.mortbay.jetty</groupId>
>            <artifactId>jetty</artifactId>
>            <version>6.1.5</version>
>            </dependency>
>        -->
>        <dependency>
>            <groupId>junit</groupId>
>            <artifactId>junit</artifactId>
>            <version>3.8.2</version>
>            <scope>test</scope>
>        </dependency>
>        <dependency>
>            <groupId>com.tibco</groupId>
>            <artifactId>tibjms</artifactId>
>            <version>4.3.0</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.activemq</groupId>
>            <artifactId>activemq-core</artifactId>
>            <version>5.1.0</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.camel</groupId>
>            <artifactId>camel-core</artifactId>
>            <version>${camel-version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.camel</groupId>
>            <artifactId>camel-jms</artifactId>
>            <version>${camel-version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.camel</groupId>
>            <artifactId>camel-cxf</artifactId>
>            <version>${camel-version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-core</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-frontend-simple</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-frontend-jaxws</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-databinding-aegis</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-local</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-http</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-http-jetty</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-transports-jms</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-rt-management</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-common-utilities</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>        <dependency>
>            <groupId>org.apache.cxf</groupId>
>            <artifactId>cxf-tools-common</artifactId>
>            <version>${cxf.version}</version>
>        </dependency>
>    </dependencies>
>    <build>
>        <plugins>
>            <plugin>
>                <artifactId>maven-compiler-plugin</artifactId>
>                <configuration>
>                    <source>1.5</source>
>                    <target>1.5</target>
>                </configuration>
>            </plugin>
>        </plugins>
>    </build>
> </project>
>
>
>
> Willem Jiang wrote:
>> Hi Christian,
>>
>> The default exchange pattern of the camel-cxf component is InOut, but
>> the camel-jms consumer just receives the message from the message queue.
>> If you want to send the response message back, you have to put the
>> message on the response queue, like this:
>>
>> from("jms:myqueue").choice()
>>     .when(xpath("//namespace=service1namespace")).to("cxf:mybeanname:myEndpointName").to("jms:responsequeue")
>>     .when(xpath("//namespace=service2namespace")).to("cxf:mybeanname2:myEndpointName2").to("jms:responsequeue")
>>     .otherwise().to("jms:deadletterqueue")
>>
>> BTW, if you always want to let CXF take care of publishing the
>> service, you could use the Camel transport, which will be more
>> efficient than the local transport.
>> Here are the example [1] and the documentation [2]:
>>
>> [1] http://cwiki.apache.org/CAMEL/cxf-example.html
>> [2] http://cwiki.apache.org/CAMEL/camel-transport-for-cxf.html
>>
>> Willem
>>
>> Christian Schneider wrote:
>
>