Re: ProducerTemplate.request() not returning complete Exchange


kilidarria

I understand that the new Processor(){} is for the message being sent to the consumer (the from() of the route), and that part is working fine.
What I do not understand is why ProducerTemplate.request() is not returning the exchange as it stands at the end of the route.
For example:
Exchange reply = producerTemplate.request("jms:Sahara", new Processor() {
        public void process(final Exchange exchange) {
            exchange.getIn().setHeader("test", "foo");
        }
});
Route:
     from("jms:Sahara")
          .log(LoggingLevel.INFO, "* Test ID: ${header.test}")
          .setHeader("test", constant("bar"));
End of route.

assertThat("return should be bar", reply.getIn().getHeader("test"), equalTo("bar"));
but it is still "foo".
This fails when I expect it to pass.

Maybe I'm reading these wrong.
From Javadoc.io:
Exchange request(Endpoint endpoint, Processor processor)
Sends an exchange to an endpoint using a supplied processor. Uses an ExchangePattern.InOut message exchange pattern.

From the Apache Camel website:
request*() methods

The send*() methods use the default Message Exchange Pattern (InOnly, InOut etc) as the endpoint. If you want to explicitly perform a request/response (InOut) you can use the request*() methods instead of the send*() methods.

E.g. let’s invoke an endpoint and get the response:



Jan 15, 2020; 12:34am, Claus Ibsen-2 wrote:
Hi

The new Processor(){ } in the request method of the producer template is
not for the reply message, but for preparing the message to send (i.e. the
input message).
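
To make that distinction concrete, here is a minimal plain-Java sketch. It does not use Camel at all: ToyExchange, request() and the remoteRoute callback are hypothetical stand-ins, and the sketch assumes the reply from the queue lands in a separate "out" slot while the "in" slot keeps whatever the caller prepared.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.UnaryOperator;

/** Toy model only: NOT Camel's Exchange, just two header maps. */
public class RequestReplySketch {

    /** Hypothetical simplified exchange: a request ("in") and an optional reply ("out"). */
    static final class ToyExchange {
        final Map<String, Object> in = new HashMap<>();
        Map<String, Object> out; // stays null until a reply arrives
    }

    /**
     * Mimics the shape of a request() call: 'prepare' runs BEFORE sending and
     * only touches the request; 'remoteRoute' stands in for the consumer side,
     * which works on its own copy of the headers (as it would across a queue).
     */
    static ToyExchange request(Consumer<ToyExchange> prepare,
                               UnaryOperator<Map<String, Object>> remoteRoute) {
        ToyExchange exchange = new ToyExchange();
        prepare.accept(exchange);                               // caller fills the request
        Map<String, Object> sentCopy = new HashMap<>(exchange.in);
        exchange.out = remoteRoute.apply(sentCopy);             // reply goes to "out"
        return exchange;
    }

    public static void main(String[] args) {
        ToyExchange reply = request(
                ex -> ex.in.put("test", "foo"),
                headers -> { headers.put("test", "bar"); return headers; });
        System.out.println("in  = " + reply.in.get("test"));   // prints "in  = foo"
        System.out.println("out = " + reply.out.get("test"));  // prints "out = bar"
    }
}
```

In this model the header set by the "route" is only visible on the out side, which is the same shape as the failing assertion on reply.getIn() in this thread.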

On Tue, Jan 14, 2020 at 9:16 PM Wilken Marci J <
[hidden email]> wrote:

> Based on the documentation I was reading,
> ProducerTemplate.request(<URL>, Processor) should return the Exchange
> information from the route that was called. When I run my test, it
> returns the body in exchange.getOut().getBody(), but
> exchange.getOut().getHeaders() has the headers that I sent and not the
> returned headers that I see if I look at the mock endpoint. I am expecting
> it to return the header and body data in Exchange.getIn(). Since this test
> is calling a JMS message queue (JBoss EAP 7.2, Fuse 7.3), I am expecting it
> to return the same thing that a call from another route would see. Am I
> understanding this wrong, or am I missing a setting?
>
>
>
> public void testConnection() throws Exception {
>     log.info("----->>  Begin testConnection <<-----");
>     Assert.assertNotNull("Expected QueueRoute-context to exist", camelctx);
>     Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus());
>
>     MockEndpoint mock = camelctx.getEndpoint("mock:JobLogResult", MockEndpoint.class);
>     assertNotNull("Expected mock:JobLogResult to exist", mock);
>     mock.reset();
>     mock.expectedMessageCount(1);
>
>     ProducerTemplate producerTemplate = camelctx.createProducerTemplate();
>     Assert.assertNotNull("Expected producerTemplate to exist", producerTemplate);
>
>     Exchange reply = producerTemplate.request("jms:queue:Sahara.Core.JobIdRequest", new Processor() {
>         public void process(final Exchange exchange) {
>             Map<String, Object> headers = new HashMap<String, Object>();
>             headers.put("RequestingSource", requestingSource);
>             headers.put("AuthorizationKey", authorizationKey);
>             headers.put("RequestType", "testConnection");
>
>             Map<String, Object> payload = new HashMap<String, Object>();
>
>             exchange.getIn().setHeaders(headers);
>             exchange.getIn().setBody(payload);
>             exchange.setPattern(ExchangePattern.InOut);
>         }
>     });
>
>     // these fail
>     assertThat("Request was un-successful",
>         (reply.getIn().getHeader("SQLErrorMessage") == null ? "null"
>             : ((String) reply.getIn().getHeader("SQLErrorMessage"))).toLowerCase(),
>         equalTo("success"));
>     assertThat("Didn't Return Data ", (int) reply.getIn().getHeader("CamelSqlRowCount"), equalTo(1));
>
>     // these are successful
>     List<Exchange> list = mock.getReceivedExchanges();
>     Exchange reply2 = list.get(0);
>     assertThat("Request was un-successful",
>         (reply2.getIn().getHeader("SQLErrorMessage") == null ? "null"
>             : ((String) reply2.getIn().getHeader("SQLErrorMessage"))).toLowerCase(),
>         equalTo("success"));
>     assertThat("Didn't Return Data ", (int) reply2.getIn().getHeader("CamelSqlRowCount"), equalTo(1));
> }
>
>
>
> Route
>
> from("jms:queue:Sahara.Core.JobIdRequest?exchangePattern=InOut")
>     .routeId("JobLogRouter")
>     .log(LoggingLevel.INFO, "*************************************************")
>     .log(LoggingLevel.INFO, "* Beginning JobAuditingRequest")
>     .log(LoggingLevel.INFO, "* Requesting Source: ${header.RequestingSource} ")
>     .log(LoggingLevel.INFO, "* Request Type: ${header.RequestType} ")
>     .log(LoggingLevel.INFO, "*************************************************")
>     .choice()
>         .when(simple("${header.RequestType} == \"complete\""))
>             .to("direct:CompleteJob")
>             .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"audit\")}")
>             .to("direct:LogAudit")
>             .endChoice()
>         .when(simple("${header.RequestType} == \"error\""))
>             .to("direct:LogError")
>             .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"whoami\")}")
>             .to("direct:WhoAmI")
>             .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"testConnection\")}")
>             .to("direct:ConnectionTest")
>             .endChoice()
>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"start\")} || ${header.RequestType?.equalsIgnoreCase(\"open\")}")
>             .to("direct:InitialJobRequest")
>             .endChoice()
>         .otherwise()
>             .process(new Processor() {
>                 public void process(Exchange exchange) throws Exception {
>                     // Message in = exchange.getIn();
>                     Map<String, Object> payload = new HashMap<String, Object>();
>                     exchange.setOut(exchange.getIn());
>                     payload = (Map<String, Object>) exchange.getIn().getBody();
>                     payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}"));
>                     payload.put("SQLErrorMessage", "No Choice Provided");
>                     payload.put("SQLErrorNumber", -1);
>                     exchange.getOut().setBody(payload);
>                     exchange.getOut().setHeader("RequestType", "failed");
>                 }
>             })
>             .endChoice()
>     .end()
>     .process(new Processor() {
>         public void process(Exchange exchange) throws Exception {
>             exchange.setOut(exchange.getIn());
>             exchange.getOut().setHeader("BodyType", exchange.getIn().getBody().getClass().getName());
>             exchange.getOut().setHeader("BodyEncoding", "JSON");
>             String payloadJson = new ObjectMapper().writeValueAsString(exchange.getIn().getBody());
>             log.debug(payloadJson);
>             exchange.getOut().setBody(payloadJson);
>         }
>     })
>     .log(LoggingLevel.INFO, "Request results Job ID Request : \r\n Header:  ${headers} \r\n Body:  ${body} ")
>     .to("mock:JobLogResult")
>     .onException(Exception.class)
>         .setHeader("MESSAGE_INFO", constant("Failure to Process Audit Job"))
>         .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>         .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>         .maximumRedeliveries("0")
>         .redeliveryDelay("0")
>         .id("_onExceptionCron")
>         .handled(true)
>         ;
>     ;
>
>
>
>
>
> from("direct:ConnectionTest")
>     .routeId("ConnectionTest")
>     .log(LoggingLevel.INFO, "Beginning Connection Test")
>     .log(LoggingLevel.INFO, "whoami ->> {{whoami}}")
>     .log(LoggingLevel.INFO, "Request: \r\n Header:  ${headers} \r\n Body:  ${body} ")
>     .to("sql:select name from sys.sysusers where uid = 0?dataSource=#Sahara_DS")
>     .log(LoggingLevel.INFO, "Request Result: \r\n Header:  ${headers} \r\n Body:  ${body} ")
>     .process(new Processor() {
>         public void process(Exchange exchange) throws Exception {
>             Map<String, Object> payload = new HashMap<String, Object>();
>             exchange.setOut(exchange.getIn());
>             ArrayList<Map<String, Object>> body = (ArrayList<Map<String, Object>>) exchange.getIn().getBody();
>             payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}"));
>             body.add(payload);
>             exchange.getOut().setBody(body);
>             exchange.getOut().setHeader("RequestType", "complete");
>             exchange.getOut().setHeader("SQLErrorMessage", "Success");
>         }
>     })
>     .onException(SQLException.class)
>         .setHeader("MESSAGE_INFO", constant("Sql Connection Error"))
>         .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>         .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>         .maximumRedeliveries("0")
>         .redeliveryDelay("0")
>         .id("_onExceptionConnectionTest")
>         .handled(false)
>     ;
>
>
>
>
>
>
>
>
>
> Regards-
>
> Marci Wilken
>


--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2

Alex Dettinger
Hi Marci,

  I would say that from("jms:Sahara")... has MEP InOut, so In
message with header "foo",
and OUT mess

On Fri, Feb 14, 2020 at 11:43 PM Marci <[hidden email]> wrote:

>

Alex Dettinger
Hi Marci,

  I would say that from("jms:Sahara")... has MEP InOut, so there is an IN
message with header "foo" and an OUT message with header "bar".

  So assertThat("return should be bar", reply.getOut().getHeader("test"), equalTo("bar"))
(or reply.getMessage() in place of reply.getOut()) should be ok.

Alex
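
For illustration, a small plain-Java sketch of the "read the OUT message if there is one, otherwise the IN message" idea behind that suggestion. The class and method names are made up for this example and the maps merely stand in for message headers; treat it as an assumption-level model rather than Camel's own code.

```java
import java.util.Collections;
import java.util.Map;

/** Toy model only: plain maps stand in for the IN and OUT message headers. */
public class ReplyHeaderSketch {

    /**
     * Returns the named header from the "current" message:
     * the OUT headers when a reply exists, otherwise the IN headers.
     */
    static Object currentHeader(Map<String, Object> in, Map<String, Object> out, String name) {
        Map<String, Object> current = (out != null) ? out : in;
        return current.get(name);
    }

    public static void main(String[] args) {
        Map<String, Object> in = Collections.singletonMap("test", "foo");
        Map<String, Object> out = Collections.singletonMap("test", "bar");
        System.out.println(currentHeader(in, out, "test"));  // prints "bar"
        System.out.println(currentHeader(in, null, "test")); // prints "foo"
    }
}
```

Reading through a single "current message" accessor like this avoids having to know in the test whether a separate reply was ever set.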

On Mon, Feb 17, 2020 at 11:02 AM Alex Dettinger <[hidden email]>
wrote:

> Hi Marci,
>
>   I would say that from("jms:Sahara")... has MEP InOUt so In
> message with header "foo",
> and OUT mess
>
> On Fri, Feb 14, 2020 at 11:43 PM Marci <[hidden email]> wrote:
>
>>
>>  I understand that the new Processor(){} is for the message being sent to
>> the consumers (from(route)) and this is working fine.
>> What I am not understanding is why the ProducerTemplate.request() is not
>> returning an exchange back at the end of the route.
>> For example.
>> Exchange reply = producerTemplate.request("jms:Sahara", new Processor() {
>>         public void process(final Exchange exchange) {
>>         exchange.getIn().setHeader(“test”,”foo”)
>>         }
>> Route
>>      From(jms:Sahara”)
>>           .log(LoggingLevel.INFO,"* Test ID: ${header.test}" )
>>           .setHeader(“test”,”bar”);
>> end route
>>
>> assertThat(“return should be
>> bar”,reply.getIn().getHeader(“test”),equalTo(“bar”)
>> but it is still “foo”
>> This is failing when  it should pass
>>
>> Maybe I’m reading these wrong
>> From Javadoc.io
>> Exchange        request(Endpoint endpoint, Processor processor)
>> Sends an exchange to an endpoint using a supplied processor Uses an
>> ExchangePattern.InOut message exchange pattern.
>>
>> from the camel apache org website
>> request*() methods
>>
>> The send*() methods use the default Message Exchange Pattern (InOnly,
>> InOut etc) as the endpoint. If you want to explicitly perform a
>> request/response (InOut) you can use the request*() methods instead of the
>> send*() methods.
>>
>> E.g. let’s invoke an endpoint and get the response:
>>
>>
>>
>> >Camel › Camel - Users › Camel - Users
>> >kilidarria
>>
>> >Re: ProducerTemplate.request() not returning complete Exchange.
>> >‹ Previous Topic Next Topic ›
>>
>> classic         Classic         list    List    threaded        Threaded
>>  X  Turn off highlighting 1 message Options Options
>>
>> Classic Threaded   Reply   More   Close
>> Jan 15, 2020; 12:34am Claus Ibsen-2 Claus Ibsen-2 Re:
>> ProducerTemplate.request() not returning complete Exchange.
>> Hi
>>
>> The new Processor(){ } in the request method of the producer template is
>> not for the reply message, but for preparing the message to send (eg the
>> input message).
>>
>> On Tue, Jan 14, 2020 at 9:16 PM Wilken Marci J <
>> [hidden email]> wrote:
>>
>> > Based on the documentation I was reading the
>> > ProducerTemplate.request(<URL>, Processor) should return the  Exchange
>> > information from the route that was called.  When I am running my test
>> on
>> > this it is returning the body information in the
>> > exchange.getOut().getBody() but the exchange.getOut().getHeaders() has
>> the
>> > headers that I sent and not returned headers that I see if I look at the
>> > mock endpoint.  I am expecting it to return the header and body data in
>> the
>> > Exchange.getIn().  Since this test is calling a jms message Queue
>> (JBoss
>> > EAP 7.2 Fuse 7.3) I am expecting this to return the same thing as the a
>> > call from another route would see.  Am I understanding this wrong or am
>> I
>> > missing a setting?
>> >
>> >
>> >
>> > *public void testConnection() throws Exception {*
>> >
>> > *       log.info <http://log.info>("----->>  Begin testConnection
>> > <<-----");*
>> >
>> > *       Assert.assertNotNull("Expected QueueRoute-context to exist",
>> > camelctx);*
>> >
>> > *       Assert.assertEquals(ServiceStatus.Started,
>> camelctx.getStatus());*
>> >
>> >
>> >
>> > *       MockEndpoint mock = camelctx.getEndpoint("mock:JobLogResult",
>> > MockEndpoint.class);*
>> >
>> > *       assertNotNull("Expected mock:JobLogResult to exist", mock);*
>> >
>> > *       mock.reset();*
>> >
>> > *       mock.expectedMessageCount(1);*
>> >
>> >
>> >
>> > *       ProducerTemplate producerTemplate =
>> > camelctx.createProducerTemplate();*
>> >
>> > *       Assert.assertNotNull("Expected producerTemplate to exist",
>> > producerTemplate);*
>> >
>> >
>> >
>> > *       Exchange reply =
>> > producerTemplate.request("jms:queue:Sahara.Core.JobIdRequest", new
>> > Processor() {*
>> >
>> >
>> >
>> > *              public void process(final Exchange exchange) {*
>> >
>> >
>> >
>> > *                     Map<String, Object> headers = new HashMap<String,
>> > Object>();*
>> >
>> > *                     headers.put("RequestingSource",
>> requestingSource);*
>> >
>> > *                     headers.put("AuthorizationKey",
>> authorizationKey);*
>> >
>> > *                     headers.put("RequestType", "testConnection");*
>> >
>> >
>> >
>> > *                     Map<String, Object> payload = new HashMap<String,
>> > Object>();*
>> >
>> >
>> >
>> > *                     exchange.getIn().setHeaders(headers);*
>> >
>> > *                     exchange.getIn().setBody(payload);*
>> >
>> > *                     exchange.setPattern(ExchangePattern.InOut);*
>> >
>> >
>> >
>> > *              }*
>> >
>> > *       });*
>> >
>> > *       // these fail*
>> >
>> > *       assertThat("Request was un-successful", (
>> > reply.getIn().getHeader("SQLErrorMessage") == null ? "null"
>> > :((String)reply.getIn().getHeader("SQLErrorMessage"))).toLowerCase()
>> > ,equalTo("success"));*
>> >
>> > *       assertThat("Didn't Return Data ", (int)
>> > reply.getIn().getHeader("CamelSqlRowCount"),equalTo(1));*
>> >
>> >
>> >
>> >
>> >
>> > *       // these are successfull.*
>> >
>> > *       List<Exchange> list = mock.getReceivedExchanges();*
>> >
>> > *       Exchange reply2 = list.get(0);*
>> >
>> > *       assertThat("Request was un-successful", (
>> > reply2.getIn().getHeader("SQLErrorMessage") == null ? "null"
>> > :((String)reply2.getIn().getHeader("SQLErrorMessage"))).toLowerCase()
>> > ,equalTo("success"));*
>> >
>> > *       assertThat("Didn't Return Data ", (int)
>> > reply2.getIn().getHeader("CamelSqlRowCount"),equalTo(1));*
>> >
>> > *}*
>> >
>> >
>> >
>> > Route
>> >
>> > from("jms:queue:Sahara.Core.JobIdRequest?exchangePattern=InOut")
>> >        .routeId("JobLogRouter")
>> >        .log(LoggingLevel.INFO,"*************************************************")
>> >        .log(LoggingLevel.INFO,"* Beginning JobAuditingRequest")
>> >        .log(LoggingLevel.INFO,"* Requesting Source: ${header.RequestingSource} " )
>> >        .log(LoggingLevel.INFO,"* Request Type: ${header.RequestType} " )
>> >        .log(LoggingLevel.INFO,"*************************************************")
>> >        .choice()
>> >               .when(simple ("${header.RequestType} == \"complete\""))
>> >                      .to("direct:CompleteJob")
>> >                      .endChoice()
>> >               .when().simple("${header.RequestType?.equalsIgnoreCase(\"audit\")}")
>> >                      .to("direct:LogAudit")
>> >                      .endChoice()
>> >               .when(simple ("${header.RequestType} == \"error\""))
>> >                      .to("direct:LogError")
>> >                      .endChoice()
>> >               .when().simple("${header.RequestType?.equalsIgnoreCase(\"whoami\")}")
>> >                      .to("direct:WhoAmI")
>> >                      .endChoice()
>> >               .when().simple("${header.RequestType?.equalsIgnoreCase(\"testConnection\")}")
>> >                      .to("direct:ConnectionTest")
>> >                      .endChoice()
>> >               .when().simple("${header.RequestType?.equalsIgnoreCase(\"start\")} || ${header.RequestType?.equalsIgnoreCase(\"open\")}")
>> >                      .to("direct:InitialJobRequest")
>> >                      .endChoice()
>> >               .otherwise()
>> >                      .process(new Processor() {
>> >                            public void process(Exchange exchange) throws Exception {
>> >                                   //     Message in = exchange.getIn();
>> >                                   Map<String, Object> payload = new HashMap<String, Object>();
>> >                                   exchange.setOut(exchange.getIn());
>> >                                   payload = (Map<String,Object>) exchange.getIn().getBody();
>> >                                   payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}") );
>> >                                   payload.put("SQLErrorMessage", "No Choice Provided");
>> >                                   payload.put("SQLErrorNumber", -1);
>> >                                   exchange.getOut().setBody(payload);
>> >                                   exchange.getOut().setHeader("RequestType", "failed");
>> >                            }
>> >                      })
>> >                      .endChoice()
>> >        .end()
>> >        .process(new Processor() {
>> >               public void process(Exchange exchange) throws Exception {
>> >                      exchange.setOut(exchange.getIn());
>> >                      exchange.getOut().setHeader("BodyType",exchange.getIn().getBody().getClass().getName());
>> >                      exchange.getOut().setHeader("BodyEncoding", "JSON");
>> >                      String payloadJson = new ObjectMapper().writeValueAsString(exchange.getIn().getBody());
>> >                      log.debug(payloadJson);
>> >                      exchange.getOut().setBody(payloadJson);
>> >               }
>> >        })
>> >        .log(LoggingLevel.INFO, "Request results Job ID Request : \r\n Header:  ${headers} \r\n Body:  ${body} ")
>> >        .to("mock:JobLogResult")
>> >           .onException(Exception.class)
>> >               .setHeader("MESSAGE_INFO", constant("Failure to Process Audit Job"))
>> >               .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>> >               .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>> >               .maximumRedeliveries("0")
>> >               .redeliveryDelay("0")
>> >               .id("_onExceptionCron")
>> >               .handled(true)
>> >               ;
>> >        ;
>> >
>> > from("direct:ConnectionTest")
>> >        .routeId("ConnectionTest")
>> >        .log(LoggingLevel.INFO, "Beginning Connection Test")
>> >        .log(LoggingLevel.INFO,"whoami ->> {{whoami}}")
>> >        .log(LoggingLevel.INFO, "Request: \r\n Header:  ${headers} \r\n Body:  ${body} ")
>> >        .to("sql:select name from sys.sysusers where uid = 0?dataSource=#Sahara_DS")
>> >        .log(LoggingLevel.INFO, "Request Result: \r\n Header: ${headers} \r\n Body:  ${body} ")
>> >        .process(new Processor() {
>> >               public void process(Exchange exchange) throws Exception {
>> >                      Map<String, Object> payload = new HashMap<String, Object>();
>> >                      exchange.setOut(exchange.getIn());
>> >                      ArrayList<Map<String,Object>> body = (ArrayList<Map<String,Object>>)  exchange.getIn().getBody();
>> >                      payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}") );
>> >                      body.add(payload);
>> >                      exchange.getOut().setBody(body);
>> >                      exchange.getOut().setHeader("RequestType", "complete");
>> >                      exchange.getOut().setHeader("SQLErrorMessage", "Success");
>> >               }
>> >        })
>> >        .onException(SQLException.class)
>> >               .setHeader("MESSAGE_INFO", constant("Sql Connection Error"))
>> >               .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>> >               .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>> >               .maximumRedeliveries("0")
>> >               .redeliveryDelay("0")
>> >               .id("_onExceptionConnectionTest")
>> >               .handled(false)
>> >        ;
>> >
>> > Regards-
>> > Marci Wilken

Re: ProducerTemplate.request() not returning complete Exchange

Giovanni Condello
Hi Marci,

To add to what Alex said: I have found it is always better either to use getMessage() or to check explicitly whether the exchange hasOut(), rather than assuming getIn() is the right place to look for data coming out of a component.

Giovanni
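
To make the advice above concrete, here is a minimal sketch of the "prefer OUT when present, otherwise fall back to IN" rule. It deliberately does not use Camel itself: ToyMessage, ToyExchange and ReplyLookupDemo are hypothetical stand-ins I made up so the snippet runs with nothing but the JDK, and they only mimic the getIn() / getOut() / hasOut() / getMessage() behaviour discussed in this thread. As far as I know, getMessage() only exists on Exchange from Camel 3.0 on, so on a Camel 2.x stack (such as the Fuse 7.3 mentioned earlier) the explicit hasOut() check is probably the variant to use.

```java
import java.util.HashMap;
import java.util.Map;

// Toy stand-in for a Camel message; NOT the real org.apache.camel.Message.
class ToyMessage {
    private final Map<String, Object> headers = new HashMap<>();

    Object getHeader(String name) { return headers.get(name); }

    void setHeader(String name, Object value) { headers.put(name, value); }
}

// Toy stand-in for a Camel exchange; NOT the real org.apache.camel.Exchange.
class ToyExchange {
    private final ToyMessage in = new ToyMessage();
    private ToyMessage out; // stays null until somebody produces a reply

    ToyMessage getIn() { return in; }

    boolean hasOut() { return out != null; }

    ToyMessage getOut() {
        if (out == null) {
            out = new ToyMessage(); // created lazily on first access
        }
        return out;
    }

    // The rule from the advice above: OUT if there is one, otherwise IN.
    ToyMessage getMessage() { return hasOut() ? out : in; }
}

class ReplyLookupDemo {
    // Hypothetical helper: read a header from wherever the reply actually is.
    static Object replyHeader(ToyExchange exchange, String name) {
        return exchange.getMessage().getHeader(name);
    }

    public static void main(String[] args) {
        ToyExchange exchange = new ToyExchange();
        exchange.getIn().setHeader("test", "foo");       // what the caller sent
        System.out.println(replyHeader(exchange, "test")); // foo (no OUT yet)

        exchange.getOut().setHeader("test", "bar");      // what the route replied
        System.out.println(exchange.getIn().getHeader("test")); // foo (IN is unchanged)
        System.out.println(replyHeader(exchange, "test"));      // bar
    }
}
```

Note that the helper checks hasOut() before touching getOut(): in this toy model, as in the lazily-created OUT it imitates, calling getOut() just to look would create an empty OUT message and hide the IN data.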

-----Original message-----
From: Alex Dettinger <[hidden email]>
Sent: Monday, February 17, 2020 11:04
To: [hidden email]
Subject: Re: ProducerTemplate.request() not returning complete Exchange

Hi Marci,

  I would say that from("jms:Sahara")... has MEP InOut, so the IN message carries the header "foo" and the OUT message carries the header "bar".

  So assertThat("return should be bar", reply.getOut().getHeader("test"), equalTo("bar")); (or reply.getMessage() in place of reply.getOut()) should be ok.

Alex

Re: ProducerTemplate.request() not returning complete Exchange

Alex Dettinger
+1 for getMessage() :)

> >> > 0?dataSource=#Sahara_DS")*
> >> >
> >> > *       .log(LoggingLevel.INFO, "Request Result: \r\n Header:
> >> ${headers}
> >> > \r\n Body:  ${body} ")*
> >> >
> >> >
> >> >
> >> > *       .process(new Processor() {*
> >> >
> >> > *              public void process(Exchange exchange) throws Exception
> >> {*
> >> >
> >> > *              Map<String, Object> payload = new HashMap<String,
> >> > Object>();*
> >> >
> >> > *              exchange.setOut(exchange.getIn());*
> >> >
> >> > *              ArrayList<Map<String,Object>> body =
> >> > (ArrayList<Map<String,Object>>)  exchange.getIn().getBody();*
> >> >
> >> > *              payload.put("whoami", "I am " +
> >> > getContext().resolvePropertyPlaceholders("{{whoami}}") );*
> >> >
> >> > *              body.add(payload);*
> >> >
> >> > *              exchange.getOut().setBody(body);*
> >> >
> >> > *              exchange.getOut().setHeader("RequestType",
> "complete");*
> >> >
> >> > *              exchange.getOut().setHeader("SQLErrorMessage",
> >> "Success");*
> >> >
> >> > *       }*
> >> >
> >> >
> >> >
> >> > *       })*
> >> >
> >> > *       .onException(SQLException.class)*
> >> >
> >> > *              .setHeader("MESSAGE_INFO", constant("Sql Connection
> >> > Error"))*
> >> >
> >> > *              .setHeader("ROUTE_STOP", constant(Boolean.TRUE))*
> >> >
> >> > *              .log(LoggingLevel.ERROR, "${exception.stackTrace}")*
> >> >
> >> > *              .maximumRedeliveries("0")*
> >> >
> >> > *              .redeliveryDelay("0")*
> >> >
> >> > *              .id("_onExceptionConnectionTest")*
> >> >
> >> > *              .handled(false)*
> >> >
> >> > *       ;*
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > Regards-
> >> >
> >> > Marci Wilken
> >> >
> >> > Operations Architect
> >> >
> >> > Office of Information Services
> >> >
> >> > OHA/DHS/CAF-CW/OR-KIDS
> >> >
> >> > 503.378.2405
> >> >
> >> >
> >> >
> >> >
> >> >
> >> >
> >>
> >>
> >> --
> >> Claus Ibsen
> >> -----------------
> >> http://davsclaus.com @davsclaus
> >> Camel in Action 2: https://www.manning.com/ibsen2
> >>
> >>
>

Re: ProducerTemplate.request() not returning complete Exchange

kilidarria

The problem, or my misunderstanding, is with the ProducerTemplate in the
JUnit tests.

In the actual code, when calling from one route to the other:

to(jms:Sahara) header.test = foo

from(jms:Sahara)...  with MEP InOut

/* do some cool stuff */

Back in the calling route, the returned message/exchange has the correct
header.test = bar.

What does not happen in the JUnit test is the same round trip with
Exchange reply = producerTemplate.send(new Processor etc.)

       @Test
       @InSequence(15)
       public void testProcessingRun() throws Exception {
           log.info("\r\n\r\n----->>  Begin testProcessingRun() <<-----");
           jobID = -1;
           ArrayList<Map<String,Object>> rtnArray = null;

           MockEndpoint mock = camelctx.getEndpoint("mock:SaharaCoreTestResult", MockEndpoint.class);
           assertNotNull("Expected mock:SaharaCoreTestResult to exist", mock);
           mock.reset();
           mock.expectedMessageCount(1);

           // this is the problem
           ProducerTemplate producerTemplate = camelctx.createProducerTemplate();

           Map<String, Object> headers = new HashMap<String, Object>();

           // the result is named "reply" so that it matches the assertions below
           Exchange reply = producerTemplate.send("jms:sahara", new Processor() {

               public void process(final Exchange exchange) {
                   headers.put("test", "foo");
                   Map<String, Object> payload = new HashMap<String, Object>();
                   exchange.getIn().setHeaders(headers);
                   exchange.getIn().setBody(payload);
                   exchange.setPattern(ExchangePattern.InOut);
               }
           });

           assertThat("header is not bar", reply.getIn().getHeader("test"), equalTo("bar"));  // <--- this fails
           assertThat("header is not bar", reply.getOut().getHeader("test"), equalTo("bar")); // <--- this fails

           List<Exchange> list = mock.getReceivedExchanges();
           assertThat("Expected exchange to return", list.size(), greaterThan(0));
           Exchange mockReply = list.get(0);
           assertThat("header is not bar", mockReply.getIn().getHeader("test"), equalTo("bar")); // <--- this works

           MockEndpoint.assertIsSatisfied(5L, TimeUnit.SECONDS, mock);

           log.info("----->>  testProcessingRun Complete <<-----");
       }
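
For reference, the getMessage()/hasOut() advice quoted further down boils down to one fallback rule: read the reply from the OUT message when one exists, otherwise from IN. What follows is a minimal, dependency-free sketch of that rule. `ToyExchange`, `message()` and `roundTrip()` are hypothetical stand-ins invented for illustration, not Camel's own classes, and the foo/bar values simply mirror the example in this thread.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy illustration only; ToyExchange is NOT org.apache.camel.Exchange. */
public class ReplyLookupSketch {

    /** A message is reduced to its headers for this sketch. */
    static final class ToyExchange {
        final Map<String, Object> in = new HashMap<>();
        Map<String, Object> out; // stays null until a reply is set

        boolean hasOut() {
            return out != null;
        }

        /** Assumed fallback rule: prefer OUT when present, otherwise use IN. */
        Map<String, Object> message() {
            return hasOut() ? out : in;
        }
    }

    /** Hypothetical InOut round trip: the request keeps "foo", the reply carries "bar". */
    static ToyExchange roundTrip() {
        ToyExchange exchange = new ToyExchange();
        exchange.in.put("test", "foo");
        exchange.out = new HashMap<>(exchange.in);
        exchange.out.put("test", "bar");
        return exchange;
    }

    public static void main(String[] args) {
        ToyExchange reply = roundTrip();
        System.out.println("in      = " + reply.in.get("test"));
        System.out.println("message = " + reply.message().get("test"));
    }
}
```

In this model an assertion against `in` still sees the request header, while one against `message()` sees the reply, which is the same shape as the failing `getIn()` assertion above.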




> +1 for getMessage() :)
>
> On Mon, Feb 17, 2020 at 11:18 AM Giovanni Condello <
> [hidden email]> wrote:
>
>> Hi Marci,
>>
>> to add on what Alex said, I found it's always better to use either
>> getMessage() or to explicitly check if the exchange hasOut() instead of
>> assuming getIn() is always the right place to look for data coming out of a
>> component
>>
>> Giovanni
>>
>> -----Original Message-----
>> From: Alex Dettinger <[hidden email]>
>> Sent: Monday, 17 February 2020 11:04
>> To: [hidden email]
>> Subject: Re: ProducerTemplate.request() not returning complete Exchange
>>
>> Hi Marci,
>>
>>    I would say that from("jms:Sahara")... has MEP InOut, so the IN message
>> has header "foo" and the OUT message has header "bar".
>>
>>    So assertThat("return should be bar",
>> reply.getOut().getHeader("test"), equalTo("bar")), or the same with
>> getMessage() in place of getOut(), should be ok.
>>
>> Alex
>>
>> On Mon, Feb 17, 2020 at 11:02 AM Alex Dettinger <[hidden email]>
>> wrote:
>>
>>> Hi Marci,
>>>
>>>    I would say that from("jms:Sahara")... has MEP InOUt so In message
>>> with header "foo", and OUT mess
>>>
>>> On Fri, Feb 14, 2020 at 11:43 PM Marci <[hidden email]> wrote:
>>>
>>>>   I understand that the new Processor(){} is for the message being
>>>> sent to the consumers (from(route)) and this is working fine.
>>>> What I am not understanding is why the ProducerTemplate.request() is
>>>> not returning an exchange back at the end of the route.
>>>> For example.
>>>> Exchange reply = producerTemplate.request("jms:Sahara", new Processor()
>> {
>>>>          public void process(final Exchange exchange) {
>>>>          exchange.getIn().setHeader(“test”,”foo”)
>>>>          }
>>>> Route
>>>>       From(jms:Sahara”)
>>>>            .log(LoggingLevel.INFO,"* Test ID: ${header.test}" )
>>>>            .setHeader(“test”,”bar”);
>>>> end route
>>>>
>>>> assertThat(“return should be
>>>> bar”,reply.getIn().getHeader(“test”),equalTo(“bar”)
>>>> but it is still “foo”
>>>> This is failing when  it should pass
>>>>
>>>> Maybe I’m reading these wrong
>>>>  From Javadoc.io
>>>> Exchange        request(Endpoint endpoint, Processor processor)
>>>> Sends an exchange to an endpoint using a supplied processor Uses an
>>>> ExchangePattern.InOut message exchange pattern.
>>>>
>>>> from the camel apache org website
>>>> request*() methods
>>>>
>>>> The send*() methods use the default Message Exchange Pattern (InOnly,
>>>> InOut etc) as the endpoint. If you want to explicitly perform a
>>>> request/response (InOut) you can use the request*() methods instead
>>>> of the
>>>> send*() methods.
>>>>
>>>> E.g. let’s invoke an endpoint and get the response:
>>>>
>>>>
>>>>
>>>> Jan 15, 2020; 12:34am, Claus Ibsen-2 wrote:
>>>> Hi
>>>>
>>>> The new Processor(){ } in the request method of the producer template
>>>> is not for the reply message, but for preparing the message to send
>>>> (eg the input message).
>>>>
>>>> On Tue, Jan 14, 2020 at 9:16 PM Wilken Marci J < [hidden email]>
>>>> wrote:
>>>>
>>>>> Based on the documentation I was reading, the
>>>>> ProducerTemplate.request(<URL>, Processor) should return the
>>>>> Exchange information from the route that was called.  When I run my
>>>>> test on this, it returns the body information in
>>>>> exchange.getOut().getBody(), but exchange.getOut().getHeaders() has
>>>>> the headers that I sent and not the returned headers that I see if I
>>>>> look at the mock endpoint.  I am expecting it to return the header and
>>>>> body data in Exchange.getIn().  Since this test is calling a JMS
>>>>> message queue (JBoss EAP 7.2, Fuse 7.3), I am expecting it to return
>>>>> the same thing that a call from another route would see.  Am I
>>>>> understanding this wrong, or am I missing a setting?
>>>>>
>>>>>
>>>>>
>>>>> public void testConnection() throws Exception {
>>>>>
>>>>>        log.info("----->>  Begin testConnection <<-----");
>>>>>        Assert.assertNotNull("Expected QueueRoute-context to exist", camelctx);
>>>>>        Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus());
>>>>>
>>>>>        MockEndpoint mock = camelctx.getEndpoint("mock:JobLogResult", MockEndpoint.class);
>>>>>        assertNotNull("Expected mock:JobLogResult to exist", mock);
>>>>>        mock.reset();
>>>>>        mock.expectedMessageCount(1);
>>>>>
>>>>>        ProducerTemplate producerTemplate = camelctx.createProducerTemplate();
>>>>>        Assert.assertNotNull("Expected producerTemplate to exist", producerTemplate);
>>>>>
>>>>>        Exchange reply = producerTemplate.request("jms:queue:Sahara.Core.JobIdRequest", new Processor() {
>>>>>
>>>>>               public void process(final Exchange exchange) {
>>>>>
>>>>>                      Map<String, Object> headers = new HashMap<String, Object>();
>>>>>                      headers.put("RequestingSource", requestingSource);
>>>>>                      headers.put("AuthorizationKey", authorizationKey);
>>>>>                      headers.put("RequestType", "testConnection");
>>>>>
>>>>>                      Map<String, Object> payload = new HashMap<String, Object>();
>>>>>
>>>>>                      exchange.getIn().setHeaders(headers);
>>>>>                      exchange.getIn().setBody(payload);
>>>>>                      exchange.setPattern(ExchangePattern.InOut);
>>>>>               }
>>>>>        });
>>>>>
>>>>>        // these fail
>>>>>        assertThat("Request was un-successful", (
>>>>>               reply.getIn().getHeader("SQLErrorMessage") == null ? "null"
>>>>>               : ((String) reply.getIn().getHeader("SQLErrorMessage"))).toLowerCase(),
>>>>>               equalTo("success"));
>>>>>        assertThat("Didn't Return Data ", (int) reply.getIn().getHeader("CamelSqlRowCount"), equalTo(1));
>>>>>
>>>>>        // these are successful
>>>>>        List<Exchange> list = mock.getReceivedExchanges();
>>>>>        Exchange reply2 = list.get(0);
>>>>>        assertThat("Request was un-successful", (
>>>>>               reply2.getIn().getHeader("SQLErrorMessage") == null ? "null"
>>>>>               : ((String) reply2.getIn().getHeader("SQLErrorMessage"))).toLowerCase(),
>>>>>               equalTo("success"));
>>>>>        assertThat("Didn't Return Data ", (int) reply2.getIn().getHeader("CamelSqlRowCount"), equalTo(1));
>>>>> }
>>>>>
>>>>>
>>>>>
>>>>> Route
>>>>>
>>>>> from("jms:queue:Sahara.Core.JobIdRequest?exchangePattern=InOut")
>>>>>       .routeId("JobLogRouter")
>>>>>       .log(LoggingLevel.INFO,"*************************************************")
>>>>>       .log(LoggingLevel.INFO,"* Beginning JobAuditingRequest")
>>>>>       .log(LoggingLevel.INFO,"* Requesting Source: ${header.RequestingSource} " )
>>>>>       .log(LoggingLevel.INFO,"* Request Type: ${header.RequestType} " )
>>>>>       .log(LoggingLevel.INFO,"*************************************************")
>>>>>       .choice()
>>>>>              .when(simple ("${header.RequestType} == \"complete\""))
>>>>>                     .to("direct:CompleteJob")
>>>>>                     .endChoice()
>>>>>              .when().simple("${header.RequestType?.equalsIgnoreCase(\"audit\")}")
>>>>>                     .to("direct:LogAudit")
>>>>>                     .endChoice()
>>>>>              .when(simple ("${header.RequestType} == \"error\""))
>>>>>                     .to("direct:LogError")
>>>>>                     .endChoice()
>>>>>              .when().simple("${header.RequestType?.equalsIgnoreCase(\"whoami\")}")
>>>>>                     .to("direct:WhoAmI")
>>>>>                     .endChoice()
>>>>>              .when().simple("${header.RequestType?.equalsIgnoreCase(\"testConnection\")}")
>>>>>                     .to("direct:ConnectionTest")
>>>>>                     .endChoice()
>>>>>              .when().simple("${header.RequestType?.equalsIgnoreCase(\"start\")} || ${header.RequestType?.equalsIgnoreCase(\"open\")}")
>>>>>                     .to("direct:InitialJobRequest")
>>>>>                     .endChoice()
>>>>>              .otherwise()
>>>>>                     .process(new Processor() {
>>>>>                            public void process(Exchange exchange) throws Exception {
>>>>>                                   //     Message in = exchange.getIn();
>>>>>                                   Map<String, Object> payload = new HashMap<String, Object>();
>>>>>                                   exchange.setOut(exchange.getIn());
>>>>>                                   payload = (Map<String,Object>) exchange.getIn().getBody();
>>>>>                                   payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}") );
>>>>>                                   payload.put("SQLErrorMessage", "No Choice Provided");
>>>>>                                   payload.put("SQLErrorNumber", -1);
>>>>>                                   exchange.getOut().setBody(payload);
>>>>>                                   exchange.getOut().setHeader("RequestType", "failed");
>>>>>                            }
>>>>>                     })
>>>>>                     .endChoice()
>>>>>       .end()
>>>>>       .process(new Processor() {
>>>>>              public void process(Exchange exchange) throws Exception {
>>>>>                     exchange.setOut(exchange.getIn());
>>>>>                     exchange.getOut().setHeader("BodyType",exchange.getIn().getBody().getClass().getName());
>>>>>                     exchange.getOut().setHeader("BodyEncoding", "JSON");
>>>>>                     String payloadJson = new ObjectMapper().writeValueAsString(exchange.getIn().getBody());
>>>>>                     log.debug(payloadJson);
>>>>>                     exchange.getOut().setBody(payloadJson);
>>>>>              }
>>>>>       })
>>>>>       .log(LoggingLevel.INFO, "Request results Job ID Request : \r\n Header:  ${headers} \r\n Body:  ${body} ")
>>>>>       .to("mock:JobLogResult")
>>>>>          .onException(Exception.class)
>>>>>              .setHeader("MESSAGE_INFO", constant("Failure to Process Audit Job"))
>>>>>              .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>>>>>              .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>>>>>              .maximumRedeliveries("0")
>>>>>              .redeliveryDelay("0")
>>>>>              .id("_onExceptionCron")
>>>>>              .handled(true)
>>>>>              ;
>>>>>       ;
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> from("direct:ConnectionTest")
>>>>>       .routeId("ConnectionTest")
>>>>>       .log(LoggingLevel.INFO, "Beginning Connection Test")
>>>>>       .log(LoggingLevel.INFO,"whoami ->> {{whoami}}")
>>>>>       .log(LoggingLevel.INFO, "Request: \r\n Header:  ${headers} \r\n Body:  ${body} ")
>>>>>       .to("sql:select name from sys.sysusers where uid = 0?dataSource=#Sahara_DS")
>>>>>       .log(LoggingLevel.INFO, "Request Result: \r\n Header: ${headers} \r\n Body:  ${body} ")
>>>>>       .process(new Processor() {
>>>>>              public void process(Exchange exchange) throws Exception {
>>>>>                     Map<String, Object> payload = new HashMap<String, Object>();
>>>>>                     exchange.setOut(exchange.getIn());
>>>>>                     ArrayList<Map<String,Object>> body = (ArrayList<Map<String,Object>>)  exchange.getIn().getBody();
>>>>>                     payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}") );
>>>>>                     body.add(payload);
>>>>>                     exchange.getOut().setBody(body);
>>>>>                     exchange.getOut().setHeader("RequestType", "complete");
>>>>>                     exchange.getOut().setHeader("SQLErrorMessage", "Success");
>>>>>              }
>>>>>       })
>>>>>       .onException(SQLException.class)
>>>>>              .setHeader("MESSAGE_INFO", constant("Sql Connection Error"))
>>>>>              .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
>>>>>              .log(LoggingLevel.ERROR, "${exception.stackTrace}")
>>>>>              .maximumRedeliveries("0")
>>>>>              .redeliveryDelay("0")
>>>>>              .id("_onExceptionConnectionTest")
>>>>>              .handled(false)
>>>>>       ;
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> Regards-
>>>>>
>>>>> Marci Wilken
>>>>>
>>>>> Operations Architect
>>>>>
>>>>> Office of Information Services
>>>>>
>>>>> OHA/DHS/CAF-CW/OR-KIDS
>>>>>
>>>>> 503.378.2405
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Claus Ibsen
>>>> -----------------
>>>> http://davsclaus.com @davsclaus
>>>> Camel in Action 2: https://www.manning.com/ibsen2
>>>>
>>>>

Re: ProducerTemplate.request() not returning complete Exchange

Alex Dettinger
Marci,

  Not sure I get the overall test scenario, but I think this is expected.
"producerTemplate.send("jms:sahara", new Processor() {" actually sends
a message with MEP InOnly.
So you get the exchange back before the "jms:sahara" route has processed the
message, and thus can't assert on any side effect of that route.

  producerTemplate.request(...) should set the MEP to InOut early enough
for your test to pass.

Alex
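
The difference Alex describes can be modelled without Camel. The sketch below is only an illustration under stated assumptions: `MepSketch`, `route`, `send` and `request` are made-up names, a single-thread executor stands in for the JMS broker, and the 5-second timeout is arbitrary. It shows why a fire-and-forget (InOnly) call hands back the caller's own headers, while a request/reply (InOut) call returns what the route produced.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Toy model of InOnly vs InOut; none of these names are Camel APIs. */
public class MepSketch {

    /** Stand-in for the consuming route: copies the headers and sets test=bar. */
    static Map<String, Object> route(Map<String, Object> headers) {
        Map<String, Object> result = new HashMap<>(headers);
        result.put("test", "bar");
        return result;
    }

    /** InOnly: hand the message over and return at once with the caller's own headers. */
    static Map<String, Object> send(ExecutorService broker, Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        broker.submit(() -> route(copy)); // the route's result is never awaited
        return headers;
    }

    /** InOut: block until the route has produced a reply, then return that reply. */
    static Map<String, Object> request(ExecutorService broker, Map<String, Object> headers) {
        Map<String, Object> copy = new HashMap<>(headers);
        try {
            return broker.submit(() -> route(copy)).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("no reply received", e);
        }
    }

    public static void main(String[] args) {
        ExecutorService broker = Executors.newSingleThreadExecutor();
        Map<String, Object> headers = new HashMap<>();
        headers.put("test", "foo");
        System.out.println("send    -> " + send(broker, headers).get("test"));
        System.out.println("request -> " + request(broker, headers).get("test"));
        broker.shutdown();
    }
}
```

Running `main` prints `foo` for the `send` call and `bar` for the `request` call, which lines up with the failing and passing assertions discussed in this thread.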

On Thu, Feb 20, 2020 at 3:47 AM Marci Wilken <[hidden email]> wrote:

>
> The problem or my mis-understanding is in the producertemplate in the
> Junit tests
>
> In the actual code when calling from one route to the other
>
> to(jms:Sahara) header.test = foo
>
> from(jms:Sahara)...  with MEP inout
>
> /* do some cool stuff */
>
> Back to the previous  returns the correct message/exchange header.test =
> bar
>
> what is not happening from the junit test is Exchange reply
> =producerTemplate.send(new Processor etc )
>
>        @Test
>            @InSequence(15)
>            public void testProcessingRun() throws Exception {
>                  log.info("\r\n\r\n----->>  Begin testProcessingRun()
> <<-----");
>                   jobID = -1;
>                  ArrayList<Map<String,Object>> rtnArray = null;
>
>                    MockEndpoint mock =
> camelctx.getEndpoint("mock:SaharaCoreTestResult", MockEndpoint.class);
>                    assertNotNull("Expected mock:SaharaCoreTestResult to
> exist", mock);
>                    mock.reset();
>                    mock.expectedMessageCount(1);
>
>                  // this is the problem
>                    ProducerTemplate producerTemplate =
> camelctx.createProducerTemplate();
>
>                    Map<String, Object> headers = new HashMap<String,
> Object>();
>
>
>                    Exchange exchange=
> producerTemplate.send("jms:sahara", new Processor() {
>
>                        public void process(final Exchange exchange) {
>
>                          headers.put("test", "foo");
>                          Map<String, Object> payload = new
> HashMap<String, Object>();
>                          exchange.getIn().setHeaders(headers);
>                        exchange.getIn().setBody(payload);
>                        exchange.setPattern(ExchangePattern.InOut);
>
>                        }
>                    });
>
>                  assertThat("header is not bar",
> reply.getIn().getHeader("test"), equalTo("bar")) <--- this fails
>                  assertThat("header is not bar",
> reply.getOut().getHeader("test"), equalTo("bar")) <--- this fails
>
>                  List<Exchange> list = mock.getReceivedExchanges();
>                  assertThat("Expected exhange to return",
> list.size(),greaterThan(0));
>                  Exchange mockReply = list.get(0);
>                  assertThat("header is not bar", mockReply
> .getIn().getHeader("test"), equalTo("bar")) <--- this works
>
>                  MockEndpoint.assertIsSatisfied(5L, TimeUnit.SECONDS,
> mock);
>
>                log.info("----->>  testProcessingRun Complete <<-----");
>            }
>
>
>
>
> > +1 for getMessage() :)
> >
> > On Mon, Feb 17, 2020 at 11:18 AM Giovanni Condello <
> > [hidden email]> wrote:
> >
> >> Hi Marci,
> >>
> >> to add on what Alex said, I found it's always better to use either
> >> getMessage() or to explicitly check if the exchange hasOut() instead of
> >> assuming getIn() is always the right place to look for data coming out
> of a
> >> component
> >>
> >> Giovanni
> >>
> >> -----Original Message-----
> >> From: Alex Dettinger <[hidden email]>
> >> Sent: Monday, 17 February 2020 11:04
> >> To: [hidden email]
> >> Subject: Re: ProducerTemplate.request() not returning complete Exchange
> >>
> >> Hi Marci,
> >>
> >>    I would say that from("jms:Sahara")... has MEP InOut, so the IN message
> >> has header "foo" and the OUT message has header "bar".
> >>
> >>    So assertThat("return should be bar",
> >> reply.getOut().getHeader("test"), equalTo("bar")), or the same with
> >> getMessage() in place of getOut(), should be ok.
> >>
> >> Alex
> >>
> >> On Mon, Feb 17, 2020 at 11:02 AM Alex Dettinger <[hidden email]>
> >> wrote:
> >>
> >>> Hi Marci,
> >>>
> >>>    I would say that from("jms:Sahara")... has MEP InOUt so In message
> >>> with header "foo", and OUT mess
> >>>
> >>> On Fri, Feb 14, 2020 at 11:43 PM Marci <[hidden email]> wrote:
> >>>
> >>>>   I understand that the new Processor(){} is for the message being
> >>>> sent to the consumers (from(route)) and this is working fine.
> >>>> What I am not understanding is why the ProducerTemplate.request() is
> >>>> not returning an exchange back at the end of the route.
> >>>> For example.
> >>>> Exchange reply = producerTemplate.request("jms:Sahara", new
> Processor()
> >> {
> >>>>          public void process(final Exchange exchange) {
> >>>>          exchange.getIn().setHeader(“test”,”foo”)
> >>>>          }
> >>>> Route
> >>>>       From(jms:Sahara”)
> >>>>            .log(LoggingLevel.INFO,"* Test ID: ${header.test}" )
> >>>>            .setHeader(“test”,”bar”);
> >>>> end route
> >>>>
> >>>> assertThat(“return should be
> >>>> bar”,reply.getIn().getHeader(“test”),equalTo(“bar”)
> >>>> but it is still “foo”
> >>>> This is failing when  it should pass
> >>>>
> >>>> Maybe I’m reading these wrong
> >>>>  From Javadoc.io
> >>>> Exchange        request(Endpoint endpoint, Processor processor)
> >>>> Sends an exchange to an endpoint using a supplied processor Uses an
> >>>> ExchangePattern.InOut message exchange pattern.
> >>>>
> >>>> from the camel apache org website
> >>>> request*() methods
> >>>>
> >>>> The send*() methods use the default Message Exchange Pattern (InOnly,
> >>>> InOut etc) as the endpoint. If you want to explicitly perform a
> >>>> request/response (InOut) you can use the request*() methods instead
> >>>> of the
> >>>> send*() methods.
> >>>>
> >>>> E.g. let’s invoke an endpoint and get the response:
> >>>>
> >>>>
> >>>>
> >>>> Jan 15, 2020; 12:34am, Claus Ibsen-2 wrote:
> >>>> Hi
> >>>>
> >>>> The new Processor(){ } in the request method of the producer template
> >>>> is not for the reply message, but for preparing the message to send
> >>>> (eg the input message).
> >>>>
> >>>> On Tue, Jan 14, 2020 at 9:16 PM Wilken Marci J < [hidden email]>
> >>>> wrote:
> >>>>
> >>>>> Based on the documentation I was reading, the
> >>>>> ProducerTemplate.request(<URL>, Processor) should return the
> >>>>> Exchange information from the route that was called.  When I run my
> >>>>> test on this, it returns the body information in
> >>>>> exchange.getOut().getBody(), but exchange.getOut().getHeaders() has
> >>>>> the headers that I sent and not the returned headers that I see if I
> >>>>> look at the mock endpoint.  I am expecting it to return the header and
> >>>>> body data in Exchange.getIn().  Since this test is calling a JMS
> >>>>> message queue (JBoss EAP 7.2, Fuse 7.3), I am expecting it to return
> >>>>> the same thing that a call from another route would see.  Am I
> >>>>> understanding this wrong, or am I missing a setting?
> >>>>>
> >>>>>
> >>>>>
> >>>>> *public void testConnection() throws Exception {*
> >>>>>
> >>>>> *       log.info("----->>  Begin testConnection <<-----");*
> >>>>>
> >>>>> *       Assert.assertNotNull("Expected QueueRoute-context to exist",
> >>>>> camelctx);*
> >>>>>
> >>>>> *       Assert.assertEquals(ServiceStatus.Started,
> >>>> camelctx.getStatus());*
> >>>>>
> >>>>>
> >>>>> *       MockEndpoint mock = camelctx.getEndpoint("mock:JobLogResult",
> >>>>> MockEndpoint.class);*
> >>>>>
> >>>>> *       assertNotNull("Expected mock:JobLogResult to exist", mock);*
> >>>>>
> >>>>> *       mock.reset();*
> >>>>>
> >>>>> *       mock.expectedMessageCount(1);*
> >>>>>
> >>>>>
> >>>>>
> >>>>> *       ProducerTemplate producerTemplate =
> >>>>> camelctx.createProducerTemplate();*
> >>>>>
> >>>>> *       Assert.assertNotNull("Expected producerTemplate to exist",
> >>>>> producerTemplate);*
> >>>>>
> >>>>>
> >>>>>
> >>>>> *       Exchange reply =
> >>>>> producerTemplate.request("jms:queue:Sahara.Core.JobIdRequest", new
> >>>>> Processor() {*
> >>>>>
> >>>>>
> >>>>>
> >>>>> *              public void process(final Exchange exchange) {*
> >>>>>
> >>>>>
> >>>>>
> >>>>> *                     Map<String, Object> headers = new
> >> HashMap<String,
> >>>>> Object>();*
> >>>>>
> >>>>> *                     headers.put("RequestingSource",
> >>>> requestingSource);*
> >>>>> *                     headers.put("AuthorizationKey",
> >>>> authorizationKey);*
> >>>>> *                     headers.put("RequestType", "testConnection");*
> >>>>>
> >>>>>
> >>>>>
> >>>>> *                     Map<String, Object> payload = new
> >> HashMap<String,
> >>>>> Object>();*
> >>>>>
> >>>>>
> >>>>>
> >>>>> *                     exchange.getIn().setHeaders(headers);*
> >>>>>
> >>>>> *                     exchange.getIn().setBody(payload);*
> >>>>>
> >>>>> *                     exchange.setPattern(ExchangePattern.InOut);*
> >>>>>
> >>>>>
> >>>>>
> >>>>> *              }*
> >>>>>
> >>>>> *       });*
> >>>>>
> >>>>> *       // these fail*
> >>>>>
> >>>>> *       assertThat("Request was un-successful", (
> >>>>> reply.getIn().getHeader("SQLErrorMessage") == null ? "null"
> >>>>> :((String)reply.getIn().getHeader("SQLErrorMessage"))).toLowerCase(
> >>>>> )
> >>>>> ,equalTo("success"));*
> >>>>>
> >>>>> *       assertThat("Didn't Return Data ", (int)
> >>>>> reply.getIn().getHeader("CamelSqlRowCount"),equalTo(1));*
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> *       // these are successfull.*
> >>>>>
> >>>>> *       List<Exchange> list = mock.getReceivedExchanges();*
> >>>>>
> >>>>> *       Exchange reply2 = list.get(0);*
> >>>>>
> >>>>> *       assertThat("Request was un-successful", (
> >>>>> reply2.getIn().getHeader("SQLErrorMessage") == null ? "null"
> >>>>> :((String)reply2.getIn().getHeader("SQLErrorMessage"))).toLowerCase
> >>>>> ()
> >>>>> ,equalTo("success"));*
> >>>>>
> >>>>> *       assertThat("Didn't Return Data ", (int)
> >>>>> reply2.getIn().getHeader("CamelSqlRowCount"),equalTo(1));*
> >>>>>
> >>>>> *}*
> >>>>>
> >>>>>
> >>>>>
> >>>>> Route
> >>>>>
> >>>>> from("jms:queue:Sahara.Core.JobIdRequest?exchangePattern=InOut")
> >>>>>     .routeId("JobLogRouter")
> >>>>>     .log(LoggingLevel.INFO, "*************************************************")
> >>>>>     .log(LoggingLevel.INFO, "* Beginning JobAuditingRequest")
> >>>>>     .log(LoggingLevel.INFO, "* Requesting Source: ${header.RequestingSource} ")
> >>>>>     .log(LoggingLevel.INFO, "* Request Type: ${header.RequestType} ")
> >>>>>     .log(LoggingLevel.INFO, "*************************************************")
> >>>>>     .choice()
> >>>>>         .when(simple("${header.RequestType} == \"complete\""))
> >>>>>             .to("direct:CompleteJob")
> >>>>>             .endChoice()
> >>>>>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"audit\")}")
> >>>>>             .to("direct:LogAudit")
> >>>>>             .endChoice()
> >>>>>         .when(simple("${header.RequestType} == \"error\""))
> >>>>>             .to("direct:LogError")
> >>>>>             .endChoice()
> >>>>>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"whoami\")}")
> >>>>>             .to("direct:WhoAmI")
> >>>>>             .endChoice()
> >>>>>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"testConnection\")}")
> >>>>>             .to("direct:ConnectionTest")
> >>>>>             .endChoice()
> >>>>>         .when().simple("${header.RequestType?.equalsIgnoreCase(\"start\")} || ${header.RequestType?.equalsIgnoreCase(\"open\")}")
> >>>>>             .to("direct:InitialJobRequest")
> >>>>>             .endChoice()
> >>>>>         .otherwise()
> >>>>>             .process(new Processor() {
> >>>>>                 public void process(Exchange exchange) throws Exception {
> >>>>>                     // Message in = exchange.getIn();
> >>>>>                     Map<String, Object> payload = new HashMap<String, Object>();
> >>>>>                     exchange.setOut(exchange.getIn());
> >>>>>                     payload = (Map<String,Object>) exchange.getIn().getBody();
> >>>>>                     payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}"));
> >>>>>                     payload.put("SQLErrorMessage", "No Choice Provided");
> >>>>>                     payload.put("SQLErrorNumber", -1);
> >>>>>                     exchange.getOut().setBody(payload);
> >>>>>                     exchange.getOut().setHeader("RequestType", "failed");
> >>>>>                 }
> >>>>>             })
> >>>>>             .endChoice()
> >>>>>     .end()
> >>>>>     .process(new Processor() {
> >>>>>         public void process(Exchange exchange) throws Exception {
> >>>>>             exchange.setOut(exchange.getIn());
> >>>>>             exchange.getOut().setHeader("BodyType", exchange.getIn().getBody().getClass().getName());
> >>>>>             exchange.getOut().setHeader("BodyEncoding", "JSON");
> >>>>>             String payloadJson = new ObjectMapper().writeValueAsString(exchange.getIn().getBody());
> >>>>>             log.debug(payloadJson);
> >>>>>             exchange.getOut().setBody(payloadJson);
> >>>>>         }
> >>>>>     })
> >>>>>     .log(LoggingLevel.INFO, "Request results Job ID Request : \r\n Header:  ${headers} \r\n Body:  ${body} ")
> >>>>>     .to("mock:JobLogResult")
> >>>>>     .onException(Exception.class)
> >>>>>         .setHeader("MESSAGE_INFO", constant("Failure to Process Audit Job"))
> >>>>>         .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
> >>>>>         .log(LoggingLevel.ERROR, "${exception.stackTrace}")
> >>>>>         .maximumRedeliveries("0")
> >>>>>         .redeliveryDelay("0")
> >>>>>         .id("_onExceptionCron")
> >>>>>         .handled(true)
> >>>>>         ;
> >>>>>     ;
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> from("direct:ConnectionTest")
> >>>>>     .routeId("ConnectionTest")
> >>>>>     .log(LoggingLevel.INFO, "Beginning Connection Test")
> >>>>>     .log(LoggingLevel.INFO, "whoami ->> {{whoami}}")
> >>>>>     .log(LoggingLevel.INFO, "Request: \r\n Header:  ${headers} \r\n Body:  ${body} ")
> >>>>>     .to("sql:select name from sys.sysusers where uid = 0?dataSource=#Sahara_DS")
> >>>>>     .log(LoggingLevel.INFO, "Request Result: \r\n Header:  ${headers} \r\n Body:  ${body} ")
> >>>>>     .process(new Processor() {
> >>>>>         public void process(Exchange exchange) throws Exception {
> >>>>>             Map<String, Object> payload = new HashMap<String, Object>();
> >>>>>             exchange.setOut(exchange.getIn());
> >>>>>             ArrayList<Map<String,Object>> body =
> >>>>>                 (ArrayList<Map<String,Object>>) exchange.getIn().getBody();
> >>>>>             payload.put("whoami", "I am " + getContext().resolvePropertyPlaceholders("{{whoami}}"));
> >>>>>             body.add(payload);
> >>>>>             exchange.getOut().setBody(body);
> >>>>>             exchange.getOut().setHeader("RequestType", "complete");
> >>>>>             exchange.getOut().setHeader("SQLErrorMessage", "Success");
> >>>>>         }
> >>>>>     })
> >>>>>     .onException(SQLException.class)
> >>>>>         .setHeader("MESSAGE_INFO", constant("Sql Connection Error"))
> >>>>>         .setHeader("ROUTE_STOP", constant(Boolean.TRUE))
> >>>>>         .log(LoggingLevel.ERROR, "${exception.stackTrace}")
> >>>>>         .maximumRedeliveries("0")
> >>>>>         .redeliveryDelay("0")
> >>>>>         .id("_onExceptionConnectionTest")
> >>>>>         .handled(false)
> >>>>>     ;
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>> Regards-
> >>>>>
> >>>>> Marci Wilken
> >>>>>
> >>>>> Operations Architect
> >>>>>
> >>>>> Office of Information Services
> >>>>>
> >>>>> OHA/DHS/CAF-CW/OR-KIDS
> >>>>>
> >>>>> 503.378.2405
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>>
> >>>>
> >>>>
> >>>> --
> >>>> Claus Ibsen
> >>>> -----------------
> >>>> http://davsclaus.com @davsclaus
> >>>> Camel in Action 2: https://www.manning.com/ibsen2
> >>>>
> >>>>
>