Splitter + aggregator + dynamic timeout

Aleksander Pena
 Hi guys,

I have specific requirements. An incoming request should be split into identical requests that are sent to different sources in parallel. The sources (legacy and new) are almost compatible with each other, but the legacy one responds much more slowly than the new one. To prevent degradation of the whole service, each incoming request will carry a timeout attribute set dynamically to the required value. So when the legacy system does not respond in time, the service should ignore its response and return information about the timeout.
I wonder which EIP components can be used for such a service?
I created the following route using a splitter and an aggregator, because the aggregator's completionTimeout accepts a dynamic argument:

package com.aleksander.pena;

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;


public class SplitterWithAggregatorTest extends CamelTestSupport {

	@Test
	public void shouldProcessCorrectlyOnBothSources() throws Exception {
		MockEndpoint split = getMockEndpoint("mock:split");
		split.expectedBodiesReceivedInAnyOrder("1", "2", "3");
		
		MockEndpoint result = getMockEndpoint("mock:result");
		result.expectedBodiesReceived("1+2+3");
		
		template.requestBody("direct:start", "A,B,C");
		assertMockEndpointsSatisfied();
	}
	
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {

    	return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
            	
                from("direct:start")
                    .log("start body: ${body}")
                    .to("direct:process")
                    .log("result body: ${body}")
                    .to("mock:result");

                from("direct:process")
                    .split(body())
                    .parallelProcessing()
                        .log("Split line ${body}")
                        .bean(new Responder())
                        .to("mock:split")
                        .aggregate(header("myId"), new MyAggregationStrategy())
                            .completionSize(3)
                            .log("aggregated ${body}")
                            .log("completed by ${property.CamelAggregatedCompletedBy}")
                            .log("test body 1: ${body}")
                        .end()
                        .log("test body 2: ${body}")
                    .end();
            }
        };
    }
    
	public class Responder {

		public String translate(Exchange ex, String key) {
			ex.getIn().setHeader("myId", "correlation id 1");
			if ("A".equals(key)) {
				return "1";
			} else if ("B".equals(key)) {
				return "2";
			} else {
				return "3";
			}
		}
	}

	public class MyAggregationStrategy implements AggregationStrategy {

		public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
			if (oldExchange == null) {
				return newExchange;
			}
			String body = newExchange.getIn().getBody(String.class);
			String existing = oldExchange.getIn().getBody(String.class);

			oldExchange.getIn().setBody(existing + "+" + body);
			return oldExchange;
		}

	}
}

Unfortunately the results are not what I expected. Please see the excerpt from the console logs below. The responses from the sources are aggregated correctly, but the result stays inside the Exchange in thread #3 and is not returned to the caller's main thread.

2012-08-12 10:12:22,285 [                     main] INFO  DefaultCamelContext            - Total 2 routes, of which 2 is started.
2012-08-12 10:12:22,285 [                     main] INFO  DefaultCamelContext            - Apache Camel 2.10.0 (CamelContext: camel-1) started in 0.521 seconds
2012-08-12 10:12:22,315 [                     main] INFO  route1                         - start body: A,B,C
2012-08-12 10:12:22,335 [amel-1) thread #1 - Split] INFO  route2                         - Split line B
2012-08-12 10:12:22,335 [amel-1) thread #0 - Split] INFO  route2                         - Split line A
2012-08-12 10:12:22,335 [amel-1) thread #3 - Split] INFO  route2                         - Split line C
2012-08-12 10:12:22,345 [amel-1) thread #1 - Split] INFO  route2                         - test body 2: 2
2012-08-12 10:12:22,345 [amel-1) thread #0 - Split] INFO  route2                         - test body 2: 1
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO  route2                         - aggregated 2+1+3
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO  route2                         - completed by size
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO  route2                         - test body 1: 2+1+3
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO  route2                         - test body 2: 3
2012-08-12 10:12:22,345 [                     main] INFO  route1                         - result body: A,B,C

Could you advise what could be wrong with my route? I'm not so sure about the places where I put the end() elements. Anyway, I saw in the documentation that the only way to return a result from the splitter to the main thread is to use its aggregation strategy, but maybe there is some workaround for this?
I'm using Camel 2.8.0-fuse-01-13.

Thanks.
Alek

Re: Splitter + aggregator + dynamic timeout

hekonsek
Hi Olek,

First of all, it seems you forgot to add the route examples to the
message. :) Could you send them here?

> I wonder which EIP components can be used for such a service?

Splitter or Multicast with parallel processing enabled, plus some kind
of aggregation (more about this below).

The key question is what you want to do with the responses from
both services (legacy and non-legacy). Do you need to collect both
results before continuing processing? Do you want to merge the
aggregated results into a single message?

Send us your routes and tell us more about how you treat the responses
from the services you call. Then we should be able to give you a more
precise answer :) .

Laters.

--
Henryk Konsek
http://henryk-konsek.blogspot.com

Re: Splitter + aggregator + dynamic timeout

Aleksander Pena
Hi Henryk,


> First of all, it seems you forgot to add the route examples to the
> message. :) Could you send them here?

I'm not sure whether the example I added to the original message is visible to you. I added an example route to the message above, and it is visible on the web page. Please advise, as this is my first post here :)

> The key question is what you want to do with the responses from
> both services (legacy and non-legacy). Do you need to collect both
> results before continuing processing? Do you want to merge the
> aggregated results into a single message?

A more detailed description of my requirements follows:
The incoming message comes from an HTTP endpoint (1), and the reply should be sent back to it.
The legacy (2) and new (3) systems are HTTP endpoints as well. I need to send requests to them, aggregate their responses into one response, and send it back to the starting endpoint (1). The responses from (2) and (3) should be aggregated together, and some additional processing is needed (removing duplicates etc.). And yes, I need to aggregate the results from (2) and (3) before I continue processing.
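
For illustration, the required behaviour can be sketched in plain Java without Camel. This is only a sketch under assumptions: the class ScatterGatherSketch, its Result type and the callAll method are hypothetical names, each source is modelled as a Callable returning a list of strings, and duplicates are removed by collecting into a LinkedHashSet. It relies on ExecutorService.invokeAll with a timeout, which cancels the calls that have not finished in time:

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Camel: calls every source in parallel and
// merges whatever arrived before the per-request timeout expired.
final class ScatterGatherSketch {

    static final class Result {
        final Set<String> items = new LinkedHashSet<>(); // merged, duplicates removed
        boolean timedOut;                                 // true if any source was too slow
    }

    static Result callAll(List<Callable<List<String>>> sources, long timeoutMillis)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, sources.size()));
        try {
            // invokeAll waits at most timeoutMillis and cancels unfinished tasks.
            List<Future<List<String>>> futures =
                    pool.invokeAll(sources, timeoutMillis, TimeUnit.MILLISECONDS);
            Result result = new Result();
            for (Future<List<String>> future : futures) {
                try {
                    result.items.addAll(future.get());
                } catch (CancellationException e) {
                    result.timedOut = true; // source did not answer in time, ignore it
                } catch (ExecutionException e) {
                    throw new IllegalStateException("source failed", e.getCause());
                }
            }
            return result;
        } finally {
            pool.shutdownNow();
        }
    }
}
```

With one fast source and one source that takes longer than timeoutMillis, callAll returns only the fast source's items and sets timedOut, which is the behaviour wanted for the slow legacy system.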

Regards,
Alek

Re: Splitter + aggregator + dynamic timeout

hekonsek
Hi Olek,

> I'm not sure whether the example I added to the original message is visible
> to you. I added an example route to the message above, and it is visible on
> the web page. Please advise, as this is my first post here :)

Yeah, I can see the code via the Nabble web interface, but the code
formatting makes the examples invisible in Gmail. It is safer to send
examples as plain text. :)

> A more detailed description of my requirements follows:
> And yes, I need to aggregate the results from (2) and (3)
> before I continue processing.

Ok, let's start with some EIP magic here. :) Your problem can be
solved with the following EIP flow:

a) Split the message coming to the aggregation HTTP endpoint.
b) Start concurrent processing for each part of the split message.
c) For each part of the message send it to the content based router.
d) The router decides to which Enricher endpoint message should be sent.
e) Aggregate the results using the GroupedExchangeAggregationStrategy.
f) Send response.

And here are sample routes demonstrating a possible implementation of the
flow above:

from("direct:emulateLegacyHttp").
  setBody().simple("emulateLegacyHttp-${body}");

from("direct:emulateNonLegacyHttp").
  setBody().simple("emulateNonLegacyHttp-${body}");

from("direct:serviceAggregator").
  split(body().tokenize(":")).
  aggregationStrategy(new GroupedExchangeAggregationStrategy()).
  parallelProcessing().
  choice().
    when(body().isEqualTo("foo")).
      enrich("direct:emulateLegacyHttp").
    otherwise().
      enrich("direct:emulateNonLegacyHttp").
  end(). // closes choice()
  end(). // closes split()
  to("direct:aggregatedResults");

These are the core routing rules for your issue. You can enhance them
with a timeout, filtering and other features you need. The results of
the aggregation will be stored in the exchange property under the
following key: Exchange.GROUPED_EXCHANGE.

Remember that Camel is pretty flexible, so my solution is not the only
one possible. That's how I would do it. :)

There is also some reading [1] on my blog regarding service aggregation. :)

[1] http://henryk-konsek.blogspot.com/2012/05/aggregating-multiple-web-services.html

--
Henryk Konsek
http://henryk-konsek.blogspot.com

Re: Splitter + aggregator + dynamic timeout

Babak Vahdat
Hi

The sample unit test Alek provided in this thread puzzles me a bit, so I reduced it to a minimum (without parallel processing). I expected it to pass, but it doesn't, because "mock:result" is not satisfied:

    java.lang.AssertionError: mock://result Body of message: 0. Expected: <1+2+3> but was: <A,B,C>

However "mock:result2" is satisfied, which is as expected.

Can someone please advise me on what I'm missing, or could it be a bug?

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived("1+2+3");
        getMockEndpoint("mock:result2").expectedBodiesReceived("1+2+3");

        template.requestBody("direct:start", "A,B,C");

        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").to("direct:process").to("mock:result");

                from("direct:process")
                    .split(body())
                        .bean(new Responder())
                        .aggregate(header("myId"), new MyAggregationStrategy())
                            .completionSize(3)
                            .to("mock:result2");
            }
        };
    }

    public static class Responder {

        public String translate(Exchange exchange, String body) {
            exchange.getIn().setHeader("myId", "myValue");

            if ("A".equals(body)) {
                return "1";
            } else if ("B".equals(body)) {
                return "2";
            } else if ("C".equals(body)) {
                return "3";
            } else {
                throw new IllegalArgumentException("bla bla");
            }
        }

    }

    public static class MyAggregationStrategy implements AggregationStrategy {

        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }

            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(oldBody + "+" + newBody);
            return oldExchange;
        }

    }
}


Thanks, Babak

Re: Splitter + aggregator + dynamic timeout

Babak Vahdat
Changing the second route as follows makes the unit test pass:

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").to("direct:process").to("mock:result");

                from("direct:process").process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        exchange.getIn().setBody("1+2+3");
                    }
                }).to("mock:result2");
            }
        };
    }

So to me this is definitely a bug!

Babak

Re: Splitter + aggregator + dynamic timeout

Aleksander Pena
In reply to this post by hekonsek
Henryk,

Thanks for another way of meeting my requirements. It works perfectly well, but an important requirement is missing: I need to set the timeout dynamically, so the flow could look like the following:

.when(body().isEqualTo("foo"))
     .enrich("direct:emulateLegacyHttp").timeout(property(TIMEOUT))
.otherwise()
     .enrich("direct:emulateNonLegacyHttp").timeout(property(TIMEOUT))
.end()

Unfortunately I cannot find a timeout that accepts an argument of Expression type :(

It would be best if I could specify a timeout for each flow in a route, as in the excerpt below:

from("direct:start")
    .timeout(property(TIMEOUT))
    .process(new SomeProcessingHere())
    .to("direct:anotherEndpoint");

But it is probably difficult to implement such behaviour in Camel.

Anyway, there is still the open question of why my original example doesn't work. Is it really a bug in Camel, as Babak is suggesting?

Thanks for help,
Alek

Re: Splitter + aggregator + dynamic timeout

hekonsek
Hi Olek,

I'll take a look at both the timeout in my solution and the possible bug in
your original one. I'll probably have some spare time tomorrow to
analyze these issues.

Laters.

--
Henryk Konsek
http://henryk-konsek.blogspot.com

Re: Splitter + aggregator + dynamic timeout

Babak Vahdat
In reply to this post by Aleksander Pena
Hi

I WAS WRONG with my bug assumption, see the answer here:

http://camel.465427.n5.nabble.com/Is-this-routing-behaviour-as-expected-td5717331.html

Sorry for the noise & confusion.

Babak


Re: Splitter + aggregator + dynamic timeout

Babak Vahdat
In reply to this post by Aleksander Pena
Hi Alex,

Regarding the "open question" in your earlier mail, please find below a slightly modified version of your route, which now passes. See also my "XXX" comments. Hope this helps.

Babak

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        MockEndpoint split = getMockEndpoint("mock:split");
        split.expectedBodiesReceivedInAnyOrder("1", "2", "3");

        MockEndpoint result = getMockEndpoint("mock:result2");
        result.expectedBodiesReceived("1+2+3");

        template.requestBody("direct:start", "A,B,C");
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {

        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {

                from("direct:start")
                    .log("start body: ${body}")
                    .to("direct:process")
                    .log("result body: ${body}")
                    .to("mock:result");

                from("direct:process")
                    .split(body())
                    // .parallelProcessing() XXX: no parallelProcessing, as otherwise there is no guarantee of the expected order (we want the data flow "1", then "2", and finally "3")
                        .log("Split line ${body}")
                        .bean(new Responder())
                        .to("mock:split")
                        .aggregate(header("myId"), new MyAggregationStrategy())
                            .completionSize(3)
                            .log("aggregated ${body}")
                            .log("completed by ${property.CamelAggregatedCompletedBy}")
                            .log("test body: ${body}")
                            .to("direct:result2");

                from("direct:result2") // XXX: a third route where we get the final result of the whole process
                    // XXX: any possible final processing goes here
                    // ...
                    .to("mock:result2");
            }
        };
    }

    public class Responder {

            public String translate(Exchange ex, String key) {
                    ex.getIn().setHeader("myId", "correlation id 1");
                    if ("A".equals(key)) {
                            return "1";
                    } else if ("B".equals(key)) {
                            return "2";
                    } else {
                            return "3";
                    }
            }
    }

    public class MyAggregationStrategy implements AggregationStrategy {

            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                    if (oldExchange == null) {
                            return newExchange;
                    }
                    String body = newExchange.getIn().getBody(String.class);
                    String existing = oldExchange.getIn().getBody(String.class);

                    oldExchange.getIn().setBody(existing + "+" + body);
                    return oldExchange;
            }

    }
}



Re: Splitter + aggregator + dynamic timeout

Aleksander Pena
Hi Babak,

Your solution works fine, but you removed parallel processing, which is important for me :)
Anyway, I found a solution to my original problem:

1. Before processing starts, I store the current exchange (from the main thread) as a property (myEx). The myEx property is then propagated to the splitter threads.
2. After aggregation ends (in a splitter thread), I retrieve the 'myEx' exchange from the properties and set a new property on it with the aggregation results (myRes).
3. After the whole processing (when I'm in the main thread again), I retrieve the myRes property from the exchange and set it as the body :)
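
The three steps above might look roughly like this in plain Java. This is an analogy rather than Camel code: ResultHolderSketch and process are hypothetical names, a CompletableFuture plays the role of the myEx/myRes holder, and lower-casing the part stands in for the real call to a source:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical plain-Java analogy of the myEx/myRes workaround; none of these
// names come from Camel.
final class ResultHolderSketch {

    static String process(List<String> parts, long timeoutMillis)
            throws InterruptedException, ExecutionException {
        // Step 1: the caller creates the holder and hands it to the workers.
        CompletableFuture<String> myRes = new CompletableFuture<>();
        List<String> collected = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, parts.size()));
        try {
            for (String part : parts) {
                pool.execute(() -> {
                    String response = part.toLowerCase(); // stand-in for a real call
                    synchronized (collected) {
                        collected.add(response);
                        // Step 2: whichever worker completes the group publishes it.
                        if (collected.size() == parts.size()) {
                            myRes.complete(String.join("+", collected));
                        }
                    }
                });
            }
            // Step 3: back in the caller thread, read the published result.
            return myRes.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "timeout"; // the group was not completed in time
        } finally {
            pool.shutdownNow();
        }
    }
}
```

The order of the joined parts depends on thread scheduling, so callers of this sketch should not rely on it.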

Thanks guys for all your help,
Alek


Re: Splitter + aggregator + dynamic timeout

Babak Vahdat
On 16.08.12 at 11:31, "Aleksander Pena"
<aleksander.pena@gmail.com> wrote:

>Hi Babak,
>
>your solution works fine but you removed parallel processing which is
>important for me :)

Just be aware that if you use the parallelProcessing option, you have
no guarantee about the *order* of the outcomes. As an example,
try running the following unit test and look at the content of the file
"target/concurrent/outbox/result.txt":

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java

Its content can even differ on each run of the test!
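
A small plain-Java illustration of this effect, not Camel code: the class SplitOrderSketch and its Part type are hypothetical, and the only Camel fact assumed is that the splitter records each part's position in the CamelSplitIndex exchange property. Joining bodies in arrival order reproduces the nondeterminism, while sorting by split index first gives a stable result:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical illustration: each part remembers the position it had in the
// original message (the role CamelSplitIndex plays in Camel).
final class SplitOrderSketch {

    static final class Part {
        final int splitIndex;
        final String body;

        Part(int splitIndex, String body) {
            this.splitIndex = splitIndex;
            this.body = body;
        }
    }

    // Joins bodies in the order they arrive, like MyAggregationStrategy does.
    static String joinInArrivalOrder(List<Part> arrivals) {
        List<String> bodies = new ArrayList<>();
        for (Part part : arrivals) {
            bodies.add(part.body);
        }
        return String.join("+", bodies);
    }

    // Sorts a copy by split index first, so thread scheduling no longer matters.
    static String joinInSplitOrder(List<Part> arrivals) {
        List<Part> sorted = new ArrayList<>(arrivals);
        sorted.sort(Comparator.comparingInt(part -> part.splitIndex));
        return joinInArrivalOrder(sorted);
    }
}
```

For parts arriving in the order B, A, C, as in the log earlier in this thread, joinInArrivalOrder yields 2+1+3 and joinInSplitOrder yields 1+2+3.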

>Anyway I found solution for my original problem:

Happy to hear that.

Re: Splitter + aggregator + dynamic timeout

Henrique Viecili
Sorry to revive this post, but I am facing a similar problem.

I see you are doing a split/aggregation with *dynamic timeout* coming from
some property. What I need is to specify the timeout based on the
content/header of the message via Expression... is it possible?

Regards,
*Henrique Viecili*
