splitter/aggregator scenario

RomKal
Hello!

I wanted to implement a splitter/aggregator scenario, and I'm facing the following problem:

When I split the original message, I would like to aggregate the results in
the same order. To do this I somehow have to know which message was
first, second, and so on; then I can use a resequencer at the end to order
them and aggregate them in the right order.
Right now I don't see any way to 'number' the messages that come out of the splitter.
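A minimal sketch of the numbering idea, in plain Java rather than Camel's API. The `Msg` class, the `NumberingSplitter` helper, and the `splitIndex`/`splitSize` header names are all hypothetical. Each part is stamped with its position and the total count, so a resequencer can restore the original order before aggregating, whatever order the parts arrive in.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical minimal message: a body plus headers (not Camel's Message API).
final class Msg {
    final String body;
    final Map<String, Object> headers = new HashMap<>();

    Msg(String body) { this.body = body; }
}

final class NumberingSplitter {
    // Hypothetical header names, chosen for illustration only.
    static final String INDEX = "splitIndex";
    static final String SIZE = "splitSize";

    // Split a comma-separated body and stamp each part with its position and the total count.
    static List<Msg> split(Msg original) {
        String[] parts = original.body.split(",");
        List<Msg> out = new ArrayList<>();
        for (int i = 0; i < parts.length; i++) {
            Msg part = new Msg(parts[i]);
            part.headers.putAll(original.headers); // carry over the parent's headers
            part.headers.put(INDEX, i);
            part.headers.put(SIZE, parts.length);
            out.add(part);
        }
        return out;
    }

    // Resequence: sort by the index header, regardless of arrival order.
    static List<Msg> resequence(List<Msg> arrived) {
        List<Msg> sorted = new ArrayList<>(arrived);
        sorted.sort(Comparator.comparingInt(m -> (Integer) m.headers.get(INDEX)));
        return sorted;
    }

    // Aggregate: concatenate the bodies in sequence order.
    static String aggregate(List<Msg> inOrder) {
        StringBuilder sb = new StringBuilder();
        for (Msg m : inOrder) {
            sb.append(m.body);
        }
        return sb.toString();
    }
}
```

Because every part carries both its index and the total, a downstream step can also tell when it has seen all of them.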

Another thing is aggregation. I'm afraid I cannot use the aggregator
because of a few of its features.
In this scenario I cannot dynamically specify how many messages to
aggregate (if I split into n parts, I want to merge n responses), so
I'll have to wait until the timeout.
Moreover, the timeout in BatchProcessor is not started by the
first message of the batch; timeouts just tick all the time and
don't care when messages arrive.
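One way to get an 'event based' timeout is to start a clock per group when its first message arrives, and to treat the group as complete once the expected count is reached or that per-group timeout has elapsed. A sketch under those assumptions: `TimedGroup` is a hypothetical class, not part of Camel, and the current time is passed in explicitly to keep the logic deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical aggregation group whose timeout starts with its FIRST message,
// instead of ticking on a fixed global schedule.
final class TimedGroup<T> {
    private final int expected;       // e.g. read from a "split size" header
    private final long timeoutMillis;
    private final List<T> items = new ArrayList<>();
    private long firstArrival = -1;   // -1 means no message has arrived yet

    TimedGroup(int expected, long timeoutMillis) {
        this.expected = expected;
        this.timeoutMillis = timeoutMillis;
    }

    void add(T item, long nowMillis) {
        if (firstArrival < 0) {
            firstArrival = nowMillis; // the timer starts on the first message
        }
        items.add(item);
    }

    // Complete when all expected parts are in, or when this group's own timeout has elapsed.
    boolean isComplete(long nowMillis) {
        if (items.size() >= expected) {
            return true;
        }
        return firstArrival >= 0 && nowMillis - firstArrival >= timeoutMillis;
    }

    List<T> items() { return items; }
}
```

A group that never receives a message never times out, which is the behaviour the fixed-tick timeout lacks.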

Is there any ready-made solution for my problem, or does it just require
creating new classes similar to BatchProcessor, where the timeout
is 'event based' and we can use some Expression to define the number of
elements to collect?

Also, maybe the splitter could set some standard header marking the
number of each message it produces? In my case, the total number of
messages can be evaluated using XPath and set as a header before the
splitter.
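Computing the expected count before the split could look roughly like this, using the JDK's `javax.xml.xpath` API. The `CountHeader` helper and the `expectedSize` header name are hypothetical; in a real route the result would go into the message headers rather than a standalone map.

```java
import java.io.StringReader;
import java.util.HashMap;
import java.util.Map;
import javax.xml.xpath.XPath;
import javax.xml.xpath.XPathConstants;
import javax.xml.xpath.XPathExpressionException;
import javax.xml.xpath.XPathFactory;
import org.xml.sax.InputSource;

final class CountHeader {
    // Hypothetical header name for the number of parts the splitter will produce.
    static final String EXPECTED_SIZE = "expectedSize";

    // Evaluate count(itemPath) over the XML body and return it as a header map entry.
    static Map<String, Object> withExpectedSize(String xmlBody, String itemPath) {
        XPath xpath = XPathFactory.newInstance().newXPath();
        try {
            Double count = (Double) xpath.evaluate(
                    "count(" + itemPath + ")",
                    new InputSource(new StringReader(xmlBody)),
                    XPathConstants.NUMBER);
            Map<String, Object> headers = new HashMap<>();
            headers.put(EXPECTED_SIZE, count.intValue());
            return headers;
        } catch (XPathExpressionException e) {
            throw new IllegalArgumentException("Cannot evaluate XPath for " + itemPath, e);
        }
    }
}
```

The same XPath that drives the split can be wrapped in `count(...)`, so the expected size and the actual parts come from one expression.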

Thank you for all your thoughts,
Romek

Re: splitter/aggregator scenario

jstrachan
On 31/10/2007, Roman Kalukiewicz <[hidden email]> wrote:
> Hello!
>
> I wanted to implement a splitter/aggregator scenario, and I'm facing the following problem:
>
> When I split the original message, I would like to aggregate the results in
> the same order. To do this I somehow have to know which message was
> first, second, and so on; then I can use a resequencer at the end to order
> them and aggregate them in the right order.
> Right now I don't see any way to 'number' the messages that come out of the splitter.

See below... I've patched the Splitter...


> Another thing is aggregation. I'm afraid I cannot use the aggregator
> because of a few of its features.
> In this scenario I cannot dynamically specify how many messages to
> aggregate (if I split into n parts, I want to merge n responses), so
> I'll have to wait until the timeout.

Yeah, we should allow some kind of Predicate to determine whether a
batch has been completed or not.

Right now the implementation is kind of simplistic; we could change it
to send a fully aggregated message as soon as a set of messages has
been fully aggregated, and not really have the concept of a batch.
The worry, though, is that we end up with a monster in-memory collection
of messages waiting to be aggregated.

As mentioned in the other thread, we may want to use a database to
deal with massive numbers of messages which aggregate over a long
period of time.
http://www.nabble.com/Re%3A-Aggregator-strategies-%28again%29-p13587184s22882.html


> Moreover, the timeout in BatchProcessor is not started by the
> first message of the batch; timeouts just tick all the time and
> don't care when messages arrive.
>
> Is there any ready-made solution for my problem, or does it just require
> creating new classes similar to BatchProcessor, where the timeout
> is 'event based' and we can use some Expression to define the number of
> elements to collect?

Yeah, I think we need to refactor BatchProcessor to use some kind of
pluggable predicate to determine when a batch is completed.

I've just added a little isBatchCompleted(index) method so a derived
class could add some extra predicates to decide if a batch is
completed.
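To illustrate that kind of hook, here is a simplified, hypothetical batch processor (not the actual Camel `BatchProcessor` source). It assumes `index` means the number of items collected so far in the current batch; that interpretation is an assumption. The base rule is a fixed batch size, and a subclass overrides `isBatchCompleted(int)` to also close a batch when it sees a hypothetical `END` marker.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a batch processor: completes a batch on a fixed size,
// but lets subclasses add extra completion rules via isBatchCompleted(index).
class SimpleBatchProcessor<T> {
    private final int batchSize;
    private final List<T> batch = new ArrayList<>();
    final List<List<T>> dispatched = new ArrayList<>();

    SimpleBatchProcessor(int batchSize) { this.batchSize = batchSize; }

    // Assumption: index is the number of items currently in the batch.
    protected boolean isBatchCompleted(int index) {
        return index >= batchSize;
    }

    void process(T item) {
        batch.add(item);
        if (isBatchCompleted(batch.size())) {
            dispatched.add(new ArrayList<>(batch)); // "send" a copy of the finished batch
            batch.clear();
        }
    }
}

// Derived class: also completes the batch when the hypothetical "END" marker arrives.
class MarkerBatchProcessor extends SimpleBatchProcessor<String> {
    private String last;

    MarkerBatchProcessor(int batchSize) { super(batchSize); }

    @Override
    void process(String item) {
        last = item; // remember the newest item before the completion check runs
        super.process(item);
    }

    @Override
    protected boolean isBatchCompleted(int index) {
        return super.isBatchCompleted(index) || "END".equals(last);
    }
}
```

The subclass keeps the size rule via `super.isBatchCompleted(index)` and only adds its own condition on top.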

I've also had a little experiment adding a Predicate to the Aggregator
and making a PredicateAggregatorCollection which only adds exchanges
to the collection to be dispatched when the predicate matches. I've no
idea if it works or anything - I've not written any tests yet :)
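A rough sketch of the predicate-driven idea, assuming parts are grouped by a correlation key. `PredicateAggregator` is a hypothetical name; it is not the `PredicateAggregatorCollection` that was committed. A group moves to the dispatch list as soon as the predicate matches and is removed from the pending map, so completed groups do not linger in memory (incomplete groups still do, so a timeout or a persistent store would still be needed).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical predicate-driven aggregator keyed by correlation id.
final class PredicateAggregator<T> {
    private final Predicate<List<T>> completed;
    private final Map<String, List<T>> pending = new HashMap<>();
    private final List<List<T>> ready = new ArrayList<>();

    PredicateAggregator(Predicate<List<T>> completed) { this.completed = completed; }

    void add(String correlationKey, T part) {
        List<T> group = pending.computeIfAbsent(correlationKey, k -> new ArrayList<>());
        group.add(part);
        if (completed.test(group)) {
            // Dispatch as soon as the group is complete and stop holding it in memory.
            ready.add(pending.remove(correlationKey));
        }
    }

    // Hand over everything that is ready to be sent and reset the ready list.
    List<List<T>> drainReady() {
        List<List<T>> out = new ArrayList<>(ready);
        ready.clear();
        return out;
    }

    int pendingGroups() { return pending.size(); }
}
```

The predicate could, for example, compare the group's size with a split-size header stamped on each part.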

I wondered if it'd be of any use to you as a base to build on when
experimenting with aggregations that use custom predicates to determine
completion. If it's of no use, it's easy to delete later :)

I've also raised this JIRA to track the predicate based Aggregator idea
http://issues.apache.org/activemq/browse/CAMEL-207


> Also, maybe the splitter could set some standard header marking the
> number of each message it produces? In my case, the total number of
> messages can be evaluated using XPath and set as a header before the
> splitter.

That's a great idea!

I've raised
http://issues.apache.org/activemq/browse/CAMEL-206

and just committed a test case and patch for this one.


> Thank you for all your thoughts,

And thank you! :)
--
James
-------
http://macstrac.blogspot.com/

Open Source SOA
http://open.iona.com