How to use Aggregator and AggregationCollection?

How to use Aggregator and AggregationCollection?

huntc
Hi there,

Firstly, congrats on Camel. I've been using other ESBs and I'm really liking the way this one hangs together.

However, I have a problem, and after lots of trawling I've not been able to sort this one out for myself.

I want to read a bunch of files from a directory, aggregate all of the files into a collection, and then process that collection.

How do I do this?

Here's my config so far:

	from(
			"file://target/classes/timetables?noop=true&consumer.recursive=true")
			.aggregator(new AggregationCollection(constant(true), this)).constant(true)
			.process(new Processor() {
				public void process(Exchange e) {
					System.out.println("Received exchange: " + e.getIn());
				}
			});
... and my strategy looks like:
	public Exchange aggregate(Exchange arg0, Exchange arg1) {
		return arg1;
	}
Now, I know my strategy doesn't make sense (nor does my config), but I'm not quite sure what a strategy should do here. My goal is simple enough: collect all of the files.

Help!

Kind regards,
Christopher


RE: How to use Aggregator and AggregationCollection?

Claus Ibsen
Hi

Do you have any predicates for when the files have reached the end?
Does the file have metadata such as:
- File 1 of 3
  Data for file 1
- File 2 of 3
  Data for file 2
- File 3 of 3
  Data for file 3

Then you can use that metadata in the predicate that determines when the end condition is reached.
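A minimal sketch of such an end-condition check, written in plain Java rather than against Camel's predicate API. The class name `FileSetCompletion`, the method `isLastFile` and the exact "File N of M" label format are assumptions made for illustration. The check also assumes the files arrive in order; if they may not, counting the files received against M would be the safer test.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper (not part of Camel): decides from a "File N of M"
// label whether the last file of a set has been seen.
final class FileSetCompletion {
    private static final Pattern LABEL = Pattern.compile("File (\\d+) of (\\d+)");

    /** Returns true when the label names the last file of its set, e.g. "File 3 of 3". */
    static boolean isLastFile(String label) {
        Matcher m = LABEL.matcher(label.trim());
        if (!m.matches()) {
            throw new IllegalArgumentException("Unrecognised label: " + label);
        }
        int index = Integer.parseInt(m.group(1));
        int total = Integer.parseInt(m.group(2));
        return index == total;
    }

    public static void main(String[] args) {
        System.out.println(isLastFile("File 1 of 3")); // false
        System.out.println(isLastFile("File 3 of 3")); // true
    }
}
```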

See the loan broker example:
http://activemq.apache.org/camel/loan-broker-example.html
that shows how to use aggregate. A good example that is.

However, if your files do not have such metadata (File 1 of 3), then I think we need to discuss further how to determine the "end condition".

For instance the file consumer could be improved to determine the list of files at first, and then afterwards iterate this list so it can add this metadata itself to each exchange. Then you can determine the completed predicate. However I can see this problem for other consumers as well, so a more general solution would be nice to discuss. For instance maybe we need something extra to polling consumers so they can support callbacks for end.
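To make the idea concrete, here is a rough sketch in plain Java of a consumer that lists the files first and then stamps each one with its position and the total. It is not Camel code and no such feature is implied to exist; `FileBatchTagger` and the header names `fileName`, `fileIndex` and `fileTotal` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: tag every file of one poll with "index of total"
// metadata so a completion predicate can recognise the last one.
final class FileBatchTagger {

    /** Builds one header map per file, numbered from 1 to the size of the list. */
    static List<Map<String, Object>> tag(List<String> fileNames) {
        List<Map<String, Object>> tagged = new ArrayList<>();
        int total = fileNames.size();
        for (int i = 0; i < total; i++) {
            Map<String, Object> headers = new LinkedHashMap<>();
            headers.put("fileName", fileNames.get(i));
            headers.put("fileIndex", i + 1);
            headers.put("fileTotal", total);
            tagged.add(headers);
        }
        return tagged;
    }

    /** Completion check: true for the entry whose index equals the total. */
    static boolean isComplete(Map<String, Object> headers) {
        return headers.get("fileIndex").equals(headers.get("fileTotal"));
    }

    public static void main(String[] args) {
        for (Map<String, Object> headers : tag(Arrays.asList("a.csv", "b.csv", "c.csv"))) {
            System.out.println(headers + " complete=" + isComplete(headers));
        }
    }
}
```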

Any thoughts?



Kind regards
 
Claus Ibsen
......................................
Silverbullet
Skovsgårdsvænget 21
8362 Hørning
Tlf. +45 2962 7576
Web: www.silverbullet.dk

-----Original Message-----
From: huntc [mailto:[hidden email]]
Sent: 30 September 2008 13:02
To: [hidden email]
Subject: How to use Aggregator and AggregationCollection?


--
View this message in context: http://www.nabble.com/How-to-use-Aggregator-and-AggregationCollection--tp19739918s22882p19739918.html
Sent from the Camel - Users mailing list archive at Nabble.com.

RE: How to use Aggregator and AggregationCollection?

huntc
Hi Claus,

Thank you very much for your reply. My response:

Claus Ibsen wrote:
> Do you have any predicates for when the files have reached the end?

I believe that, by default, the aggregator completes a batch after 1 second or after 100 exchanges, whichever comes first.
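A simplified model of that "1 second or 100 exchanges" rule, in plain Java. It is only a sketch of the rule as described in this thread, not Camel's implementation; the class `BatchWindow` and its methods are hypothetical, and time is passed in explicitly to keep the example deterministic.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model of "complete after N items or T milliseconds"; not Camel code.
final class BatchWindow<T> {
    private final int maxSize;
    private final long timeoutMillis;
    private final List<T> items = new ArrayList<>();
    private long openedAtMillis;

    BatchWindow(int maxSize, long timeoutMillis) {
        this.maxSize = maxSize;
        this.timeoutMillis = timeoutMillis;
    }

    /** Adds an item; returns the finished batch if this item completed it, else null. */
    List<T> add(T item, long nowMillis) {
        if (items.isEmpty()) {
            openedAtMillis = nowMillis; // the window opens with its first item
        }
        items.add(item);
        return isComplete(nowMillis) ? drain() : null;
    }

    /** True when the open batch is full or has been open for the whole timeout. */
    boolean isComplete(long nowMillis) {
        return !items.isEmpty()
                && (items.size() >= maxSize || nowMillis - openedAtMillis >= timeoutMillis);
    }

    /** Returns the current batch and starts a new, empty one. */
    List<T> drain() {
        List<T> batch = new ArrayList<>(items);
        items.clear();
        return batch;
    }

    public static void main(String[] args) {
        BatchWindow<String> window = new BatchWindow<>(100, 1000);
        window.add("file-1", 0);
        window.add("file-2", 200);
        System.out.println(window.isComplete(999));  // false
        System.out.println(window.isComplete(1000)); // true
        System.out.println(window.drain());          // [file-1, file-2]
    }
}
```

Under this model, a directory holding more than 100 files, or one that takes longer than the timeout to read, would come out as several batches rather than one.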

Claus Ibsen wrote:
> See the loan broker example:
> http://activemq.apache.org/camel/loan-broker-example.html
> that shows how to use aggregate. A good example that is.

Thank you. I have studied this example.

Claus Ibsen wrote:
> For instance the file consumer could be improved to determine the list of files at first, and then afterwards iterate this list so it can add this metadata itself to each exchange. Then you can determine the completed predicate. However I can see this problem for other consumers as well, so a more general solution would be nice to discuss. For instance maybe we need something extra to polling consumers so they can support callbacks for end.

I think that what I need to do is within the realms of the aggregator we have, and perhaps the AggregationCollection class that we have. I just do not understand how they can work in this instance. For example, there are no examples showing how an AggregationCollection operates.

Thanks once again.

Kind regards,
Christopher

RE: How to use Aggregator and AggregationCollection?

Claus Ibsen
Hi

Just to let you know, I have created a ticket on our issue tracker for improving the wiki documentation for the aggregator:
https://issues.apache.org/activemq/browse/CAMEL-951

BTW: The aggregator EIP is documented here:
http://activemq.apache.org/camel/aggregator.html

Usually, studying the source code of the unit tests can help, as can the javadoc for the classes.

AggregationCollection is what you will get when it's complete. It has size and iterator methods with which you can iterate over all the exchanges.




Kind regards
 
Claus Ibsen

-----Original Message-----
From: huntc [mailto:[hidden email]]
Sent: 1 October 2008 02:35
To: [hidden email]
Subject: RE: How to use Aggregator and AggregationCollection?



RE: How to use Aggregator and AggregationCollection?

huntc
Claus Ibsen wrote:
> Just to inform that I have created a ticket on our issue tracker, for improving the wiki documentation for the aggregator
> https://issues.apache.org/activemq/browse/CAMEL-951

Thank you for this.

Claus Ibsen wrote:
> BTW: The aggregator EIP is documented here:
> http://activemq.apache.org/camel/aggregator.html

I have read this too.

Claus Ibsen wrote:
> Usually studying the source code for the unit can help. And the javadoc for the classes.

That was my next step.

However someone must have used AggregationCollection - I cannot find a single example anywhere - there is no test case for it either. Anyone?

Thank you once again.

RE: How to use Aggregator and AggregationCollection?

Claus Ibsen
Hi

I am busy at the moment, but please feel free to try getting the collection thing working.

I will have time to look into it next week, as I am also curious how to get it working and I think we need it documented in the wiki as well as having a unit test.

Just wanted to let you know it's not forgotten.
But with day jobs as well time is flying.



Kind regards
 
Claus Ibsen

-----Original Message-----
From: huntc [mailto:[hidden email]]
Sent: 1 October 2008 09:21
To: [hidden email]
Subject: RE: How to use Aggregator and AggregationCollection?




RE: How to use Aggregator and AggregationCollection?

huntc
Thanks to the help of Claus, I have discovered that I can use my own aggregation strategy to collect all files into an Exchange containing a collection of Exchanges. Here is my route building:
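import org.apache.camel.Exchange;
import org.apache.camel.Processor;

public class RouteBuilder extends org.apache.camel.builder.RouteBuilder {
	public void configure() {
		// Poll the directory recursively; noop=true leaves the files in place.
		from(
			"file://target/classes/timetables?noop=true&consumer.recursive=true")
			// constant(true) gives every exchange the same correlation key,
			// so all files are aggregated together.
			.aggregator(new CollectAllAggregationStrategy()).constant(true)
			.process(new Processor() {
				public void process(Exchange e) {
					// The body is the collection built by the strategy.
					System.out.println("Received exchange: "
							+ e.getIn().getBody());
				}
			});
	}
}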
...and here is my aggregation strategy:
(CollectAllAggregationStrategy.java)

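import java.util.Collection;
import java.util.Vector;

import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.DefaultMessage;
import org.apache.camel.processor.aggregate.AggregationStrategy;

public class CollectAllAggregationStrategy implements AggregationStrategy {

	@SuppressWarnings("unchecked")
	public Exchange aggregate(Exchange arg0, Exchange arg1) {
		// If our previous exchange holds a collection of Exchanges then we
		// simply add to that collection. Otherwise we create a new Exchange
		// containing a collection of exchanges and use this for future
		// aggregations (and of course add to it presently).
		Collection<Exchange> collection;
		Object body = arg0.getIn().getBody();
		if (body instanceof Collection) {
			collection = (Collection<Exchange>) body;
		} else {
			// First aggregation: wrap the first exchange in a new collection.
			collection = new Vector<Exchange>();
			collection.add(arg0);

			Exchange newExchange = new DefaultExchange(arg0.getContext());
			Message in = new DefaultMessage();
			in.setBody(collection);
			newExchange.setIn(in);

			arg0 = newExchange;
		}
		// Append the newly arrived exchange and return the aggregate.
		collection.add(arg1);
		return arg0;
	}
}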
Because the correlation expression is constant(true), every exchange gets the same correlation key, so all exchanges appear to be bundled together.
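The effect of a constant correlation key can be illustrated without Camel. The sketch below is plain Java with hypothetical names (`CollectAllSketch`, `aggregate`); it groups items by a key function and shows that a constant key yields a single group, while a varying key (here the file extension, chosen only for contrast) yields one group per key.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical illustration: group messages by a correlation key and
// collect each group into a list, in arrival order.
final class CollectAllSketch {

    static <T, K> Map<K, List<T>> aggregate(List<T> messages, Function<T, K> correlationKey) {
        Map<K, List<T>> groups = new LinkedHashMap<>();
        for (T message : messages) {
            K key = correlationKey.apply(message);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(message);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("a.xml", "b.xml", "c.txt");

        // Constant key: every file lands in the same group.
        Map<Boolean, List<String>> all = aggregate(files, f -> true);
        System.out.println(all); // {true=[a.xml, b.xml, c.txt]}

        // Varying key: one group per file extension.
        Map<String, List<String>> byExtension =
                aggregate(files, f -> f.substring(f.lastIndexOf('.') + 1));
        System.out.println(byExtension); // {xml=[a.xml, b.xml], txt=[c.txt]}
    }
}
```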

I hope that others find this use of aggregation useful.

Kind regards,
Christopher