Aggregator message lost

classic Classic list List threaded Threaded
7 messages Options
Reply | Threaded
Open this post in threaded view
|

Aggregator message lost

wilson.lists
Hi,

I am using an aggregator to concatenate the body of a bunch of messages into a single message. The following code illustrates the scenario:

<code>
package my.package;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.spring.Main;

public class MyRouteBuilder extends RouteBuilder {
        private int messageIndex = 0;

    public static void main(String... args) {
        Main.main(args);
    }

    public void configure() {
   
        from("timer://foo?period=500")
        .process(new Processor() {
                               
                                public void process(Exchange exchange) throws Exception {
                                        exchange.getOut().setBody("[myBody-" + (messageIndex++) + "]");
                                       
                                        exchange.getOut().setHeader("aggregateGroup", "group1");
                                }
                        }).to("direct:step1");

        from("direct:step1").multicast().to("direct:step2", "direct:step3");
       
        from("direct:step2").to("direct:aggregator");
        from("direct:step3").to("direct:aggregator");
       
        from("direct:aggregator").aggregate(header("aggregateGroup"), new AggregationStrategy() {
                       
                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                               
                                System.out.println("Getting new exchange in aggretator: " + newExchange);
                               
                                if(oldExchange == null){
                                        return newExchange;
                                }else{
                                        oldExchange.getOut().setBody((String)newExchange.getIn().getBody() + (String)oldExchange.getIn().getBody());
                                }
                               
                                return oldExchange;
                        }
                }).batchSize(10).batchTimeout(2000L).process(new Processor() {
                       
                        public void process(Exchange exchange) throws Exception {
                                System.out.println("Received group: " + exchange.getIn().getBody() + " - " + exchange.getIn().getHeader("aggregateGroup"));
                        }
                });
       
    }
}
</code>

When running, this route configuration generates the following output:

