Strange behaviour of bean processor

Strange behaviour of bean processor

S.R.-2
I'm trying to understand how routes work and, in particular, how bean processors interact and produce replies (for the InOut pattern).

I have the following code:

// Processor
Processor proc = new Processor() {
    public void process(Exchange exchange) throws Exception {
        // exchange.getOut().setBody(exchange.getIn().getBody() + " Reply "); - In this case all works fine!
        exchange.getIn().setBody(exchange.getIn().getBody() + " Reply ");
    }
};

// Route
from("jms:testQueue").process(proc).process(proc);

Another component sends the JMS message "abcd" to "jms:testQueue" with the JMSReplyTo field set to "jms:replies", then listens on "jms:replies" for the response and receives "[Processor]abcd Reply Reply ", as expected.

But when I remove the last process(proc) element, like this:
from("jms:testQueue").process(proc);
and then repeat the scenario above, no reply is sent to "jms:replies", and no exception is thrown to report that no response arrived within some timeout...

Is that the expected behavior, or have I misunderstood something? I think it should either reply in both cases or not reply in both cases (and throw an exception about the missing response).

Also, it's a bit confusing that exchange.setIn(...) and exchange.setOut(...) have almost the same effect. As I understand it, if we call exchange.setOut(out), that out message becomes the Exchange's In for the next processor in the chain, and if we don't, the next processor gets the same In. Either way we get the same result, except in the situation described above.

Thank you,
Sergey
Re: Strange behaviour of bean processor

William Tam
Hi Sergey,

I think what's going on is that Camel creates a route that includes a
pipeline processor to wrap the two "proc" processors if your route
looks like: from("activemq:a").process(proc).process(proc).

The debug message shows the route as follows:

2008-07-08 16:30:25,158 [main           ] DEBUG BeanInOutPatternTest
        - Routing Rules are:
[EventDrivenConsumerRoute[Endpoint[activemq:a] ->
Delegate(Pipeline[DeadLetterChannel[org.apache.camel.component.jms.BeanInOutPatternTest$1$1@435a3a,
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=1]],
DeadLetterChannel[org.apache.camel.component.jms.BeanInOutPatternTest$1$1@435a3a,
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=1]]])]]

At the end of the processor chain, the pipeline processor copies and
sets the out message in the Pipeline.process() method:
ExchangeHelper.copyResults(original, nextExchange).

That's how the out message is set even though it is not set by your processor.

If your route is: from("activemq:a").process(proc),
there is no pipeline processor and the out body will remain null.  The
client won't get a reply.

2008-07-08 16:21:21,130 [main           ] DEBUG BeanInOutPatternTest
        - Routing Rules are:
[EventDrivenConsumerRoute[Endpoint[activemq:a] ->
Delegate(Delegate(DeadLetterChannel[Delegate(org.apache.camel.component.jms.BeanInOutPatternTest$1$1@a3d3b),
RecipientList[log:org.apache.camel.DeadLetterChannel?level=error],
RedeliveryPolicy[maximumRedeliveries=1]]))]]
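
To make the difference concrete, here is a toy model in plain Java. It is only a sketch of the behaviour described above, not Camel's actual code: ToyExchange, runPipeline, runSingle and reply are hypothetical stand-ins, and the copy step at the end of runPipeline is an assumption about what ExchangeHelper.copyResults() amounts to (use the last out if present, otherwise the last in).

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

/** Toy model only: hypothetical stand-ins, not Camel's real Exchange or Pipeline classes. */
public class ReplyDemo {

    /** Minimal exchange: an "in" body plus an "out" body that stays null until someone sets it. */
    static final class ToyExchange {
        String in;
        String out;
        ToyExchange(String in) { this.in = in; }
    }

    /** Same shape as the processor in the question: it only modifies the in body. */
    static final Consumer<ToyExchange> PROC = ex -> ex.in = ex.in + " Reply ";

    /** Mimics a pipeline: each step gets a fresh exchange and the last result is copied back as out. */
    static void runPipeline(ToyExchange original, List<Consumer<ToyExchange>> steps) {
        ToyExchange current = original;
        for (Consumer<ToyExchange> step : steps) {
            // The previous step's out, if set, becomes the next step's in; otherwise in is carried over.
            ToyExchange next = new ToyExchange(current.out != null ? current.out : current.in);
            step.accept(next);
            current = next;
        }
        // Assumed equivalent of the copy step: out is filled even if no step ever set it.
        original.out = current.out != null ? current.out : current.in;
    }

    /** Mimics a route with a single processor: it is called directly and nothing copies in to out. */
    static void runSingle(ToyExchange original, Consumer<ToyExchange> step) {
        step.accept(original);
    }

    /** In this model the consumer sends a reply only when the out body is set. */
    static String reply(ToyExchange ex) {
        return ex.out;
    }

    public static void main(String[] args) {
        ToyExchange two = new ToyExchange("abcd");
        runPipeline(two, Arrays.asList(PROC, PROC));
        System.out.println("two processors: " + reply(two)); // two processors: abcd Reply Reply 

        ToyExchange one = new ToyExchange("abcd");
        runSingle(one, PROC);
        System.out.println("one processor:  " + reply(one)); // one processor:  null
    }
}
```

In this model, a processor that sets the out body itself (as in the commented-out line of the original code) produces a reply in both cases, because it no longer relies on the pipeline's copy step.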


(attached a test)

Thanks,
William



BeanInOutPatternTest.java (3K) Download Attachment

Re: Strange behaviour of bean processor

S.R.-2
Hi William,

Thank you for your answer!

Now I see, that's clear.

Sergey

William Tam's attached test, BeanInOutPatternTest.java:

package org.apache.camel.component.jms;

import static org.apache.camel.component.jms.JmsComponent.jmsComponent;

import javax.jms.ConnectionFactory;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.management.JmxSystemPropertyKeys;

public class BeanInOutPatternTest extends ContextTestSupport {
   
    @Override
    public void setUp() throws Exception {
        System.setProperty(JmxSystemPropertyKeys.DISABLED, "True");
        super.setUp();
    }

    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Appends to the in body only; the out message is never set here.
                Processor proc = new Processor() {
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("My processor: " + exchange);
                        exchange.getIn().setBody(exchange.getIn().getBody() + "Reply ");
                    }
                };

                errorHandler(deadLetterChannel().maximumRedeliveries(1));
                // Two processors, so the route is wrapped in a Pipeline.
                // Drop the second process(proc) to reproduce the missing reply.
                from("activemq:a").process(proc).process(proc);
            }
        };
    }

    public void testExchange() throws Exception {
        // Send "abcd" as an InOut exchange and print the in and out messages that come back.
        Exchange exchange = template.send("activemq:a", ExchangePattern.InOut, new Processor() {
            public void process(Exchange exchange) throws Exception {
                exchange.getIn().setBody("abcd");
            }
        });

        System.out.println("in = " + exchange.getIn());
        System.out.println("out = " + exchange.getOut());
    }
   
    protected CamelContext createCamelContext() throws Exception {
        CamelContext camelContext = super.createCamelContext();

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
            "vm://localhost?broker.persistent=false");
        camelContext.addComponent("activemq", jmsComponent(connectionFactory));

        return camelContext;
    }
}