Re: ProducerTemplate.request() not returning complete Exchange.

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Re: ProducerTemplate.request() not returning complete Exchange.

Claus Ibsen-2
Hi

The new Processor(){ } in the request method of the producer template is
not for the reply message, but for preparing the message to send (eg the
input message).

On Tue, Jan 14, 2020 at 9:16 PM Wilken Marci J <
[hidden email]> wrote:

> Based on the documentation I was reading the
> ProducerTemplate.request(<URL>, Processor) should return the  Exchange
> information from the route that was called.  When I am running my test on
> this it is returning the body information in the
> exchange.getOut().getBody() but the exchange.getOut().getHeaders() has the
> headers that I sent and not returned headers that I see if I look at the
> mock endpoint.  I am expecting it to return the header and body data in the
> Exchange.getIn().  Since this test is calling a jms message Queue  (JBoss
> EAP 7.2 Fuse 7.3) I am expecting this to return the same thing as the a
> call from another route would see.  Am I understanding this wrong or am I
> missing a setting?
>
>
>
> *public void testConnection() throws Exception {*
>
> *       log.info <http://log.info>("----->>  Begin testConnection
> <<-----");*
>
> *       Assert.assertNotNull("Expected QueueRoute-context to exist",
> camelctx);*
>
> *       Assert.assertEquals(ServiceStatus.Started, camelctx.getStatus());*
>
>
>
> *       MockEndpoint mock = camelctx.getEndpoint("mock:JobLogResult",
> MockEndpoint.class);*
>
> *       assertNotNull("Expected mock:JobLogResult to exist", mock);*
>
> *       mock.reset();*
>
> *       mock.expectedMessageCount(1);*
>
>
>
> *       ProducerTemplate producerTemplate =
> camelctx.createProducerTemplate();*
>
> *       Assert.assertNotNull("Expected producerTemplate to exist",
> producerTemplate);*
>
>
>
> *       Exchange reply =
> producerTemplate.request("jms:queue:Sahara.Core.JobIdRequest", new
> Processor() {*
>
>
>
> *              public void process(final Exchange exchange) {*
>
>
>
> *                     Map<String, Object> headers = new HashMap<String,
> Object>();*
>
> *                     headers.put("RequestingSource", requestingSource);*
>
> *                     headers.put("AuthorizationKey", authorizationKey);*
>
> *                     headers.put("RequestType", "testConnection");*
>
>
>
> *                     Map<String, Object> payload = new HashMap<String,
> Object>();*
>
>
>
> *                     exchange.getIn().setHeaders(headers);*
>
> *                     exchange.getIn().setBody(payload);*
>
> *                     exchange.setPattern(ExchangePattern.InOut);*
>
>
>
> *              }*
>
> *       });*
>
> *       // these fail*
>
> *       assertThat("Request was un-successful", (
> reply.getIn().getHeader("SQLErrorMessage") == null ? "null"
> :((String)reply.getIn().getHeader("SQLErrorMessage"))).toLowerCase()
> ,equalTo("success"));*
>
> *       assertThat("Didn't Return Data ", (int)
> reply.getIn().getHeader("CamelSqlRowCount"),equalTo(1));*
>
>
>
>
>
> *       // these are successfull.*
>
> *       List<Exchange> list = mock.getReceivedExchanges();*
>
> *       Exchange reply2 = list.get(0);*
>
> *       assertThat("Request was un-successful", (
> reply2.getIn().getHeader("SQLErrorMessage") == null ? "null"
> :((String)reply2.getIn().getHeader("SQLErrorMessage"))).toLowerCase()
> ,equalTo("success"));*
>
> *       assertThat("Didn't Return Data ", (int)
> reply2.getIn().getHeader("CamelSqlRowCount"),equalTo(1));*
>
> *}*
>
>
>
> *Route*
>
> *from("jms:queue:Sahara.Core.JobIdRequest?exchangePattern=InOut")*
>
> *      .routeId("JobLogRouter")*
>
> *
> .log(LoggingLevel.INFO,"*************************************************")
> *
>
> *       .log(LoggingLevel.INFO,"* Beginning JobAuditingRequest")*
>
> *       .log(LoggingLevel.INFO,"* Requesting Source:
> ${header.RequestingSource} " )*
>
> *       .log(LoggingLevel.INFO,"* Request Type: ${header.RequestType} " )*
>
> *
> .log(LoggingLevel.INFO,"*************************************************")
> *
>
> *       .choice()    *
>
> *              .when(simple ("${header.RequestType} == \"complete\""))*
>
> *                     .to("direct:CompleteJob")*
>
> *                     .endChoice()*
>
> *
> .when().simple("${header.RequestType?.equalsIgnoreCase(\"audit\")}")*
>
> *                     .to("direct:LogAudit")*
>
> *                     .endChoice()*
>
> *              .when(simple ("${header.RequestType} == \"error\""))*
>
> *                     .to("direct:LogError")*
>
> *                     .endChoice()*
>
> *
> .when().simple("${header.RequestType?.equalsIgnoreCase(\"whoami\")}")*
>
> *                 .to("direct:WhoAmI")*
>
> *                 .endChoice()*
>
> *
> .when().simple("${header.RequestType?.equalsIgnoreCase(\"testConnection\")}")*
>
> *                 .to("direct:ConnectionTest")*
>
> *                 .endChoice()*
>
> *
> .when().simple("${header.RequestType?.equalsIgnoreCase(\"start\")} ||
> ${header.RequestType?.equalsIgnoreCase(\"open\")}")*
>
> *                 .to("direct:InitialJobRequest")*
>
> *                 .endChoice()   *
>
> *              .otherwise()*
>
> *                     .process(new Processor() {*
>
> *                           public void process(Exchange exchange) throws
> Exception {*
>
> *                       //     Message in = exchange.getIn();*
>
> *                                  Map<String, Object> payload = new
> HashMap<String, Object>();*
>
> *                                  exchange.setOut(exchange.getIn());*
>
> *                                  payload = (Map<String,Object>)
> exchange.getIn().getBody();*
>
> *                                  payload.put("whoami", "I am " +
> getContext().resolvePropertyPlaceholders("{{whoami}}") );*
>
> *                                  payload.put("SQLErrorMessage", "No
> Choice Provided");*
>
> *                                  payload.put("SQLErrorNumber", -1);*
>
> *                                  exchange.getOut().setBody(payload);*
>
> *
> exchange.getOut().setHeader("RequestType", "failed");*
>
>
>
> *                                  }*
>
>
>
> *                     })*
>
> *                     .endChoice()*
>
> *       .end()*
>
> *       .process(new Processor() {*
>
> *              public void process(Exchange exchange) throws Exception {*
>
>
>
> *                     exchange.setOut(exchange.getIn());*
>
> *
> exchange.getOut().setHeader("BodyType",exchange.getIn().getBody().getClass().getName());*
>
> *                     exchange.getOut().setHeader("BodyEncoding", "JSON");*
>
> *                     String payloadJson = new
> ObjectMapper().writeValueAsString(exchange.getIn().getBody());*
>
> *                     log.debug(payloadJson);*
>
> *                     exchange.getOut().setBody(payloadJson);*
>
>
>
> *              }*
>
>
>
> *       })*
>
> *       .log(LoggingLevel.INFO, "Request results Job ID Request : \r\n
> Header:  ${headers} \r\n Body:  ${body} ")*
>
>
>
> *       .to("mock:JobLogResult")*
>
> *          .onException(Exception.class)*
>
> *              .setHeader("MESSAGE_INFO", constant("Failure to Process
> Audit Job"))*
>
> *              .setHeader("ROUTE_STOP", constant(Boolean.TRUE))*
>
> *              .log(LoggingLevel.ERROR, "${exception.stackTrace}")*
>
> *              .maximumRedeliveries("0")*
>
> *              .redeliveryDelay("0")*
>
> *              .id("_onExceptionCron")*
>
> *              .handled(true)*
>
> *              ;*
>
> *       ;*
>
>
>
>
>
> * from("direct:ConnectionTest")*
>
> *       .routeId("ConnectionTest")*
>
> *       .log(LoggingLevel.INFO, "Beginning Connection Test")*
>
> *       .log(LoggingLevel.INFO,"whoami ->> {{whoami}}")*
>
> *       .log(LoggingLevel.INFO, "Request: \r\n Header:  ${headers} \r\n
> Body:  ${body} ")*
>
> *       .to("sql:select name from sys.sysusers where uid =
> 0?dataSource=#Sahara_DS")*
>
> *       .log(LoggingLevel.INFO, "Request Result: \r\n Header:  ${headers}
> \r\n Body:  ${body} ")*
>
>
>
> *       .process(new Processor() {*
>
> *              public void process(Exchange exchange) throws Exception {*
>
> *              Map<String, Object> payload = new HashMap<String,
> Object>();*
>
> *              exchange.setOut(exchange.getIn());*
>
> *              ArrayList<Map<String,Object>> body =
> (ArrayList<Map<String,Object>>)  exchange.getIn().getBody();*
>
> *              payload.put("whoami", "I am " +
> getContext().resolvePropertyPlaceholders("{{whoami}}") );*
>
> *              body.add(payload);*
>
> *              exchange.getOut().setBody(body);*
>
> *              exchange.getOut().setHeader("RequestType", "complete");*
>
> *              exchange.getOut().setHeader("SQLErrorMessage", "Success");*
>
> *       }*
>
>
>
> *       })*
>
> *       .onException(SQLException.class)*
>
> *              .setHeader("MESSAGE_INFO", constant("Sql Connection
> Error"))*
>
> *              .setHeader("ROUTE_STOP", constant(Boolean.TRUE))*
>
> *              .log(LoggingLevel.ERROR, "${exception.stackTrace}")*
>
> *              .maximumRedeliveries("0")*
>
> *              .redeliveryDelay("0")*
>
> *              .id("_onExceptionConnectionTest")*
>
> *              .handled(false)*
>
> *       ;*
>
>
>
>
>
>
>
>
>
> Regards-
>
> Marci Wilken
>
> Operations Architect
>
> Office of Information Services
>
> OHA/DHS/CAF-CW/OR-KIDS
>
> 503.378.2405
>
> [image: Description: Description: Description:
> cid:image001.png@01CCF13E.2FACEC70][image: OHAsiglogo]
>
> CONFIDENTIALITY NOTICE
>
> *This email may contain information that is privileged, confidential, or
> otherwise exempt from disclosure under applicable law. If you are not the
> addressee or it appears from the context or otherwise that you have
> received this email in error, please advise me immediately by reply email,
> keep the contents confidential, and immediately delete the message and any
> attachments from your system. *
>
>
>
>
>


--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2