<output>
lease use a packageScan element instead.
[pache.camel.spring.Main.main()] SpringCamelContext             INFO  Starting Apache Camel as property ShouldStartContext is true
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache Camel 2.0.0 (CamelContext:camelContext) is starting
[pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache Camel 2.0.0 (CamelContext:camelContext) started
Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
Received group: [myBody-4][myBody-3][myBody-3][myBody-2][myBody-2][myBody-1][myBody-1][myBody-0][myBody-0] - group1
Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
Received group: [myBody-9][myBody-8][myBody-8][myBody-7][myBody-7][myBody-6][myBody-6][myBody-5][myBody-5] - group1
Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
Received group: [myBody-14][myBody-13][myBody-13][myBody-12][myBody-12][myBody-11][myBody-11][myBody-10][myBody-10] - group1
</output>

The output shows that all messages are passing through the Aggregator but the last one is missing in the resulting Exchange body. For example: the first message group is composed by 10 messages: {[myBody-0],[myBody-0],[myBody-1],[myBody-1],[myBody-2],[myBody-2],[myBody-3],[myBody-3],[myBody-4],[myBody-4]} but the resulting body is missing the last message ([myBody-14]).

Am I doing anything wrong?

Thank you.

--
Wilson Freitas
Vetta Technologies
http://www.vettatech.com
Reply | Threaded
Open this post in threaded view
|

Re: Aggregator message lost

Claus Ibsen-2
Hi

Do you "loose" message every time you run the unit test?
Have you tried with a higher batch timeout?



On Tue, Oct 20, 2009 at 4:43 PM, Wilson <[hidden email]> wrote:

>
> Hi,
>
> I am using an aggregator to concatenate the body of a bunch of messages into
> a single message. The following code illustrates the scenario:
>
> <code>
> package my.package;
>
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.processor.aggregate.AggregationStrategy;
> import org.apache.camel.spring.Main;
>
> public class MyRouteBuilder extends RouteBuilder {
>        private int messageIndex = 0;
>
>    public static void main(String... args) {
>        Main.main(args);
>    }
>
>    public void configure() {
>
>        from("timer://foo?period=500")
>                .process(new Processor() {
>
>                                public void process(Exchange exchange) throws Exception {
>                                        exchange.getOut().setBody("[myBody-" + (messageIndex++) + "]");
>
>                                        exchange.getOut().setHeader("aggregateGroup", "group1");
>                                }
>                        }).to("direct:step1");
>
>        from("direct:step1").multicast().to("direct:step2", "direct:step3");
>
>        from("direct:step2").to("direct:aggregator");
>        from("direct:step3").to("direct:aggregator");
>
>        from("direct:aggregator").aggregate(header("aggregateGroup"), new
> AggregationStrategy() {
>
>                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>
>                                System.out.println("Getting new exchange in aggretator: " +
> newExchange);
>
>                                if(oldExchange == null){
>                                        return newExchange;
>                                }else{
>                                        oldExchange.getOut().setBody((String)newExchange.getIn().getBody() +
> (String)oldExchange.getIn().getBody());
>                                }
>
>                                return oldExchange;
>                        }
>                }).batchSize(10).batchTimeout(2000L).process(new Processor() {
>
>                        public void process(Exchange exchange) throws Exception {
>                                System.out.println("Received group: " + exchange.getIn().getBody() + " -
> " + exchange.getIn().getHeader("aggregateGroup"));
>                        }
>                });
>
>    }
> }
> </code>
>
> When running, this route configuration generates the following output:
>
> <output>
> lease use a packageScan element instead.
> [pache.camel.spring.Main.main()] SpringCamelContext             INFO
> Starting Apache Camel as property ShouldStartContext is true
> [pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
> Camel 2.0.0 (CamelContext:camelContext) is starting
> [pache.camel.spring.Main.main()] DefaultCamelContext            INFO  Apache
> Camel 2.0.0 (CamelContext:camelContext) started
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-0]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-1]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-2]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-3]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-4]]
> Received group:
> [myBody-4][myBody-3][myBody-3][myBody-2][myBody-2][myBody-1][myBody-1][myBody-0][myBody-0]
> - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-5]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-6]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-7]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-8]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-9]]
> Received group:
> [myBody-9][myBody-8][myBody-8][myBody-7][myBody-7][myBody-6][myBody-6][myBody-5][myBody-5]
> - group1
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-10]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-11]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-12]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-13]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Getting new exchange in aggretator: Exchange[Message: [myBody-14]]
> Received group:
> [myBody-14][myBody-13][myBody-13][myBody-12][myBody-12][myBody-11][myBody-11][myBody-10][myBody-10]
> - group1
> </output>
>
> The output shows that all messages are passing through the Aggregator but
> the last one is missing in the resulting Exchange body. For example: the
> first message group is composed by 10 messages:
> {[myBody-0],[myBody-0],[myBody-1],[myBody-1],[myBody-2],[myBody-2],[myBody-3],[myBody-3],[myBody-4],[myBody-4]}
> but the resulting body is missing the last message ([myBody-14]).
>
> Am I doing anything wrong?
>
> Thank you.
>
> --
> Wilson Freitas
> Vetta Technologies
> http://www.vettatech.com
> --
> View this message in context: http://www.nabble.com/Aggregator-message-lost-tp25976380p25976380.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



--
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Reply | Threaded
Open this post in threaded view
|

Re: Aggregator message lost

wilson.lists
Hi Claus,

Thanks for your answer.

Claus Ibsen-2 wrote
Hi

Do you "loose" message every time you run the unit test?

Have you tried with a higher batch timeout?


--
Claus Ibsen
Apache Camel Committer

Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Yes. The "message lost" happens every time I run the code. I think the problem is not related to batchTimeout. BTW increasing the batchTimeout to 100 secs has no effect. As you can see in the logs, the messages are produced very quickly and all of then are passing through the Aggregator.

Looks like the exchange produced by the Aggregator is the one created when the message n-1 is aggregated. I am not sure, but I think the exchange returned by the last aggregator cycle is being ignored.

This is what I think is happening:

For n messages:
-->Exchange(1)--Aggregator-->AggregatedExchange(1)-->
-->Exchange(2),AggregatedExchange(1)-->Aggregator-->AggregatedExchange(2)-->
-->Exchange(3),AggregatedExchange(2)-->Aggregator-->AggregatedExchange(3)-->
...
-->Exchange(n-1),AggregatedExchange(n-2)-->Aggregator-->AggregatedExchange(n-1)-->(this one is produced by the aggregator)
-->Exchange(n),AggregatedExchange(n-1)-->Aggregator-->AggregatedExchange(n)-->(this one is being lost or ignored somehow)

