|
Hello, I have a fairly basic problem that I cannot solve. I want to send a message to a couple of services. These services should be invoked from multiple threads because the processing takes quite some time. I then want to gather and aggregate the output of the services and return it to the calling point.

I tried using seda in combination with an aggregator. That worked up to a point: only the first message was returned to the user, before the aggregator was able to process the rest. I know that the whole service is asynchronous by my design, but it would be nice to know how to gather the results back.

Here is an excerpt from my code, just as an illustration:

    from("direct:a").setHeader("splitCount", new Expression() {
        public Object evaluate(Exchange exchange) {
            return 2;
        }}).to("seda:b", "seda:c");
    from("seda:b").process(new HeaderSetter("msgCounter", 0)).process(traceProcessor).to("ret:xxx").to("seda:z");
    from("seda:c").process(new HeaderSetter("msgCounter", 1)).process(traceProcessor).to("ret:xxx").to("seda:z");
    from("seda:z").aggregator(header("msgId"), aggregationStrategy).to("return:a");

    Exchange exchange = new DefaultExchange(ctx);
    exchange.getIn().setBody("<result" + ii + " />");
    exchange.getIn().setHeader("msgId", "ahojValue" + ii);
    exchange = template.send("direct:a", exchange);
    Object body = exchange.getOut().getBody();

Can somebody advise me what to do, please?

Thanks,
Martin
|
Administrator
|
If you want to send messages to multiple destinations, you should use multicast(a, b, c) instead of to(a, b, c). The latter means that the output of a is sent to b, the output of b to c, and the output of c back to the consumer. It is equivalent to to(a).to(b).to(c).

On 9/18/07, Martin Novák <[hidden email]> wrote:
> Then I want to gather, and aggregate the output of the
> services somehow nicely, and return it to the calling point.

--
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/
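To make the difference concrete, here is a minimal plain-Java sketch of the two semantics described above. It does not use Camel at all: the class PipelineVsMulticast and its methods pipeline and multicast are names made up for this illustration, and each endpoint is modelled as a simple function on strings.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;

// Plain-Java illustration only; none of these names are Camel classes.
final class PipelineVsMulticast {

    // Pipeline semantics: the output of each step becomes the input of the next.
    static String pipeline(String input, List<UnaryOperator<String>> steps) {
        String current = input;
        for (UnaryOperator<String> step : steps) {
            current = step.apply(current);
        }
        return current;
    }

    // Multicast semantics: every step receives the same original input.
    static List<String> multicast(String input, List<UnaryOperator<String>> steps) {
        List<String> outputs = new ArrayList<>();
        for (UnaryOperator<String> step : steps) {
            outputs.add(step.apply(input));
        }
        return outputs;
    }

    public static void main(String[] args) {
        List<UnaryOperator<String>> steps =
                List.of(s -> s + "-a", s -> s + "-b", s -> s + "-c");
        System.out.println(pipeline("msg", steps));   // prints msg-a-b-c
        System.out.println(multicast("msg", steps));  // prints [msg-a, msg-b, msg-c]
    }
}
```

In the pipeline case the caller only ever sees the output of the last step, which is why to(a, b, c) is not the right tool for fanning one message out to several services.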
|
Thanks a lot for pointing this out. However, it doesn't solve my problem: I want to gather the output of a, b, and c into one message and send it back to the client. The call (template.send(...)) returns before the aggregator can aggregate the responses and send them back.

Martin
|
Administrator
|
I think there are two different things here. The first is to have the aggregator receive, correlate, and create the aggregate response. Going through a splitter/aggregator pattern is inherently based on InOnly exchanges, so the second, getting the response back synchronously, needs a bridge between an InOut exchange and two InOnly exchanges. I don't think this is supported yet.

I guess one way to do that would be to send the response from the aggregator to a seda queue and create a processor that grabs the correct response from this queue and sets it as the out message.

Does that make sense?

--
Cheers,
Guillaume Nodet
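The bridge idea can be sketched in plain Java, outside Camel. This is only an illustration under stated assumptions, not something Camel provides: ReplyBridge, Pending, sendAndWait, and onPartialResult are hypothetical names, the two services b and c are simulated by tasks on a thread pool, and a map of pending replies keyed by msgId stands in for the seda queue plus the processor that picks out the correct response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-Java sketch; none of these types are Camel classes.
final class ReplyBridge {

    // Collects partial results for one msgId until the expected count arrives.
    private static final class Pending {
        final int expected;
        final List<String> parts = new ArrayList<>();
        final CompletableFuture<List<String>> reply = new CompletableFuture<>();
        Pending(int expected) { this.expected = expected; }
    }

    private final Map<String, Pending> pending = new ConcurrentHashMap<>();

    // Daemon threads so a forgotten shutdown() does not keep the JVM alive.
    private final ExecutorService workers = Executors.newFixedThreadPool(2, r -> {
        Thread t = new Thread(r);
        t.setDaemon(true);
        return t;
    });

    // Aggregator role: correlate by msgId and complete the reply once all parts are in.
    private void onPartialResult(String msgId, String part) {
        Pending p = pending.get(msgId);
        if (p == null) {
            return; // the caller has already given up on this msgId
        }
        synchronized (p) {
            p.parts.add(part);
            if (p.parts.size() == p.expected) {
                p.reply.complete(new ArrayList<>(p.parts));
            }
        }
    }

    // Synchronous entry point: fan out asynchronously, then block for the aggregate.
    List<String> sendAndWait(String msgId, String body, long timeoutMillis) {
        List<String> services = List.of("b", "c"); // simulated services
        Pending p = new Pending(services.size());
        pending.put(msgId, p);
        for (String service : services) {
            workers.execute(() -> onPartialResult(msgId, service + ":" + body));
        }
        try {
            return p.reply.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted waiting for " + msgId, e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("no aggregate reply for " + msgId, e);
        } finally {
            pending.remove(msgId); // always clean up the correlation entry
        }
    }

    void shutdown() {
        workers.shutdown();
    }
}
```

A caller would use new ReplyBridge().sendAndWait("ahojValue1", "<result1 />", 5000) and receive both partial results in one list. The order of the parts depends on which worker finishes first, so an aggregation step that needs a fixed order would have to sort them, for example by a counter such as the msgCounter header in the original routes.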
