Grouped Aggregation Strategies...

classic Classic list List threaded Threaded
9 messages Options
Reply | Threaded
Open this post in threaded view
|

Grouped Aggregation Strategies...

Craig Taylor
I'm wanting to accumulate all exchanges matching a correlation expression
(id) into a list for a given period of time.  I've looked at the
GroupedBodyAggregationStrategy and it returns an Exchange with a body of
List<Exchange>'s where I had expect an Exchange with a body of
List<Body_type> back.

aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
    .completionTimeout(5000L)

I wasn't able to find documentation on the built-in aggregator however,
this "sounds" like the right one to use based on naming.  Desperation wise,
attempting the other Grouped Aggregators didn't yield the solution either.

Thanks,
--
-------------------------------------------
Craig Taylor
[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

hakuseki
Been struggling with this for a while and I have studied the underlying code and found that “GroupedAggregationStrategies" extends AbstractListAggreationStrategy which in turn uses the aggregate() method.

But it is not returning anything of value. It picks up a List internally but then ignores it and returns an Exchange

/**
 * This method will aggregate the old and new exchange and return the result.
 *
 * @param oldExchange The oldest exchange, can be null
 * @param newExchange The newest exchange, can be null
 * @return a composite exchange of the old and/or new exchanges
 */
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
    List<V> list;

    if (oldExchange == null) {
        list = getList(newExchange);
    } else {
        list = getList(oldExchange);
    }

    if (newExchange != null) {
        V value = getValue(newExchange);
        if (value != null) {
            list.add(value);
        }
    }

    return oldExchange != null ? oldExchange : newExchange;
}

If you find a solution, please let me know

What version of Camel are you using?

M