Thank you,

--
Wilson Freitas
Vetta Technologies
http://www.vettatech.com
Reply | Threaded
Open this post in threaded view
|

Re: Aggregator message lost

Claus Ibsen-2
Hi

See this unit test which I have created that works.
http://svn.apache.org/viewvc?rev=828961&view=rev


On Wed, Oct 21, 2009 at 8:15 PM, Wilson <[hidden email]> wrote:

>
> Hi Claus,
>
> Thanks for your answer.
>
>
> Claus Ibsen-2 wrote:
>>
>> Hi
>>
>> Do you "loose" message every time you run the unit test?
>>
>> Have you tried with a higher batch timeout?
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>>
>
> Yes. The "message lost" happens every time I run the code. I think the
> problem is not related to batchTimeout. BTW increasing the batchTimeout to
> 100 secs has no effect. As you can see in the logs, the messages are
> produced very quickly and all of then are passing through the Aggregator.
>
> Looks like the exchange produced by the Aggregator is the one created when
> the message n-1 is aggregated. I am not sure, but I think the exchange
> returned by the last aggregator cycle is being ignored.
>
> This is what I think is happening:
>
> For n messages:
> -->Exchange(1)--Aggregator-->AggregatedExchange(1)-->
> -->Exchange(2),AggregatedExchange(1)-->Aggregator-->AggregatedExchange(2)-->
> -->Exchange(3),AggregatedExchange(2)-->Aggregator-->AggregatedExchange(3)-->
> ...
> -->Exchange(n-1),AggregatedExchange(n-2)-->Aggregator-->AggregatedExchange(n-1)-->(this
> one is produced by the aggregator)
> -->Exchange(n),AggregatedExchange(n-1)-->Aggregator-->AggregatedExchange(n)-->(this
> one is being lost or ignored somehow)
>
> Thank you,
>
> --
> Wilson Freitas
> Vetta Technologies
> http://www.vettatech.com
> --
> View this message in context: http://www.nabble.com/Aggregator-message-lost-tp25976380p25997564.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Reply | Threaded
Open this post in threaded view
|

Re: Aggregator message lost

wilson.lists
Hi Claus,

I was out on vacation so my feedback is a bit late.

I figured out why my code is not working.

Your AggregationStrategy implementation is setting the message body this way:
  oldExchange.getIn().setBody(oldBody + "," + newBody);

My code is doing this:
  oldExchange.getOut().setBody(oldBody + "," + newBody);

I am used to the Processor interface that expects the body to be set in the "out exchange". It is strange the fact that the Aggregation Strategy expects the "in exchange" to be updated.

Thank you.

--
Wilson Freitas
Vetta Technologies
http://www.vettatech.com


Claus Ibsen-2 wrote
Hi

See this unit test which I have created that works.
http://svn.apache.org/viewvc?rev=828961&view=rev


On Wed, Oct 21, 2009 at 8:15 PM, Wilson <wilson.lists@gmail.com> wrote:
>
> Hi Claus,
>
> Thanks for your answer.
>
>
> Claus Ibsen-2 wrote:
>>
>> Hi
>>
>> Do you "loose" message every time you run the unit test?
>>
>> Have you tried with a higher batch timeout?
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>>
>
> Yes. The "message lost" happens every time I run the code. I think the
> problem is not related to batchTimeout. BTW increasing the batchTimeout to
> 100 secs has no effect. As you can see in the logs, the messages are
> produced very quickly and all of then are passing through the Aggregator.
>
> Looks like the exchange produced by the Aggregator is the one created when
> the message n-1 is aggregated. I am not sure, but I think the exchange
> returned by the last aggregator cycle is being ignored.
>
> This is what I think is happening:
>
> For n messages:
> -->Exchange(1)--Aggregator-->AggregatedExchange(1)-->
> -->Exchange(2),AggregatedExchange(1)-->Aggregator-->AggregatedExchange(2)-->
> -->Exchange(3),AggregatedExchange(2)-->Aggregator-->AggregatedExchange(3)-->
> ...
> -->Exchange(n-1),AggregatedExchange(n-2)-->Aggregator-->AggregatedExchange(n-1)-->(this
> one is produced by the aggregator)
> -->Exchange(n),AggregatedExchange(n-1)-->Aggregator-->AggregatedExchange(n)-->(this
> one is being lost or ignored somehow)
>
> Thank you,
>
> --
> Wilson Freitas
> Vetta Technologies
> http://www.vettatech.com
> --
> View this message in context: http://www.nabble.com/Aggregator-message-lost-tp25976380p25997564.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



