Aggregator strategies (again)


Aggregator strategies (again)

almilo
Hi all:

First, Camel is a very interesting project. Congrats to the team!!

Now the question...

I've seen a post about better support for the Aggregator pattern, but with no answer. The test cases seem to be very simple, and I think this is a really relevant pattern for distributed processing. But, being based on an "a priori" batch size and timeout, it lacks value for most of the uses I can think of.

Any plans for an Aggregator face-lift? :)

I attach an imaginative test case :O) with some comments in the FIXMEs.

Lots of thanks, Alberto Mijares

// ------------ START ---------------//
package org.fundacionctic.taw;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.camel.CamelTemplate;
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.model.AggregatorType;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class AggregatorTest extends ContextTestSupport {

        private Log log = LogFactory.getLog(this.getClass());

        private static final String SURNAME_HEADER = "surname";

        private static final String TYPE_HEADER = "type";

        private static final String BROTHERS_TYPE = "brothers";

        public void testAggregator() throws Exception {

                String allNames = "Harpo Marx,Fiodor Karamazov,Chico Marx,Ivan Karamazov,Groucho Marx,Alexei Karamazov,Dimitri Karamazov";

                List<String> marxBrothers = new ArrayList<String>();
                marxBrothers.add("Harpo");
                marxBrothers.add("Chico");
                marxBrothers.add("Groucho");

                List<String> karamazovBrothers = new ArrayList<String>();
                karamazovBrothers.add("Fiodor");
                karamazovBrothers.add("Ivan");
                karamazovBrothers.add("Alexei");
                karamazovBrothers.add("Dimitri");

                Map<String, List> allBrothers = new HashMap<String, List>();
                allBrothers.put("Marx", marxBrothers);
                allBrothers.put("Karamazov", karamazovBrothers);

                MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:result",
                                MockEndpoint.class);
                resultEndpoint.expectedMessageCount(1);
                resultEndpoint.expectedBodiesReceived(allBrothers);

                CamelTemplate template = new CamelTemplate(context);
                template.sendBody("direct:start", allNames);

                resultEndpoint.assertIsSatisfied();

        }

        @Override
        protected RouteBuilder createRouteBuilder() throws Exception {

                return new RouteBuilder() {

                        private void debugIn(String stringId, Exchange oldExchange,
                                        Exchange newExchange) {

                                log.debug(stringId + " old headers in: "
                                                + oldExchange.getIn().getHeaders());
                                log.debug(stringId + " old body in: "
                                                + oldExchange.getIn().getBody());
                                log.debug(stringId + " new headers in: "
                                                + newExchange.getIn().getHeaders());
                                log.debug(stringId + " new body in: "
                                                + newExchange.getIn().getBody());

                        }

                        private void debugOut(String stringId, Exchange exchange) {

                                log.debug(stringId + " old headers out: "
                                                + exchange.getIn().getHeaders());
                                log.debug(stringId + " old body out: "
                                                + exchange.getIn().getBody());

                        }

                        AggregationStrategy surnameAggregator = new AggregationStrategy() {

                                public Exchange aggregate(Exchange oldExchange,
                                                Exchange newExchange) {

                                        debugIn("Surname Aggregator", oldExchange, newExchange);

                                        Message oldIn = oldExchange.getIn();
                                        Message newIn = newExchange.getIn();

                                        List<String> brothers = null;
                                        if (oldIn.getBody() instanceof List) {

                                                brothers = oldIn.getBody(List.class);
                                                brothers.add(newIn.getBody(String.class));

                                        } else {

                                                brothers = new ArrayList<String>();
                                                brothers.add(oldIn.getBody(String.class));
                                                brothers.add(newIn.getBody(String.class));
                                                oldExchange.getIn().setBody(brothers);

                                        } // else

                                        debugOut("Surname Aggregator", oldExchange);

                                        return oldExchange;

                                }

                        };

                        AggregationStrategy brothersAggregator = new AggregationStrategy() {

                                public Exchange aggregate(Exchange oldExchange,
                                                Exchange newExchange) {

                                        debugIn("Brothers Aggregator", oldExchange, newExchange);

                                        Message oldIn = oldExchange.getIn();
                                        Message newIn = newExchange.getIn();

                                        Map<String, List> brothers = null;
                                        if (oldIn.getBody() instanceof Map) {

                                                brothers = oldIn.getBody(Map.class);
                                                brothers.put(newIn.getHeader(SURNAME_HEADER,
                                                                String.class), newIn.getBody(List.class));

                                        } else {

                                                brothers = new HashMap<String, List>();
                                                brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
                                                                oldIn.getBody(List.class));
                                                brothers.put(newIn.getHeader(SURNAME_HEADER,
                                                                String.class), newIn.getBody(List.class));
                                                oldExchange.getIn().setBody(brothers);

                                        } // else

                                        debugOut("Brothers Aggregator", oldExchange);

                                        return oldExchange;

                                }

                        };

                        @Override
                        public void configure() throws Exception {

                                from("direct:start")
                                                // Separate people
                                                .splitter(bodyAs(String.class).tokenize(",")).process(

                                                // Split the name, erase the surname and put it in a
                                                // header
                                                                new Processor() {

                                                                        public void process(Exchange exchange)
                                                                                        throws Exception {

                                                                                String[] parts = exchange.getIn()
                                                                                                .getBody(String.class).split(
                                                                                                                " ");
                                                                                exchange.getIn().setBody(parts[0]);
                                                                                exchange.getIn().setHeader(
                                                                                                SURNAME_HEADER, parts[1]);

                                                                        } // process

                                                                }) // Processor

                                                .to("direct:joinSurnames");

                                // FIXME: This aggregator doesn't usually fail, but it can, due to the timeout
                                // or an incorrect batch size
                                // Join in a list by the surname on the header and mark as
                                // brothers list
                                from("direct:joinSurnames")
                                .aggregator(header(SURNAME_HEADER),
                                                surnameAggregator).setHeader(TYPE_HEADER,
                                                constant(BROTHERS_TYPE)).to("direct:joinBrothers");

                                // Join all brothers lists and remove surname and type headers
                                AggregatorType agg = from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
                                                brothersAggregator);
                               
                                // FIXME: If these lines are commented out, the test sometimes fails with different errors
                                // due to a timeout or an incorrect batch size that must be adjusted by hand.
                                // There are two brothers lists to join, but we don't always know the number "a priori"
                                agg.setBatchSize(2);
                                agg.setBatchTimeout(10000);
                                agg.removeHeader(SURNAME_HEADER)
                                .removeHeader(TYPE_HEADER)
                                .to("mock:result");

                        }

                };

        }

}
// ------------ END ---------------//

Re: Aggregator strategies (again)

RomKal
Hello!

I was thinking about this problem a little, and I would like to ask someone more fluent in Camel for his/her thoughts on it.

I thought about creating a common base component built on something like a DelayedQueue class: whenever you receive an exchange, you execute a strategy class that is responsible for creating a DelayedExchange (a wrapper for Exchange that implements Delayed) and putting it into the DelayedQueue.

Then you have one thread that just monitors this DelayedQueue and sends on any exchange that is retrieved from it (very similar to the StreamResequencer).

This way we can create a Delayer that will not block the current thread.

On the other hand, we can create this strategy in such a way that on every new exchange it removes the pending exchange from the DelayedQueue, modifies it (using some AggregationStrategy) and puts it back into the queue. If it notices that the aggregation is complete, it adds the aggregated Exchange to the queue with delay == 0.

The logic to specify the correlationId, default delay or batch size can be exposed as an Expression.
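
To make the idea a bit more concrete, here is a very rough sketch built on the JDK's java.util.concurrent.DelayQueue (all class and method names below are made up for illustration; none of this is existing Camel API):

import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

import org.apache.camel.Exchange;
import org.apache.camel.Processor;

// Wrapper for an Exchange that becomes "ready" once its delay expires.
class DelayedExchange implements Delayed {
    private final Exchange exchange;
    private final long expiresAt; // absolute time in millis

    DelayedExchange(Exchange exchange, long delayMillis) {
        this.exchange = exchange;
        this.expiresAt = System.currentTimeMillis() + delayMillis;
    }

    Exchange getExchange() {
        return exchange;
    }

    public long getDelay(TimeUnit unit) {
        return unit.convert(expiresAt - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
    }

    public int compareTo(Delayed other) {
        long diff = getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS);
        return diff < 0 ? -1 : diff > 0 ? 1 : 0;
    }
}

// A single background thread drains expired exchanges and forwards them to the
// next processor, so the producing thread never blocks.
class DelayedDispatcher implements Runnable {
    private final DelayQueue<DelayedExchange> queue = new DelayQueue<DelayedExchange>();
    private final Processor next;

    DelayedDispatcher(Processor next) {
        this.next = next;
    }

    void submit(Exchange exchange, long delayMillis) {
        queue.put(new DelayedExchange(exchange, delayMillis));
    }

    public void run() {
        while (!Thread.currentThread().isInterrupted()) {
            try {
                DelayedExchange ready = queue.take(); // blocks only this dispatcher thread
                next.process(ready.getExchange());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e) {
                e.printStackTrace(); // real code would use Camel's error handling
            }
        }
    }
}

An aggregating variant could, as described above, remove the pending DelayedExchange for the same correlation key, merge in the new exchange with the AggregationStrategy, and re-queue it (with delay 0 once the aggregation is complete).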

I've already started to code it this way, so if you have any comments then let me know.
I hope that when it is finished it can be included in the Camel distribution.

Cheers
Roman

Re: Aggregator strategies (again)

jstrachan
On 05/11/2007, Roman Kalukiewicz <[hidden email]> wrote:

> Hello!
>
> I was thinking about this problem a little, and I would like to ask
> someone more fluent in camel about his/her thoughts about the thing.
>
> I thought about creating common base component based on something like
> DelayedQueue class where whenever you receive an exchange you will
> execute strategy class that is responsible for creating
> DelayedExchange class (wrapper for Exchange that implements Delayed)
> and putting it to DelayedQueue.

Interesting. You could reuse the SedaEndpoint class, which is an endpoint that uses any BlockingQueue implementation; though I guess we'd need to modify it a little to deal with the wrapping of a Delayed object, so reuse might not be as clean as we'd like.

BTW we have the delayer...
http://activemq.apache.org/camel/delayer.html

which is kinda similar conceptually - though the Delayer is really simple in that it just delays the current message's delivery by a time expression. Another approach could be to couple this with a resequencer to reorder the messages based on their expiration time (soonest first) before feeding them into the delayer.

The only worry with things like using a DelayedQueue is transactions & reliability - e.g. if your application terminates, do you lose messages? Also if you're not careful the DelayedQueue can get rather large. But there are so many different use cases; I can definitely see the attraction of the DelayedQueue idea.

FWIW folks have wanted a delayed message dispatch in ActiveMQ for some time...
http://issues.apache.org/activemq/browse/AMQ-499

so it'd be great to provide this feature!

I guess we could one day provide alternative DelayedQueue
implementations; such as some kind of persistent delayed queue so that
messages can be sent reliably to the delayed queue (i.e. the send can
be a database insert). i.e. use the same Delayer component/endpoint
but allow a different factory to be provided to implement the
DelayedQueue container.

FWIW the BAM module...
http://cwiki.apache.org/CAMEL/bam.html

has JPA-based delay logic used to find expiration-based alerts for business processes; it's in the ActivityMonitorEngine, which in pseudocode does...

                final Date timeNow = new Date(now);

                transactionTemplate.execute(new TransactionCallbackWithoutResult() {
                    protected void doInTransactionWithoutResult(TransactionStatus status) {
                        List<ActivityState> list = template.find(
                                "select x from ActivityState x where x.timeOverdue < ?1", timeNow);
                        for (ActivityState activityState : list) {
                            process(activityState);
                        }
                    }
                });


So I guess a JPA based implementation of the DelayedQueue would be
pretty easy to do; the hardest bit is figuring out how to persist the
message headers & body cleanly with JPA (but then that'd be a handy
thing anyway).
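
For example, a minimal sketch of such an entity might look like this (entity and field names are invented, and it crudely assumes the body and header values are Serializable):

import java.io.Serializable;
import java.util.Date;
import java.util.HashMap;

import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.Id;
import javax.persistence.Lob;
import javax.persistence.Temporal;
import javax.persistence.TemporalType;

// One row per pending exchange, selected by expiration time much like the
// ActivityState query above.
@Entity
public class DelayedExchangeEntity {

    @Id
    @GeneratedValue
    private Long id;

    // when this exchange becomes due for dispatch
    @Temporal(TemporalType.TIMESTAMP)
    private Date expirationTime;

    // crude but simple: serialize the body and headers into blobs
    @Lob
    private Serializable body;

    @Lob
    private HashMap<String, Object> headers;

    public Date getExpirationTime() {
        return expirationTime;
    }

    // remaining getters/setters omitted
}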



> Then you have one thread that just monitors this DelayedQueue and
> sends any exchange that is retrieved from this queue (very similar to
> StreamResequencer).
>
> This way we can create Delayer that will not block current thread.

Sounds cool to me.


> On the other hand we can create this strategy in a way that on every
> new exchange it will remove pending exchange from DelayedQueue, modify
> it (using some AggregatorStrategy) and put it once again to this
> Queue. If it will notice that aggregation is complete it will add
> aggregated Exchange to this queue with delay == 0.
>
> Logic to specify correlationId, default delay or batch size can be
> exposed as an Expression.
>
> I've already started to code it this way so if you have any comments
> for this then let me know.
> I hope that if it will be finished it could be included in camel distribution.

Great! Looking forward to your patch :)
--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com
Reply | Threaded
Open this post in threaded view
|

Re: Aggregator strategies (again)

jstrachan
In reply to this post by almilo
Hi Alberto!

On 05/11/2007, almilo <[hidden email]> wrote:
> Hi all:
>
> First, Camel is a very interesting project. Congrats to the team!!

Many thanks! :)


> Now the question...
>
> I've seen a post about better support for Aggregator pattern but with no
> answer. The testcases seem to be very simple and I think this is a really
> relevant pattern for distributed processing. But, being based in "a priori"
> batch size and timeout it lacks value for most of the uses I can think of.

Yeah! That design BTW was done purely to work nicely with JMS
transactions. There are other designs and use cases in the Aggregator
space though! :)



> Any plans for an Aggregator face-lift? :)

We love contributions! :)
http://activemq.apache.org/camel/contributing.html

> I attach an imaginative testcase :O) with some comments on FIXMEs

I wonder if we could create a StreamAggregator that works kinda like
the StreamResequencer that Martin Krasser contributed; where rather
than using a batch size, the aggregator could use some counters on the
message to know how many messages to consume before completing the
aggregation window?

Another approach is to persist the message state to a database to store the previous state, and then use some kind of predicate to determine when the aggregation has been completed so that it can be sent.

e.g. if you want to split an Invoice message into LineItems then later
on aggregate it; you might want to store the LineItems in some
persistent table somewhere and only send the aggregated Invoice
message when all the LineItems have been received. This is a little
like the BAM stuff...
http://activemq.apache.org/camel/bam.html

--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com

Re: Aggregator strategies (again)

jstrachan
In reply to this post by almilo
BTW Alberto I've added your test case to the distro so folks can
noodle on it and see if we can improve things some. See
AlbertoAggregatorTest in the camel-core module.

I commented out the use of the set batch size; as we don't really need
to worry about that in this case, as we can just assume if the timeout
fires after a few seconds we've got to the end of any possible batch.

The tricky thing is really knowing when you're at the end of the batch. One simple solution would be to add some kind of predicate to detect batch completion. For example, when we split messages we could record how many split messages there are and each message's counter, so that we know when we've aggregated them all together again?
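
For illustration, here is a sketch of what such a counting strategy could look like, assuming the splitter stamps each piece with a hypothetical "split.total" header (this is not existing Camel API, just the shape of the idea):

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// Collects split pieces into a list and marks the aggregate as complete once
// every piece has been seen, based on a "split.total" header set by the splitter.
public class CountingAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        List parts;
        if (oldExchange.getIn().getBody() instanceof List) {
            parts = oldExchange.getIn().getBody(List.class);
        } else {
            parts = new ArrayList();
            parts.add(oldExchange.getIn().getBody());
            oldExchange.getIn().setBody(parts);
        }
        parts.add(newExchange.getIn().getBody());

        // A completion predicate could test this header instead of relying on a
        // fixed batch size or timeout.
        Integer total = newExchange.getIn().getHeader("split.total", Integer.class);
        if (total != null && parts.size() == total.intValue()) {
            oldExchange.getIn().setHeader("aggregation.complete", Boolean.TRUE);
        }
        return oldExchange;
    }
}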



--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com

Re: Aggregator strategies (again)

Hiram Chirino
On 11/5/07, James Strachan <[hidden email]> wrote:

> BTW Alberto I've added your test case to the distro so folks can
> noodle on it and see if we can improve things some. See
> AlbertoAggregatorTest in the camel-core module.
>
> I commented out the use of the set batch size; as we don't really need
> to worry about that in this case, as we can just assume if the timeout
> fires after a few seconds we've got to the end of any possible batch.
>
> The tricky thing is knowing when you're at the end of the batch
> really. One simple solution would be to add some kinda predicate to
> detect batch-completion. For example  when we split messages we could
> record how many split messages there are and each messages' counter so
> that we know when we've aggregated them all together again?

I wonder if the simplest way to implement the described split and aggregate pattern could be to just combine the splitter and aggregator into one delegate processor.  For example, it would take its input, split it and forward each part to the delegate for processing.  Then it would wait for the result of processing and aggregate those results.  The delegate processor would know how many things it split up, so it would know how many things to aggregate.
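
Something along these lines, perhaps (all names below are invented for illustration, not an existing Camel class):

import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// A composite processor that splits, delegates, and re-aggregates synchronously.
// Because this one object did the splitting, the loop ends exactly when every
// piece has come back, with no batch size or timeout needed.
public class SplitAggregateProcessor implements Processor {

    public interface SplittingStrategy {
        Iterable<Exchange> split(Exchange exchange) throws Exception;
    }

    private final SplittingStrategy splitter;
    private final Processor delegate;
    private final AggregationStrategy aggregator;

    public SplitAggregateProcessor(SplittingStrategy splitter, Processor delegate,
                                   AggregationStrategy aggregator) {
        this.splitter = splitter;
        this.delegate = delegate;
        this.aggregator = aggregator;
    }

    public void process(Exchange exchange) throws Exception {
        Exchange aggregated = null;
        for (Exchange part : splitter.split(exchange)) {
            delegate.process(part);
            aggregated = (aggregated == null)
                    ? part
                    : aggregator.aggregate(aggregated, part);
        }
        if (aggregated != null) {
            exchange.getIn().setBody(aggregated.getIn().getBody());
        }
    }
}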

Regards,
Hiram



--
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Aggregator strategies (again)

jstrachan
On 05/11/2007, Hiram Chirino <[hidden email]> wrote:

> On 11/5/07, James Strachan <[hidden email]> wrote:
> > BTW Alberto I've added your test case to the distro so folks can
> > noodle on it and see if we can improve things some. See
> > AlbertoAggregatorTest in the camel-core module.
> >
> > I commented out the use of the set batch size; as we don't really need
> > to worry about that in this case, as we can just assume if the timeout
> > fires after a few seconds we've got to the end of any possible batch.
> >
> > The tricky thing is knowing when you're at the end of the batch
> > really. One simple solution would be to add some kinda predicate to
> > detect batch-completion. For example  when we split messages we could
> > record how many split messages there are and each messages' counter so
> > that we know when we've aggregated them all together again?
>
> I wonder if the simplest way to implement the described split and
> aggregate pattern could be to just combine the splitter and aggregator
> into 1 delegate processor.  For example it would take it's input,
> split it and forward each part to the delegate for processing.  Then
> it would wait for the result of processing and aggregate those
> results.  The delegate processor would know how many things it split
> up so it would know how many things to aggregate.

That's a great idea for a nice simple solution that solves many
problems :) Increasingly it might help if we can try to roll up
smaller patterns into larger patterns (or protocols?), so combining a
splitter and aggregator together into a single unit might help
simplify things for folks (and make it easier to avoid making
mistakes).

I guess the downside of this approach is that only one large message
can be processed at once. Using Invoices and LineItems as
an example: if you had a pool of processors, you could only let folks
process one Invoice at once (as the thread would block before sending
any other invoice's LineItems to be processed), but that might avoid
complications.

--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com

Re: Aggregator strategies (again)

almilo
In reply to this post by jstrachan
Hi James:

James.Strachan wrote
BTW Alberto I've added your test case to the distro so folks can
noodle on it and see if we can improve things some. See
AlbertoAggregatorTest in the camel-core module.
Many thanks, the example is a little strange but fun (hope so... :O)

James.Strachan wrote
The tricky thing is knowing when you're at the end of the batch
really. One simple solution would be to add some kinda predicate to
detect batch-completion. For example  when we split messages we could
record how many split messages there are and each messages' counter so
that we know when we've aggregated them all together again?
I reviewed all the other replies and they seem too complicated to me... (I'm new to Camel and I chose it for its simplicity = patterns + POJOs :)

I was thinking more along the simple, 10,000-foot lines that you pointed out above:
 - Because this is a "patterns" project, I think that when "splitting" messages it would be nice to count, or better, correlate the pieces altogether. Probably where there's a "split" there is a "join" later.

 - My reading is that the BatchProcessor is conservative and doesn't call the AggregationStrategy until it is really needed. That is very nice to have, but would it be possible to make AggregationStrategy look more like this?:

        // newExchange: exchange that fulfils the AggregationExpression and needs to be processed
        // AggregationContext: managed space to store intermediate data
        // Returns: the resulting exchange when finished, null when not finished
        public Exchange aggregate(Exchange newExchange, AggregationContext context);
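
For illustration, here is a rough sketch of what a strategy written against this hypothetical signature could look like. Neither AggregationContext nor this aggregate() signature exists in Camel; the context, its property map, and the "splitSize" header are all made up for the example:

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.camel.Exchange;

// Hypothetical contract, roughly as proposed above - neither type exists in Camel.
interface AggregationContext {
    Map<String, Object> getProperties();   // managed space for intermediate data
    Exchange createResultExchange();       // builds the exchange to return when done
}

interface ContextAggregationStrategy {
    // returns the finished exchange, or null while the batch is incomplete
    Exchange aggregate(Exchange newExchange, AggregationContext context);
}

// Example strategy: collect bodies into a list and finish once an assumed
// "splitSize" header says every piece of the original message has arrived.
class CollectingStrategy implements ContextAggregationStrategy {

    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange newExchange, AggregationContext context) {
        Map<String, Object> props = context.getProperties();

        List<Object> bodies = (List<Object>) props.get("bodies");
        if (bodies == null) {
            bodies = new ArrayList<Object>();
            props.put("bodies", bodies);
        }
        bodies.add(newExchange.getIn().getBody());

        Object expected = newExchange.getIn().getHeader("splitSize");
        if (expected != null && bodies.size() >= ((Number) expected).intValue()) {
            Exchange result = context.createResultExchange();
            result.getIn().setBody(bodies);
            return result;   // batch complete
        }
        return null;         // not finished yet, the partial state stays in the context
    }
}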

Ideas:
 - When a Message is split, a "sourceMessage" property could be populated. The splitter would correlate all the new messages through this value, and then a default aggregation expression could be constructed. This is obviously too simple and something more robust is needed, but it seems natural to split and join, and right now this needs a lot of "glue" headers or the like to be done. Parallel processing pipelines built from POJOs and SEDA seem very easy to build with the Camel DSL and are very powerful.

 - Using the AggregationContext you can store partial results and, when finished (you control the batch), build and return the new Exchange. The current strategy also seems possible to implement as a specialization. Through the context it should also be possible to know whether more messages are pending and when the batch started, so you can keep up a rate of returned exchanges or wait a little bit longer.

 - This context could be transient or persistent (configured by DSL or XML) and maybe bound to a transaction to survive crashes (UnitOfWork??). Could Camel manage that persistence? <- sounds not good for simplicity ;O)

I feel that these suggestions run into a lot of problems with concurrency and with Camel's core concepts. Am I wrong?

James, if this sounds good maybe I'll implement a prototype and post it :O)

Thanks, Alberto.

Re: Aggregator strategies (again)

Hiram Chirino
In reply to this post by jstrachan
On 11/5/07, James Strachan <[hidden email]> wrote:

> Thats a great idea for a nice simple solution that solves many
> problems :) Increasingly it might help if we can try to roll up
> smaller patterns into larger patterns (or protocols?) so combining a
> splitter and aggregator together into a single unit might help
> simplify things for folks (and make it easier to avoid making
> mistakes).
>
> I guess the downside of this approach is that only one large message
> can be processed at once. For example using Invoices and LineItems as
> an example; if you had a pool of processors, you could only let folks
> process one Invoice at once (as the thread would block before sending
> any other invoice's LineItems to be processed) but that might avoid
> complications.
>

Actually that would not be true.  It should be safe to call the
Splitter/Aggregator concurrently, since its state would be local to the
exchange being processed.  For example:

from("jms:queue:input").thread(5).splitAndAggregate(...)...

And the processing of the split parts would not have to happen
sequentially either, if the Splitter/Aggregator were implemented as an
AsyncProcessor.  It could do something like:

  ...splitAndAggregate(...).thread(5).processor(stepProcessor);

Basically it would send all the steps sequentially to the thread
processor, but those calls would return right away since an async thread
would be started to process each step to completion.  The
Splitter/Aggregator would then just listen for the async completion
events so that it could aggregate the results and return once all the
steps have been aggregated.
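
For what it's worth, here is a plain-Java sketch of those fan-out/fan-in mechanics, outside of Camel: split the input, hand every part to a pool, and aggregate as the completion events come back. The class and method names are made up for the example; it is not the proposed Splitter/Aggregator itself.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SplitAndAggregateSketch {

    // Stands in for the delegate/step processor.
    static String processPart(String part) {
        return part.trim().toUpperCase();
    }

    // Split the input, submit every part to the pool, then aggregate the
    // results as the completion events arrive. All the state (the pending
    // completions and the result list) is local to this call, so several
    // inputs can be handled concurrently on different threads.
    static List<String> splitAndAggregate(String input, ExecutorService pool)
            throws Exception {
        String[] parts = input.split(",");
        CompletionService<String> completions =
                new ExecutorCompletionService<String>(pool);
        for (final String part : parts) {
            completions.submit(new Callable<String>() {
                public String call() {
                    return processPart(part);
                }
            });
        }
        // We know exactly how many parts were split, so we know how many
        // completions to wait for - no batch size or timeout guessing.
        List<String> results = new ArrayList<String>(parts.length);
        for (int i = 0; i < parts.length; i++) {
            results.add(completions.take().get());
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        try {
            System.out.println(
                    splitAndAggregate("Harpo Marx,Chico Marx,Groucho Marx", pool));
        } finally {
            pool.shutdown();
        }
    }
}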



> --
> James
> -------
> http://macstrac.blogspot.com/
>
> Open Source SOA
> http://open.iona.com
>


--
Regards,
Hiram

Blog: http://hiramchirino.com

Re: Aggregator strategies (again)

jstrachan
Oooh! I forgot about that! That's a great idea! :) Let's do it!

I guess a split/aggregate pair is gonna be very common.

BTW you could argue that split/aggregate is kinda like map/reduce too...



--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com

Re: Aggregator strategies (again)

almilo
In reply to this post by jstrachan
James.Strachan wrote
> I wonder if the simplest way to implement the described split and
> aggregate pattern could be to just combine the splitter and aggregator
> into 1 delegate processor.  For example it would take it's input,
> split it and forward each part to the delegate for processing.  Then
> it would wait for the result of processing and aggregate those
> results.  The delegate processor would know how many things it split
> up so it would know how many things to aggregate.

Thats a great idea for a nice simple solution that solves many
problems :) Increasingly it might help if we can try to roll up
smaller patterns into larger patterns (or protocols?) so combining a
splitter and aggregator together into a single unit might help
simplify things for folks (and make it easier to avoid making
mistakes).

I guess the downside of this approach is that only one large message
can be processed at once. For example using Invoices and LineItems as
an example; if you had a pool of processors, you could only let folks
process one Invoice at once (as the thread would block before sending
any other invoice's LineItems to be processed) but that might avoid
complications.
Not sure I got it all, but it seems simpler and higher level (more like patterns :). Could anybody point me to an example, a similar test case or the like that uses a DelegateProcessor?

Thanks a lot, Alberto.

Re: Aggregator strategies (again)

jstrachan
In reply to this post by almilo
On 05/11/2007, almilo <[hidden email]> wrote:
> James.Strachan wrote:
> >
> > BTW Alberto I've added your test case to the distro so folks can
> > noodle on it and see if we can improve things some. See
> > AlbertoAggregatorTest in the camel-core module.
> >
>
> Many thanks, the example is a little strange but fun (hope so... :O)

Yeah :)


> James.Strachan wrote:
> >
> > The tricky thing is knowing when you're at the end of the batch
> > really. One simple solution would be to add some kinda predicate to
> > detect batch-completion. For example  when we split messages we could
> > record how many split messages there are and each messages' counter so
> > that we know when we've aggregated them all together again?
> >
>
> I reviewed all other replies and seem too complicated for me... (I´m new to
> Camel and I chose it by simplicity = patterns + POJOs :)

:)

We definitely should strive to make things as simple as they possibly can be.


> I was thinking more in this simple 10.000 feet high way that you pointed out
> above:
>  - Because this is a "patterns" project I think that when "splitting"
> messages would be nice to count or better correlate the pieces alltogether.
> Probably when theres a "split" later there is a "join".

Yeah. I've recently patched the splitter to keep track of each
message's counter and the total number of messages split. Though we
don't yet assign a unique message ID to be used for correlation
later on. I guess we could do that?
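
As a rough illustration of how those counters could feed a batch-completion check (the "splitSize" and "aggregatedCount" header names below are placeholders, not the headers the splitter actually sets):

import org.apache.camel.Exchange;

// Sketch only: true once the aggregated exchange has seen as many pieces
// as the splitter produced for the original message. The aggregation
// strategy would have to maintain the "aggregatedCount" header itself.
public final class BatchCompletion {

    private BatchCompletion() {
    }

    public static boolean isComplete(Exchange aggregated) {
        Object size = aggregated.getIn().getHeader("splitSize");
        Object seen = aggregated.getIn().getHeader("aggregatedCount");
        if (size == null || seen == null) {
            return false;   // no counters available, fall back to batch size/timeout
        }
        return ((Number) seen).intValue() >= ((Number) size).intValue();
    }
}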


>  - I interpreted that the batchprocessor is conservative and doesn´t call
> AggregationStrategy until it is really needed. It is very nice to have but
> could it be possible to do AggregationStrategy more like this?:
>
>         // newExchange: exchange that fullfils the AggregationExpression and needs
> to be processed
>         // AggregationContext: managed space to store intermediate data
>         // Returns: the resulting exchange when finished, null when not finished
>         public Exchange aggregate(Exchange newExchange, AggregationContext
> context);

Just a thought - you can use an Exchange to hold any intermediate
data. So you can take the old and new exchanges and create a totally
new, empty exchange with some calculated values inside it if you like.
Or you could just apply the new exchange onto the old exchange (or
vice versa). This is why we made the strategy API take both the new and
old exchanges and return an exchange.

So I'm not sure we need an AggregationContext per se - I think it'd
be easier to reuse stuff if we just used an Exchange (possibly the
same one, possibly a new one) to store any state.
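
For example, a small strategy against the existing AggregationStrategy interface (the same one used in the test above) that keeps its running state directly on the exchange it returns - assuming the old exchange is always the previously aggregated one, as in the current behaviour:

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;
import org.apache.camel.processor.aggregate.AggregationStrategy;

// The first time through, the old exchange's body is replaced by a List;
// every later call just appends the new body to that list.
public class ListOnOldExchangeStrategy implements AggregationStrategy {

    @SuppressWarnings("unchecked")
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        Object oldBody = oldExchange.getIn().getBody();

        List<Object> collected;
        if (oldBody instanceof List) {
            collected = (List<Object>) oldBody;    // already aggregating
        } else {
            collected = new ArrayList<Object>();
            collected.add(oldBody);                // seed with the first body
            oldExchange.getIn().setBody(collected);
        }
        collected.add(newExchange.getIn().getBody());
        return oldExchange;   // the old exchange carries all the intermediate state
    }
}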

The one downside is that there's no easy way to return the
completed/not-completed state, unless we use a property / header.

So maybe we could combine approaches, using an AggregationState bean
which creates the response Exchange and also
determines whether or not the aggregation is complete - then an
implementor of a strategy could just aggregate exchanges together and
store that, or use a separate exchange, or use totally separate state
and just create the final exchange dynamically, etc.

So maybe something like

public interface AggregationState extends Processor {
  // default state
  boolean isCompleted();
  Exchange getAggregatedExchange();
}

public interface AggregationStrategy {
  AggregationState createAggregationState(Exchange firstMessage);
}


Then the state is created when the first message exchange is received;
after that the process() method is called on the AggregationState to
update itself, and once it's completed, getAggregatedExchange() is
used to determine what the aggregated exchange looks like?
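
To make that concrete, a minimal sketch written against the two proposed interfaces above (they don't exist in Camel yet, and the "splitSize" header is again just a placeholder for whatever says how many pieces to expect):

import java.util.ArrayList;
import java.util.List;

import org.apache.camel.Exchange;

// Seeded with the first exchange, updated via process(), and asked for the
// result once isCompleted() says the batch is done.
// (The two types are shown together for brevity; they'd live in separate files.)
public class CountingAggregationState implements AggregationState {

    private final Exchange aggregated;
    private final List<Object> bodies = new ArrayList<Object>();
    private final int expected;

    public CountingAggregationState(Exchange firstExchange) {
        this.aggregated = firstExchange;
        Object size = firstExchange.getIn().getHeader("splitSize");
        this.expected = size != null ? ((Number) size).intValue() : Integer.MAX_VALUE;
        bodies.add(firstExchange.getIn().getBody());
    }

    public void process(Exchange newExchange) {
        bodies.add(newExchange.getIn().getBody());
    }

    public boolean isCompleted() {
        return bodies.size() >= expected;
    }

    public Exchange getAggregatedExchange() {
        aggregated.getIn().setBody(bodies);
        return aggregated;
    }
}

public class CountingAggregationStrategy implements AggregationStrategy {
    public AggregationState createAggregationState(Exchange firstMessage) {
        return new CountingAggregationState(firstMessage);
    }
}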


> Ideas:
>  - When a Message is splitted a "sourceMessage" property could be populated.
> The splitter correlates all new messages through this value. Then a default
> aggregation expression could be constructed.

The Aggregator assumes there's some expression used to correlate the
messages, but the Splitter doesn't necessarily attach a unique message
header to each split message. Maybe we should add that?


> This is obviously too simple
> and something more robust is needed but seems natural to split and join and
> now this needs a lot of "glue" headers or the like to be done. Parallel
> processing pipelines through POJOs and SEDA seem very easy to be built with
> Camel DSL and are very powerful.
>
>  - Using the AggregationContext you can store partial results and, when
> finished (you control the batch),  the new Exchange is built and returned.
> Actual strategy seems also possible to be implemented as a specialization.
> Through context should also possible to know when there are more messages
> pending and when started the batch so you can keep a rate of returning
> exchanges or wait a little bit more.
>
>  - This context could be transient or persistent (configured by DSL or XML)
> and maybe bound to a transaction to avoid crash fails (UnitOfWork??). Could
> camel manage that persistence? <- sounds not good for simplicity ;O)

Hopefully we could create a simple-to-use persistence mechanism :). I guess a
persistent strategy could use dependency injection in Spring to deal
with the JPA stuff, keeping Camel nice and simple?

Using transactions is also cool - the only problem is if you have
messages for different correlation IDs mixed up. E.g. imagine you
split all messages into 2 pieces. Then for A we have A1, A2 and for B we
have B1, B2. The problem's gonna be if you receive them like A1, B1,
A2, C1, B2, D1, C2....

You're never gonna reach a boundary where you can commit without
possibly losing part of another aggregation. But I guess you could
put a resequencer in front to guarantee ordering across correlation IDs.


> I feel that these suggestions have a lot of problems with the issues of
> concurrency and concepts in Camel. Am I wrong?
>
> James, if this is good maybe I´m implementing a prototype and posting it :O)

Great! :)

I'm still digesting all the recent thoughts on this topic - e.g. I
like the simplicity of Hiram's idea - and I feel like we're making
progress; some great ideas are flowing. Hopefully we can end up with
some really powerful and flexible - yet simple to use - solutions.

--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com

Re: Aggregator strategies (again)

almilo
In reply to this post by Hiram Chirino
Hiram Chirino wrote

Actually that would not be true.  It should be safe to call
Splitter/Aggregator concurrently since it's state would be local the
exchange being processed.  For example:

from("jms:queue:input").thread(5).splitAndAggregate(...)...

And the processing of the split parts would not have to happen in
sequentially either if the  Splitter/Aggregator was implemented an
AsyncProcessor.  It could do something like:

  ...splitAndAggregate(...).thread(5).processor(stepProcessor);

basically it would sequentially send all the steps to the thread
processor but those would return right away since an async thread
would be started to complete process each step.  The
Splitter/Aggregator  would then just listen to the async completion
events so that it could aggregate the results and return when all the
steps have been aggregated.
I completely agree with this approach, but doesn't it create a strong dependency on the DSL? I mean, you always have to do things this way:
from("someEndpoint").<concurrency.>splitAndAggregate().<concurrency.>withThisProcessor().to("someOtherEndpoint");

Isn't this too coupled? Or is the DSL flexible enough to do something equivalent to this?:

from("seda:lotsOfWork").multicastSplitter("markedWithThisToken").to("seda:oneProcessor", "seda:anotherProcessor");

from("seda:oneProcessor").beanRef("doSomethingLenghty1", "withPOJOs")
.to("seda:joinResults");

from("seda:anotherProcessor").beanRef("doSomethingLenghty2", "withBeans")
.multicast().to("seda:joinResults", "jpa:butSaveThisOnes");

from("seda:joinResults").multicastAggregator("markedWithThisToken", nonBatchStrategy)
.to("direct:result");

(Omitted threads and details... :)

Well, it's simple but very powerful and flexible... like Camel? ;O)

Thanks, Alberto.