> On 3 Apr 2020, at 23:23, Craig Taylor <[hidden email]> wrote:
>
> I'm wanting to accumulate all exchanges matching a correlation expression
> (id) into a list for a given period of time.  I've looked at the
> GroupedBodyAggregationStrategy and it returns an Exchange with a body of
> List<Exchange>'s where I had expect an Exchange with a body of
> List<Body_type> back.
>
> aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
>    .completionTimeout(5000L)
>
> I wasn't able to find documentation on the built-in aggregator however,
> this "sounds" like the right one to use based on naming.  Desperation wise,
> attempting the other Grouped Aggregators didn't yield the solution either.
>
> Thanks,
> --
> -------------------------------------------
> Craig Taylor
> [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

Claus Ibsen-2
In reply to this post by Craig Taylor
Hi

Are you really sure as its supposed to store a List<Object> that are
the message body.
But there are however only one unit test for this in camel-jms, so
lets add one to camel-core.

On Fri, Apr 3, 2020 at 11:23 PM Craig Taylor <[hidden email]> wrote:

>
> I'm wanting to accumulate all exchanges matching a correlation expression
> (id) into a list for a given period of time.  I've looked at the
> GroupedBodyAggregationStrategy and it returns an Exchange with a body of
> List<Exchange>'s where I had expect an Exchange with a body of
> List<Body_type> back.
>
> aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
>     .completionTimeout(5000L)
>
> I wasn't able to find documentation on the built-in aggregator however,
> this "sounds" like the right one to use based on naming.  Desperation wise,
> attempting the other Grouped Aggregators didn't yield the solution either.
>
> Thanks,
> --
> -------------------------------------------
> Craig Taylor
> [hidden email]



--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

Claus Ibsen-2
Hi

Here is an unit test that tests it groups the body as a List
https://github.com/apache/camel/commit/f000201a1d0200fa8b9e596a516a84acb85dc538

On Sat, Apr 4, 2020 at 9:50 AM Claus Ibsen <[hidden email]> wrote:

>
> Hi
>
> Are you really sure as its supposed to store a List<Object> that are
> the message body.
> But there are however only one unit test for this in camel-jms, so
> lets add one to camel-core.
>
> On Fri, Apr 3, 2020 at 11:23 PM Craig Taylor <[hidden email]> wrote:
> >
> > I'm wanting to accumulate all exchanges matching a correlation expression
> > (id) into a list for a given period of time.  I've looked at the
> > GroupedBodyAggregationStrategy and it returns an Exchange with a body of
> > List<Exchange>'s where I had expect an Exchange with a body of
> > List<Body_type> back.
> >
> > aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
> >     .completionTimeout(5000L)
> >
> > I wasn't able to find documentation on the built-in aggregator however,
> > this "sounds" like the right one to use based on naming.  Desperation wise,
> > attempting the other Grouped Aggregators didn't yield the solution either.
> >
> > Thanks,
> > --
> > -------------------------------------------
> > Craig Taylor
> > [hidden email]
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2



--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2
Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

hakuseki
In reply to this post by Claus Ibsen-2
Claus (or any other ), please correct me if I’m wrong but

all the GroupedAggregationStrategies extends AbstractListAggregationStrategy
and by inheritance aggregate() is called on the extended class if not provided by own class.
This method in Camel 2.25.0 is only returning the Exchange (old/new on a condition).
Internally it retrieves a List from the Exchange and adds to that List but the List is then never used or added to any Exchange.

If I’m correct then maybe it is why he doesn’t get the expected results?


Also AggregationStrategies, is this a new feature in Camel 3?


Sorry for hijacking the thread, Craig!

Thx

M

> On 4 Apr 2020, at 09:50, Claus Ibsen <[hidden email]> wrote:
>
> Hi
>
> Are you really sure as its supposed to store a List<Object> that are
> the message body.
> But there are however only one unit test for this in camel-jms, so
> lets add one to camel-core.
>
> On Fri, Apr 3, 2020 at 11:23 PM Craig Taylor <[hidden email]> wrote:
>>
>> I'm wanting to accumulate all exchanges matching a correlation expression
>> (id) into a list for a given period of time.  I've looked at the
>> GroupedBodyAggregationStrategy and it returns an Exchange with a body of
>> List<Exchange>'s where I had expect an Exchange with a body of
>> List<Body_type> back.
>>
>> aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
>>    .completionTimeout(5000L)
>>
>> I wasn't able to find documentation on the built-in aggregator however,
>> this "sounds" like the right one to use based on naming.  Desperation wise,
>> attempting the other Grouped Aggregators didn't yield the solution either.
>>
>> Thanks,
>> --
>> -------------------------------------------
>> Craig Taylor
>> [hidden email]
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2

Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

Craig Taylor
In reply to this post by Claus Ibsen-2
>> Are you really sure as its supposed to store a List<Object> that are
the message body.

Yes, wanting to accumulate List<Object> over a period of time so before
passing into a "full changes" object so that batching of the request to a
3rd party api can occur.

I'm using Camel 3.1.0, and more detail / output from my route follows (note
- having issues w/ log4j - will look at that after this, first things
first):

        from(Routes.InitialRoute)
            .process(
                (Exchange ex) -> { Logger.getLogger().error("**** Exchange:
" + ex.getIn().getBody()); }
            )
            .aggregate(simple("${body.id}"),
AggregationStrategies.groupedBody())
                .completionTimeout(5000L)
            .process(
                (Exchange ex) -> { Logger.getLogger().error("****
Exchange2: " + ex.getIn().getBody()); }
            )
            .to(ChangeNotificationRoute.ROUTE_TO_WATERSHED);

with an output of :

2020/Apr/04 09:59:03.233 ERROR ....MyChangeRoute.lambda$configure$0: ****
Exchange: {change:'user', id:'ekp000002600', field:'FIELD', new:'1',
old:'null'}
2020/Apr/04 09:59:08.770 ERROR ....MyChangeRoute.lambda$configure$1: ****
Exchange2: List<Exchange>(1 elements)

Note that the Routes.InitialRoute is returning a vm:// endpoint.



On Sat, Apr 4, 2020 at 3:50 AM Claus Ibsen <[hidden email]> wrote:

> Hi
>
> Are you really sure as its supposed to store a List<Object> that are
> the message body.
> But there are however only one unit test for this in camel-jms, so
> lets add one to camel-core.
>
> On Fri, Apr 3, 2020 at 11:23 PM Craig Taylor <[hidden email]>
> wrote:
> >
> > I'm wanting to accumulate all exchanges matching a correlation expression
> > (id) into a list for a given period of time.  I've looked at the
> > GroupedBodyAggregationStrategy and it returns an Exchange with a body of
> > List<Exchange>'s where I had expect an Exchange with a body of
> > List<Body_type> back.
> >
> > aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
> >     .completionTimeout(5000L)
> >
> > I wasn't able to find documentation on the built-in aggregator however,
> > this "sounds" like the right one to use based on naming.  Desperation
> wise,
> > attempting the other Grouped Aggregators didn't yield the solution
> either.
> >
> > Thanks,
> > --
> > -------------------------------------------
> > Craig Taylor
> > [hidden email]
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
>


--
-------------------------------------------
Craig Taylor
[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

hakuseki
I too have migrated to Camel 3 and I can’t get this to work.

I have tried using a constant(true) but I get only 1 message back instead of 3

Using a unique correlation expression I get as before, 3 different back, not a List of 3

M

> On 4 Apr 2020, at 16:10, Craig Taylor <[hidden email]> wrote:
>
>>> Are you really sure as its supposed to store a List<Object> that are
> the message body.
>
> Yes, wanting to accumulate List<Object> over a period of time so before
> passing into a "full changes" object so that batching of the request to a
> 3rd party api can occur.
>
> I'm using Camel 3.1.0, and more detail / output from my route follows (note
> - having issues w/ log4j - will look at that after this, first things
> first):
>
>        from(Routes.InitialRoute)
>            .process(
>                (Exchange ex) -> { Logger.getLogger().error("**** Exchange:
> " + ex.getIn().getBody()); }
>            )
>            .aggregate(simple("${body.id}"),
> AggregationStrategies.groupedBody())
>                .completionTimeout(5000L)
>            .process(
>                (Exchange ex) -> { Logger.getLogger().error("****
> Exchange2: " + ex.getIn().getBody()); }
>            )
>            .to(ChangeNotificationRoute.ROUTE_TO_WATERSHED);
>
> with an output of :
>
> 2020/Apr/04 09:59:03.233 ERROR ....MyChangeRoute.lambda$configure$0: ****
> Exchange: {change:'user', id:'ekp000002600', field:'FIELD', new:'1',
> old:'null'}
> 2020/Apr/04 09:59:08.770 ERROR ....MyChangeRoute.lambda$configure$1: ****
> Exchange2: List<Exchange>(1 elements)
>
> Note that the Routes.InitialRoute is returning a vm:// endpoint.
>
>
>
> On Sat, Apr 4, 2020 at 3:50 AM Claus Ibsen <[hidden email]> wrote:
>
>> Hi
>>
>> Are you really sure as its supposed to store a List<Object> that are
>> the message body.
>> But there are however only one unit test for this in camel-jms, so
>> lets add one to camel-core.
>>
>> On Fri, Apr 3, 2020 at 11:23 PM Craig Taylor <[hidden email]>
>> wrote:
>>>
>>> I'm wanting to accumulate all exchanges matching a correlation expression
>>> (id) into a list for a given period of time.  I've looked at the
>>> GroupedBodyAggregationStrategy and it returns an Exchange with a body of
>>> List<Exchange>'s where I had expect an Exchange with a body of
>>> List<Body_type> back.
>>>
>>> aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
>>>    .completionTimeout(5000L)
>>>
>>> I wasn't able to find documentation on the built-in aggregator however,
>>> this "sounds" like the right one to use based on naming.  Desperation
>> wise,
>>> attempting the other Grouped Aggregators didn't yield the solution
>> either.
>>>
>>> Thanks,
>>> --
>>> -------------------------------------------
>>> Craig Taylor
>>> [hidden email]
>>
>>
>>
>> --
>> Claus Ibsen
>> -----------------
>> http://davsclaus.com @davsclaus
>> Camel in Action 2: https://www.manning.com/ibsen2
>>
>
>
> --
> -------------------------------------------
> Craig Taylor
> [hidden email]

Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

Craig Taylor
Problem solved.  To be a good net-zitizen, this is what I found after
finding some old source code from when I knew camel better:

Using .
   aggregate( simple( "${body.id}"),
AggregationStrategies.flexible().accumulateInCollection( ArrayList.class))
   .completionTimeout(5000L)
allowed me to have a body of List<-type-> being passed along my route.

I'm not sure why the example Claus posted is working versus using the
flexible approach / what the GroupedAggregationStrategies are intended to
be used for.   Documentation there is terse and I'm still not sure as to
what their actual purpose is.

Thanks,

On Wed, Apr 8, 2020 at 6:06 AM Mikael Andersson Wigander <
[hidden email]> wrote:

> I too have migrated to Camel 3 and I can’t get this to work.
>
> I have tried using a constant(true) but I get only 1 message back instead
> of 3
>
> Using a unique correlation expression I get as before, 3 different back,
> not a List of 3
>
> M
>
> > On 4 Apr 2020, at 16:10, Craig Taylor <[hidden email]> wrote:
> >
> >>> Are you really sure as its supposed to store a List<Object> that are
> > the message body.
> >
> > Yes, wanting to accumulate List<Object> over a period of time so before
> > passing into a "full changes" object so that batching of the request to a
> > 3rd party api can occur.
> >
> > I'm using Camel 3.1.0, and more detail / output from my route follows
> (note
> > - having issues w/ log4j - will look at that after this, first things
> > first):
> >
> >        from(Routes.InitialRoute)
> >            .process(
> >                (Exchange ex) -> { Logger.getLogger().error("****
> Exchange:
> > " + ex.getIn().getBody()); }
> >            )
> >            .aggregate(simple("${body.id}"),
> > AggregationStrategies.groupedBody())
> >                .completionTimeout(5000L)
> >            .process(
> >                (Exchange ex) -> { Logger.getLogger().error("****
> > Exchange2: " + ex.getIn().getBody()); }
> >            )
> >            .to(ChangeNotificationRoute.ROUTE_TO_WATERSHED);
> >
> > with an output of :
> >
> > 2020/Apr/04 09:59:03.233 ERROR ....MyChangeRoute.lambda$configure$0: ****
> > Exchange: {change:'user', id:'ekp000002600', field:'FIELD', new:'1',
> > old:'null'}
> > 2020/Apr/04 09:59:08.770 ERROR ....MyChangeRoute.lambda$configure$1: ****
> > Exchange2: List<Exchange>(1 elements)
> >
> > Note that the Routes.InitialRoute is returning a vm:// endpoint.
> >
> >
> >
> > On Sat, Apr 4, 2020 at 3:50 AM Claus Ibsen <[hidden email]>
> wrote:
> >
> >> Hi
> >>
> >> Are you really sure as its supposed to store a List<Object> that are
> >> the message body.
> >> But there are however only one unit test for this in camel-jms, so
> >> lets add one to camel-core.
> >>
> >> On Fri, Apr 3, 2020 at 11:23 PM Craig Taylor <[hidden email]>
> >> wrote:
> >>>
> >>> I'm wanting to accumulate all exchanges matching a correlation
> expression
> >>> (id) into a list for a given period of time.  I've looked at the
> >>> GroupedBodyAggregationStrategy and it returns an Exchange with a body
> of
> >>> List<Exchange>'s where I had expect an Exchange with a body of
> >>> List<Body_type> back.
> >>>
> >>> aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
> >>>    .completionTimeout(5000L)
> >>>
> >>> I wasn't able to find documentation on the built-in aggregator however,
> >>> this "sounds" like the right one to use based on naming.  Desperation
> >> wise,
> >>> attempting the other Grouped Aggregators didn't yield the solution
> >> either.
> >>>
> >>> Thanks,
> >>> --
> >>> -------------------------------------------
> >>> Craig Taylor
> >>> [hidden email]
> >>
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> http://davsclaus.com @davsclaus
> >> Camel in Action 2: https://www.manning.com/ibsen2
> >>
> >
> >
> > --
> > -------------------------------------------
> > Craig Taylor
> > [hidden email]
>
>

--
-------------------------------------------
Craig Taylor
[hidden email]
Reply | Threaded
Open this post in threaded view
|

Re: Grouped Aggregation Strategies...

hakuseki
King!

Glad you figured it out

I tried it but for me it didn’t work, it still produces one exchange per “group” not one List with all grouped messages in it…

Quite tricky, this one is…

M

> On 8 Apr 2020, at 12:23, Craig Taylor <[hidden email]> wrote:
>
> Problem solved.  To be a good net-zitizen, this is what I found after
> finding some old source code from when I knew camel better:
>
> Using .
>   aggregate( simple( "${body.id}"),
> AggregationStrategies.flexible().accumulateInCollection( ArrayList.class))
>   .completionTimeout(5000L)
> allowed me to have a body of List<-type-> being passed along my route.
>
> I'm not sure why the example Claus posted is working versus using the
> flexible approach / what the GroupedAggregationStrategies are intended to
> be used for.   Documentation there is terse and I'm still not sure as to
> what their actual purpose is.
>
> Thanks,
>
> On Wed, Apr 8, 2020 at 6:06 AM Mikael Andersson Wigander <
> [hidden email]> wrote:
>
>> I too have migrated to Camel 3 and I can’t get this to work.
>>
>> I have tried using a constant(true) but I get only 1 message back instead
>> of 3
>>
>> Using a unique correlation expression I get as before, 3 different back,
>> not a List of 3
>>
>> M
>>
>>> On 4 Apr 2020, at 16:10, Craig Taylor <[hidden email]> wrote:
>>>
>>>>> Are you really sure as its supposed to store a List<Object> that are
>>> the message body.
>>>
>>> Yes, wanting to accumulate List<Object> over a period of time so before
>>> passing into a "full changes" object so that batching of the request to a
>>> 3rd party api can occur.
>>>
>>> I'm using Camel 3.1.0, and more detail / output from my route follows
>> (note
>>> - having issues w/ log4j - will look at that after this, first things
>>> first):
>>>
>>>       from(Routes.InitialRoute)
>>>           .process(
>>>               (Exchange ex) -> { Logger.getLogger().error("****
>> Exchange:
>>> " + ex.getIn().getBody()); }
>>>           )
>>>           .aggregate(simple("${body.id}"),
>>> AggregationStrategies.groupedBody())
>>>               .completionTimeout(5000L)
>>>           .process(
>>>               (Exchange ex) -> { Logger.getLogger().error("****
>>> Exchange2: " + ex.getIn().getBody()); }
>>>           )
>>>           .to(ChangeNotificationRoute.ROUTE_TO_WATERSHED);
>>>
>>> with an output of :
>>>
>>> 2020/Apr/04 09:59:03.233 ERROR ....MyChangeRoute.lambda$configure$0: ****
>>> Exchange: {change:'user', id:'ekp000002600', field:'FIELD', new:'1',
>>> old:'null'}
>>> 2020/Apr/04 09:59:08.770 ERROR ....MyChangeRoute.lambda$configure$1: ****
>>> Exchange2: List<Exchange>(1 elements)
>>>
>>> Note that the Routes.InitialRoute is returning a vm:// endpoint.
>>>
>>>
>>>
>>> On Sat, Apr 4, 2020 at 3:50 AM Claus Ibsen <[hidden email]>
>> wrote:
>>>
>>>> Hi
>>>>
>>>> Are you really sure as its supposed to store a List<Object> that are
>>>> the message body.
>>>> But there are however only one unit test for this in camel-jms, so
>>>> lets add one to camel-core.
>>>>
>>>> On Fri, Apr 3, 2020 at 11:23 PM Craig Taylor <[hidden email]>
>>>> wrote:
>>>>>
>>>>> I'm wanting to accumulate all exchanges matching a correlation
>> expression
>>>>> (id) into a list for a given period of time.  I've looked at the
>>>>> GroupedBodyAggregationStrategy and it returns an Exchange with a body
>> of
>>>>> List<Exchange>'s where I had expect an Exchange with a body of
>>>>> List<Body_type> back.
>>>>>
>>>>> aggregate(simple("${body.id}"), new GroupedBodyAggregationStrategy())
>>>>>   .completionTimeout(5000L)
>>>>>
>>>>> I wasn't able to find documentation on the built-in aggregator however,
>>>>> this "sounds" like the right one to use based on naming.  Desperation
>>>> wise,
>>>>> attempting the other Grouped Aggregators didn't yield the solution
>>>> either.
>>>>>
>>>>> Thanks,
>>>>> --
>>>>> -------------------------------------------
>>>>> Craig Taylor
>>>>> [hidden email]
>>>>
>>>>
>>>>
>>>> --
>>>> Claus Ibsen
>>>> -----------------
>>>> http://davsclaus.com @davsclaus
>>>> Camel in Action 2: https://www.manning.com/ibsen2
>>>>
>>>
>>>
>>> --
>>> -------------------------------------------
>>> Craig Taylor
>>> [hidden email]
>>
>>
>
> --
> -------------------------------------------
> Craig Taylor
> [hidden email]