--
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus
Reply | Threaded
Open this post in threaded view
|

Re: Re: Aggregator message lost

Bruno Borges
Maybe it will make sense if you consider the aggregation is been done over  
incoming messages.


Cheers,
Bruno

On Nov 16, 2009 2:28pm, Wilson <[hidden email]> wrote:


> Hi Claus,



> I was out on vacation so my feedback is a bit late.



> I figured out why my code is not working.



> Your AggregationStrategy implementation is setting the message body this

> way:

> oldExchange.getIn().setBody(oldBody + "," + newBody);



> My code is doing this:

> oldExchange.getOut().setBody(oldBody + "," + newBody);



> I am used to the Processor interface that expects the body to be set in  
> the

> "out exchange". It is strange the fact that the Aggregation Strategy  
> expects

> the "in exchange" to be updated.



> Thank you.



> --

> Wilson Freitas

> Vetta Technologies

> http://www.vettatech.com







> Claus Ibsen-2 wrote:

> >

> > Hi

> >

> > See this unit test which I have created that works.

> > http://svn.apache.org/viewvc?rev=828961&view=rev

> >

> >

> > On Wed, Oct 21, 2009 at 8:15 PM, Wilson [hidden email]> wrote:

> >>

> >> Hi Claus,

> >>

> >> Thanks for your answer.

> >>

> >>

> >> Claus Ibsen-2 wrote:

> >>>

> >>> Hi

> >>>

> >>> Do you "loose" message every time you run the unit test?

> >>>

> >>> Have you tried with a higher batch timeout?

> >>>

> >>>

> >>> --

> >>> Claus Ibsen

> >>> Apache Camel Committer

> >>>

> >>> Open Source Integration: http://fusesource.com

> >>> Blog: http://davsclaus.blogspot.com/

> >>> Twitter: http://twitter.com/davsclaus

> >>>

> >>>

> >>

> >> Yes. The "message lost" happens every time I run the code. I think the

> >> problem is not related to batchTimeout. BTW increasing the batchTimeout

> >> to

> >> 100 secs has no effect. As you can see in the logs, the messages are

> >> produced very quickly and all of then are passing through the  
> Aggregator.

> >>

> >> Looks like the exchange produced by the Aggregator is the one created

> >> when

> >> the message n-1 is aggregated. I am not sure, but I think the exchange

> >> returned by the last aggregator cycle is being ignored.

> >>

> >> This is what I think is happening:

> >>

> >> For n messages:

> >> -->Exchange(1)--Aggregator-->AggregatedExchange(1)-->

> >>  
> -->Exchange(2),AggregatedExchange(1)-->Aggregator-->AggregatedExchange(2)-->

> >>  
> -->Exchange(3),AggregatedExchange(2)-->Aggregator-->AggregatedExchange(3)-->

> >> ...

> >>  
> -->Exchange(n-1),AggregatedExchange(n-2)-->Aggregator-->AggregatedExchange(n-1)-->(this

> >> one is produced by the aggregator)

> >>  
> -->Exchange(n),AggregatedExchange(n-1)-->Aggregator-->AggregatedExchange(n)-->(this

> >> one is being lost or ignored somehow)

> >>

> >> Thank you,

> >>

> >> --

> >> Wilson Freitas

> >> Vetta Technologies

> >> http://www.vettatech.com

> >> --

> >> View this message in context:

> >> http://www.nabble.com/Aggregator-message-lost-tp25976380p25997564.html

> >> Sent from the Camel - Users mailing list archive at Nabble.com.

> >>

> >>

> >

> >

> >

> > --

