|
Hi guys,
I have a specific requirement. An incoming request should be split into identical requests that are sent to different sources in parallel. The sources (legacy and new) are almost compatible with each other, but the legacy one responds much more slowly than the new one. To prevent degradation of the whole service, each incoming request will carry a timeout attribute that is set dynamically to the required value. So when the legacy system does not respond in time, the service should ignore its response and return information about the timeout. I wonder which EIP components can be used for such a service? I created the following route using a splitter and an aggregator, because the aggregator has a completionTimeout that accepts a dynamic argument:
package com.aleksander.pena;

import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
import org.apache.camel.test.junit4.CamelTestSupport;
import org.junit.Test;

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        MockEndpoint split = getMockEndpoint("mock:split");
        split.expectedBodiesReceivedInAnyOrder("1", "2", "3");
        MockEndpoint result = getMockEndpoint("mock:result");
        result.expectedBodiesReceived("1+2+3");
        template.requestBody("direct:start", "A,B,C");
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    .log("start body: ${body}")
                    .to("direct:process")
                    .log("result body: ${body}")
                    .to("mock:result");

                from("direct:process")
                    .split(body())
                        .parallelProcessing()
                        .log("Split line ${body}")
                        .bean(new Responder())
                        .to("mock:split")
                        .aggregate(header("myId"), new MyAggregationStrategy())
                            .completionSize(3)
                            .log("aggregated ${body}")
                            .log("completed by ${property.CamelAggregatedCompletedBy}")
                            .log("test body 1: ${body}")
                        .end()
                        .log("test body 2: ${body}")
                    .end();
            }
        };
    }

    public class Responder {
        public String translate(Exchange ex, String key) {
            // Same correlation id for every part so they end up in one group
            ex.getIn().setHeader("myId", "correlation id 1");
            if ("A".equals(key)) {
                return "1";
            } else if ("B".equals(key)) {
                return "2";
            } else {
                return "3";
            }
        }
    }

    public class MyAggregationStrategy implements AggregationStrategy {
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                // First message of the group
                return newExchange;
            }
            String body = newExchange.getIn().getBody(String.class);
            String existing = oldExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(existing + "+" + body);
            return oldExchange;
        }
    }
}
Unfortunately the results are not what I expected. Please see the excerpt from the console log below. The responses from the sources are aggregated correctly, but the result stays inside the Exchange in thread #3 and is not returned to the caller's main thread.

2012-08-12 10:12:22,285 [ main] INFO DefaultCamelContext - Total 2 routes, of which 2 is started.
2012-08-12 10:12:22,285 [ main] INFO DefaultCamelContext - Apache Camel 2.10.0 (CamelContext: camel-1) started in 0.521 seconds
2012-08-12 10:12:22,315 [ main] INFO route1 - start body: A,B,C
2012-08-12 10:12:22,335 [amel-1) thread #1 - Split] INFO route2 - Split line B
2012-08-12 10:12:22,335 [amel-1) thread #0 - Split] INFO route2 - Split line A
2012-08-12 10:12:22,335 [amel-1) thread #3 - Split] INFO route2 - Split line C
2012-08-12 10:12:22,345 [amel-1) thread #1 - Split] INFO route2 - test body 2: 2
2012-08-12 10:12:22,345 [amel-1) thread #0 - Split] INFO route2 - test body 2: 1
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO route2 - aggregated 2+1+3
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO route2 - completed by size
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO route2 - test body 1: 2+1+3
2012-08-12 10:12:22,345 [amel-1) thread #3 - Split] INFO route2 - test body 2: 3
2012-08-12 10:12:22,345 [ main] INFO route1 - result body: A,B,C

Could you advise what could be wrong with my route? I'm not sure about the places where I put the end() elements. Also, I saw in the documentation that the only way to return a result from the splitter to the main thread is to use its aggregation strategy, but maybe there is some workaround for this?

I'm using Camel 2.8.0-fuse-01-13.

Thanks.
Alek |
|
Hi Olek,
First of all, apparently you forgot to add the route examples to the message. :) Could you send them here?

> I wonder which EIP components can be used for such service?

Splitter or Multicast with parallel processing enabled, plus some kind of aggregation (more about this below).

The key question is what you want to do with the responses from both services (legacy and non-legacy). Do you need to collect both results before continuing processing? Do you want to merge the aggregated results into a single message?

Send us your routes and tell us more about how you treat the responses from the services you call. Then we should be able to give you a more precise answer. :)

Laters.
--
Henryk Konsek
http://henryk-konsek.blogspot.com |
|
Hi Henryk,
> First of all - apparently you forgot to add the routes examples to the
> message. :) Could you send them here?

I'm not sure whether the example I added to the original message is visible to you. I added the example route to the message above, and it is visible via the web page. Please advise, as this is my first post here :)

> The key question is what do you want to do with the responses from
> both services (legacy and non-legacy). Do you need to collect both
> results before continue processing? Do you want to merge the
> aggregated results into a single message?

Here is a more detailed description of my requirements. The incoming message comes from an HTTP endpoint (1), and the reply should be sent back to it. The legacy (2) and new (3) systems are HTTP endpoints as well. I need to send requests to them, aggregate their responses into one response, and send it back to the starting endpoint (1). The responses from (2) and (3) should be aggregated together, and some additional processing is needed (removing duplicates etc.). And yes, before I continue processing I need to aggregate the results from (2) and (3).

Regards,
Alek |
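The merge step described in this post (combine the responses from (2) and (3), then remove duplicates) can be sketched in plain Java, independent of Camel. The class name `ResponseMerger` and the idea that each response is a list of strings are assumptions made for illustration; the thread does not say what the payloads look like.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not part of the thread's code or of Camel.
final class ResponseMerger {

    // Merges both responses and drops duplicates while keeping first-seen
    // order: entries from the new system first, then any legacy entries
    // that were not already present.
    static List<String> merge(List<String> fromNew, List<String> fromLegacy) {
        Set<String> unique = new LinkedHashSet<>(fromNew);
        unique.addAll(fromLegacy);
        return new ArrayList<>(unique);
    }
}
```

A method like this could be called from whatever aggregation step ends up collecting the two responses.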
|
Hi Olek,
> I'm not sure if the example I've added in the original message is visible
> by you? I've added example route to the message above and it is visible via
> web page.Please advice as this is my first post here :)

Yeah, I can see the code via the Nabble web interface, but the code formatting makes the examples invisible in GMail. It is safer to send examples as plain text. :)

> More detailed description of my reqs are as be:
> And yes before I'll continue processing I need
> to aggregate results from (2) and (3).

OK, let's start with some EIP magic here. :) Your problem can be solved with the following EIP flow:

a) Split the message coming to the aggregation HTTP endpoint.
b) Start concurrent processing for each part of the split message.
c) Send each part of the message to the content-based router.
d) The router decides which Enricher endpoint the message should be sent to.
e) Aggregate the results using the GroupedExchangeAggregationStrategy.
f) Send the response.

And these are sample routes demonstrating a possible implementation of the flow above:

from("direct:emulateLegacyHttp").
    setBody().simple("emulateLegacyHttp-${body}");

from("direct:emulateNonLegacyHttp").
    setBody().simple("emulateNonLegacyHttp-${body}");

from("direct:serviceAggregator").
    split(body().tokenize(":")).
        aggregationStrategy(new GroupedExchangeAggregationStrategy()).
        parallelProcessing().
        choice().
            when(body().isEqualTo("foo")).
                enrich("direct:emulateLegacyHttp").
            otherwise().
                enrich("direct:emulateNonLegacyHttp").
        end().
    end().
    to("direct:aggregatedResults");

These are the core routing rules for your issue. You can enhance them with a timeout, filtering and other features you need. The results of the aggregation will be stored in the exchange property under the key Exchange.GROUPED_EXCHANGE.

Remember that Camel is pretty flexible, so my solution is not the only one possible. That's how I would do it. :)

Here is also some reading [1] from my blog regarding service aggregation. :)

[1] http://henryk-konsek.blogspot.com/2012/05/aggregating-multiple-web-services.html

--
Henryk Konsek
http://henryk-konsek.blogspot.com |
|
Hi
The sample unit test Alek provided in this thread puzzles me a bit, so I reduced it to a minimum (without parallel processing). I expected it to pass, but it doesn't, because "mock:result" is not satisfied:

java.lang.AssertionError: mock://result Body of message: 0. Expected: <1+2+3> but was: <A,B,C>

However, "mock:result2" is satisfied, which is as expected. Can someone please advise me about the point I'm missing, or could it maybe be a bug?

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        getMockEndpoint("mock:result").expectedBodiesReceived("1+2+3");
        getMockEndpoint("mock:result2").expectedBodiesReceived("1+2+3");
        template.requestBody("direct:start", "A,B,C");
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start").to("direct:process").to("mock:result");

                from("direct:process")
                    .split(body())
                        .bean(new Responder())
                        .aggregate(header("myId"), new MyAggregationStrategy())
                            .completionSize(3)
                            .to("mock:result2");
            }
        };
    }

    public static class Responder {
        public String translate(Exchange exchange, String body) {
            exchange.getIn().setHeader("myId", "myValue");
            if ("A".equals(body)) {
                return "1";
            } else if ("B".equals(body)) {
                return "2";
            } else if ("C".equals(body)) {
                return "3";
            } else {
                throw new IllegalArgumentException("bla bla");
            }
        }
    }

    public static class MyAggregationStrategy implements AggregationStrategy {
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String oldBody = oldExchange.getIn().getBody(String.class);
            String newBody = newExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(oldBody + "+" + newBody);
            return oldExchange;
        }
    }
}

Thanks,
Babak |
|
Changing the second route to the following makes the unit test pass:

@Override
protected RouteBuilder createRouteBuilder() throws Exception {
    return new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("direct:start").to("direct:process").to("mock:result");

            from("direct:process")
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        exchange.getIn().setBody("1+2+3");
                    }
                })
                .to("mock:result2");
        }
    };
}

So to me this is definitely a bug!

Babak |
|
In reply to this post by hekonsek
Henryk,
Thanks for another way of meeting my requirements. It works perfectly well, but one important requirement is still missing: I need to set the timeout dynamically, so the flow could look like the following:

.when(body().isEqualTo("foo"))
    .enrich("direct:emulateLegacyHttp").timeout(property(TIMEOUT))
.otherwise()
    .enrich("direct:emulateNonLegacyHttp").timeout(property(TIMEOUT))
.end()

Unfortunately I cannot find a timeout that takes an argument of Expression type :( It would be best if I could specify the timeout for each flow in a route, as in the excerpt below:

from("direct:start")
    .timeout(property(TIMEOUT))
    .process(new SomeProcessingHere())
    .to("direct:anotherEndpoint");

But it is probably difficult to implement such behaviour in Camel.

Anyway, there is still the open question of why my original example doesn't work. Is it really a bug in Camel, as Babak suggests?

Thanks for the help,
Alek |
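To make the requirement concrete outside of the Camel DSL: a timeout that is only known per request can be applied in plain Java by waiting on a `Future` for that many milliseconds. This is a sketch under assumptions, not a Camel feature; `DynamicTimeoutCall`, `call` and the `"TIMEOUT"` marker are hypothetical names, and how the timeout value is read from the request is left open.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical helper, not part of Camel.
final class DynamicTimeoutCall {

    static final String TIMED_OUT = "TIMEOUT";

    // Runs the backend call on the pool and waits at most timeoutMillis,
    // a value supplied per request instead of being fixed up front.
    static String call(ExecutorService pool, Callable<String> backend, long timeoutMillis) {
        Future<String> future = pool.submit(backend);
        try {
            return future.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            // Give up on the slow backend; a late response is ignored.
            future.cancel(true);
            return TIMED_OUT;
        } catch (ExecutionException e) {
            throw new IllegalStateException("backend call failed", e.getCause());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

In the scenario discussed here, `timeoutMillis` would be taken from the incoming message, and the marker value would be turned into the "information about timeout" that the service returns.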
|
Hi Olek,
I'll take a look at both the timeout in my solution and the possible bug in your original one. I'll probably have some spare time tomorrow to analyze these issues.

Laters.
--
Henryk Konsek
http://henryk-konsek.blogspot.com |
|
In reply to this post by Aleksander Pena
Hi
I WAS WRONG with my bug assumption, see the answer here:

http://camel.465427.n5.nabble.com/Is-this-routing-behaviour-as-expected-td5717331.html

Sorry for the noise & confusion.

Babak
|
|
In reply to this post by Aleksander Pena
Hi Alex,
Regarding the "open question" in your mail below, please find a slightly modified version of your routing which now passes. See also my "XXX" comments. Hope this helps.

Babak

public class SplitterWithAggregatorTest extends CamelTestSupport {

    @Test
    public void shouldProcessCorrectlyOnBothSources() throws Exception {
        MockEndpoint split = getMockEndpoint("mock:split");
        split.expectedBodiesReceivedInAnyOrder("1", "2", "3");
        MockEndpoint result = getMockEndpoint("mock:result2");
        result.expectedBodiesReceived("1+2+3");
        template.requestBody("direct:start", "A,B,C");
        assertMockEndpointsSatisfied();
    }

    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("direct:start")
                    .log("start body: ${body}")
                    .to("direct:process")
                    .log("result body: ${body}")
                    .to("mock:result");

                from("direct:process")
                    .split(body())
                        // XXX: no parallelProcessing, as otherwise there is no guarantee of the
                        // expected order (we want the data flow "1", then "2" and at last "3")
                        // .parallelProcessing()
                        .log("Split line ${body}")
                        .bean(new Responder())
                        .to("mock:split")
                        .aggregate(header("myId"), new MyAggregationStrategy())
                            .completionSize(3)
                            .log("aggregated ${body}")
                            .log("completed by ${property.CamelAggregatedCompletedBy}")
                            .log("test body: ${body}")
                            .to("direct:result2");

                // XXX: a third route where we get the final result of the whole process
                from("direct:result2")
                    // XXX: any possible final processing goes here
                    // ...
                    .to("mock:result2");
            }
        };
    }

    public class Responder {
        public String translate(Exchange ex, String key) {
            ex.getIn().setHeader("myId", "correlation id 1");
            if ("A".equals(key)) {
                return "1";
            } else if ("B".equals(key)) {
                return "2";
            } else {
                return "3";
            }
        }
    }

    public class MyAggregationStrategy implements AggregationStrategy {
        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
            if (oldExchange == null) {
                return newExchange;
            }
            String body = newExchange.getIn().getBody(String.class);
            String existing = oldExchange.getIn().getBody(String.class);
            oldExchange.getIn().setBody(existing + "+" + body);
            return oldExchange;
        }
    }
}
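Alek's first post mentions that the documented way to get a result back to the caller is the Splitter's own aggregation strategy. A hedged sketch of that variant, which keeps parallelProcessing(), follows. It is a fragment meant to replace the direct:process route inside configure() of the test in this thread and reuses the Responder and MyAggregationStrategy classes from it; it needs camel-core 2.x on the classpath and has not been run against 2.8.0-fuse. The 2000 ms value is an arbitrary assumption.

```java
// Fragment for RouteBuilder.configure(); reuses classes from the test above.
from("direct:process")
    // The strategy handed to split() assembles the message that the
    // caller of direct:process gets back when the split is done.
    .split(body(), new MyAggregationStrategy())
        .parallelProcessing()
        // Fixed timeout in milliseconds (arbitrary value for illustration);
        // this overload takes a long, not an Expression.
        .timeout(2000)
        .log("Split line ${body}")
        .bean(new Responder())
        .to("mock:split")
    .end();
```

With this shape the separate aggregate(...) block and the correlation header are no longer needed, but the timeout stays static, which is the limitation Alek pointed out earlier in the thread.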
|
|
Hi Babak,
Your solution works fine, but you removed parallel processing, which is important for me :)

Anyway, I found a solution for my original problem:

1. Before processing starts, I store the current exchange (from the main thread) as a property (myEx). The myEx property is then propagated to the splitter threads.
2. After aggregation has ended (in a splitter thread), I retrieve the 'myEx' exchange from the properties and set a new property on it with the aggregation results (myRes).
3. After the whole processing (when I'm in the main thread again), I retrieve the myRes property from the exchange and set it as the body :)

Thanks guys for all your help,
Alek |
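The three steps above can be illustrated with a plain-Java analogy. This is not Alek's route and uses no Camel API: a shared map stands in for the main-thread exchange (myEx), and `ParentHandoff` and `process` are hypothetical names. The parts are sorted before joining only to make the output deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Plain-Java analogy of the workaround; none of these names are Camel API.
final class ParentHandoff {

    // Returns the aggregated value, or null if the workers did not finish in time.
    static String process(List<String> parts) {
        // Step 1: "parent" stands in for the main-thread exchange (myEx)
        // that every worker gets a reference to.
        Map<String, Object> parent = new ConcurrentHashMap<>();
        List<String> collected = new ArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, parts.size()));

        for (String part : parts) {
            pool.execute(() -> {
                synchronized (collected) {
                    collected.add(part);
                    // Step 2: whichever worker completes the group publishes
                    // the aggregate on the parent under "myRes".
                    if (collected.size() == parts.size()) {
                        List<String> sorted = new ArrayList<>(collected);
                        Collections.sort(sorted); // arrival order is not deterministic
                        parent.put("myRes", String.join("+", sorted));
                    }
                }
            });
        }

        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // Step 3: back on the calling thread, read the published result.
        return (String) parent.get("myRes");
    }
}
```

The design relies on the caller waiting until all workers are done before it reads the shared state, which is what makes the hand-off safe in this sketch.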
|
On 16.08.12 11:31, "Aleksander Pena" <aleksander.pena@gmail.com> wrote:

>Hi Babak,
>
>your solution works fine but you removed parallel processing which is
>important for me :)

Just be aware that if you use the parallelProcessing option, you have no guarantee about the *order* of the outcomes. As an example, try running the following unit test and see what the content of the file "target/concurrent/outbox/result.txt" looks like:

https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java

Its content can even differ on each run of the test!

>Anyway I found solution for my original problem:

Happy to hear that.

>
>1. before processing is started I'm storing current exchange (from a main
>thread) as a property (myEx). Then myEx property is propagated to splitter
>threads.
>2. after aggregation is ended (in splitter thread) I'm retrieving 'myEx'
>exchange from properties and set new property on it with aggregation
>results (myRes).
>3. after whole processing (when I'm in the main thread again) I'm
>retrieving myRes property from the exchange and set it as a body :)
>
>Thanks guys for all your help,
>Alek
>
>--
>View this message in context:
>http://camel.465427.n5.nabble.com/Splitter-aggregator-dynamic-timeout-tp5717166p5717486.html
>Sent from the Camel - Users mailing list archive at Nabble.com. |
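The ordering caveat can be shown in plain Java. Parallel tasks finish in an unpredictable order, but if the results are collected in submission order rather than completion order, the joined output is stable. The sketch below is an illustration only, with hypothetical names (`OrderedParallel`, `joinInInputOrder`); the `translate` mapping mirrors the Responder bean from the thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical illustration, not Camel API.
final class OrderedParallel {

    // Processes all parts in parallel but joins the results in input order.
    static String joinInInputOrder(List<String> parts) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<Future<String>> futures = new ArrayList<>();
            for (String part : parts) {
                Callable<String> task = () -> translate(part);
                futures.add(pool.submit(task));
            }
            StringBuilder out = new StringBuilder();
            // Iterating the futures in submission order fixes the output
            // order, regardless of which task finished first.
            for (Future<String> future : futures) {
                if (out.length() > 0) {
                    out.append('+');
                }
                out.append(future.get());
            }
            return out.toString();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow();
        }
    }

    // Same mapping as the Responder bean in the thread.
    static String translate(String key) {
        if ("A".equals(key)) {
            return "1";
        } else if ("B".equals(key)) {
            return "2";
        }
        return "3";
    }
}
```

Appending to a shared result from inside each task, by contrast, would produce whatever order the threads happen to finish in, as in the "aggregated 2+1+3" log line from the first post.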
|
Sorry to revive this post, but I am facing a similar problem.
I see you are doing a split/aggregation with a *dynamic timeout* coming from some property. What I need is to specify the timeout based on the content or a header of the message via an Expression... is that possible?

Regards,
*Henrique Viecili*

On Thu, Aug 16, 2012 at 7:05 AM, Babak Vahdat <[hidden email]> wrote:

> On 16.08.12 11:31, "Aleksander Pena" <[hidden email]> wrote:
>
> >Hi Babak,
> >
> >your solution works fine but you removed parallel processing which is
> >important for me :)
>
> Just be aware that if you make use of parallelProcessing option then
> you've got no guarantee about the *order* of the outcomes. As an example
> try to run the following unit-test and see how the content of the file
> "target/concurrent/outbox/result.txt" seems like:
>
> https://svn.apache.org/repos/asf/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConcurrentWriteAppendSameFileTest.java
>
> And that even it's content could be different by each run of the test!
>
> >Anyway I found solution for my original problem:
>
> Happy to hear that.
>
> >1. before processing is started I'm storing current exchange (from a main
> >thread) as a property (myEx). Then myEx property is propagated to splitter
> >threads.
> >2. after aggregation is ended (in splitter thread) I'm retrieving 'myEx'
> >exchange from properties and set new property on it with aggregation
> >results (myRes).
> >3. after whole processing (when I'm in the main thread again) I'm
> >retrieving myRes property from the exchange and set it as a body :)
> >
> >Thanks guys for all your help,
> >Alek
> >
> >--
> >View this message in context:
> >http://camel.465427.n5.nabble.com/Splitter-aggregator-dynamic-timeout-tp5717166p5717486.html
> >Sent from the Camel - Users mailing list archive at Nabble.com. |