> > Claus Ibsen

> > Apache Camel Committer

> >

> > Author of Camel in Action: http://www.manning.com/ibsen/

> > Open Source Integration: http://fusesource.com

> > Blog: http://davsclaus.blogspot.com/

> > Twitter: http://twitter.com/davsclaus

> >

> >



> --

> View this message in context:  
> http://old.nabble.com/Aggregator-message-lost-tp25976380p26374868.html

> Sent from the Camel - Users mailing list archive at Nabble.com.



Reply | Threaded
Open this post in threaded view
|

Re: Aggregator message lost

Willem.Jiang
Administrator
In reply to this post by wilson.lists
Since Camel is using Pipeline to chain to endpoints together, the out
message which comes from first endpoint will be treat as in message for
the second endpoint; if there is on out message that be set to exchange
from first endpoint, second endpoint can still can get the in message
from first endpoint :)

Willem

Wilson wrote:

> Hi Claus,
>
> I was out on vacation so my feedback is a bit late.
>
> I figured out why my code is not working.
>
> Your AggregationStrategy implementation is setting the message body this
> way:
>   oldExchange.getIn().setBody(oldBody + "," + newBody);
>
> My code is doing this:
>   oldExchange.getOut().setBody(oldBody + "," + newBody);
>
> I am used to the Processor interface that expects the body to be set in the
> "out exchange". It is strange the fact that the Aggregation Strategy expects
> the "in exchange" to be updated.
>
> Thank you.
>
> --
> Wilson Freitas
> Vetta Technologies
> http://www.vettatech.com
>
>
>
> Claus Ibsen-2 wrote:
>> Hi
>>
>> See this unit test which I have created that works.
>> http://svn.apache.org/viewvc?rev=828961&view=rev
>>
>>
>> On Wed, Oct 21, 2009 at 8:15 PM, Wilson <[hidden email]> wrote:
>>> Hi Claus,
>>>
>>> Thanks for your answer.
>>>
>>>
>>> Claus Ibsen-2 wrote:
>>>> Hi
>>>>
>>>> Do you "loose" message every time you run the unit test?
>>>>
>>>> Have you tried with a higher batch timeout?
>>>>
>>>>
>>>> --
>>>> Claus Ibsen
>>>> Apache Camel Committer
>>>>
>>>> Open Source Integration: http://fusesource.com
>>>> Blog: http://davsclaus.blogspot.com/
>>>> Twitter: http://twitter.com/davsclaus
>>>>
>>>>
>>> Yes. The "message lost" happens every time I run the code. I think the
>>> problem is not related to batchTimeout. BTW increasing the batchTimeout
>>> to
>>> 100 secs has no effect. As you can see in the logs, the messages are
>>> produced very quickly and all of then are passing through the Aggregator.
>>>
>>> Looks like the exchange produced by the Aggregator is the one created
>>> when
>>> the message n-1 is aggregated. I am not sure, but I think the exchange
>>> returned by the last aggregator cycle is being ignored.
>>>
>>> This is what I think is happening:
>>>
>>> For n messages:
>>> -->Exchange(1)--Aggregator-->AggregatedExchange(1)-->
>>> -->Exchange(2),AggregatedExchange(1)-->Aggregator-->AggregatedExchange(2)-->
>>> -->Exchange(3),AggregatedExchange(2)-->Aggregator-->AggregatedExchange(3)-->
>>> ...
>>> -->Exchange(n-1),AggregatedExchange(n-2)-->Aggregator-->AggregatedExchange(n-1)-->(this
>>> one is produced by the aggregator)
>>> -->Exchange(n),AggregatedExchange(n-1)-->Aggregator-->AggregatedExchange(n)-->(this
>>> one is being lost or ignored somehow)
>>>
>>> Thank you,
>>>
>>> --
>>> Wilson Freitas
>>> Vetta Technologies
>>> http://www.vettatech.com
>>> --
>>> View this message in context:
>>> http://www.nabble.com/Aggregator-message-lost-tp25976380p25997564.html
>>> Sent from the Camel - Users mailing list archive at Nabble.com.
>>>
>>>
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Author of Camel in Action: http://www.manning.com/ibsen/
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>>
>