Split and Aggregate - Old Exchange is null everytime in AggregationStrategy

dinesh19aug
Hi,
  I see that this question has been asked a number of times, but none of the posts helped or reached a conclusive solution. I am splitting a message and then aggregating it using Aggregator2. The code was throwing an exception because oldExchange was always null, so I wrote a small test case.

I read an orders.xml file, which looks like this:
<Orders xmlns="http://some/schema/Order">
        <Order>
                <orderNum>1</orderNum>
        </Order>
        <Order>
                <orderNum>2</orderNum>
        </Order>
        <Order>
                <orderNum>3</orderNum>
        </Order>
        <Order>
                <orderNum>5</orderNum>
        </Order>
        <Order>
                <orderNum>6</orderNum>
        </Order>
</Orders>

My Camel context looks like this:
<camel:camelContext xmlns="http://camel.apache.org/schema/spring" xmlns:te="http://acn/schema/Order">

    <camel:route>
        <camel:from uri="file:src/data/catask/test?noop=true"/>
        <camel:log message="${body}"></camel:log>
        <camel:split>
            <camel:xpath>//te:Orders/*</camel:xpath>
            <camel:to uri="direct:logQueries"/>
            <camel:to uri="direct:aggegateQueries"/>
        </camel:split>
    </camel:route>

    <camel:route>
        <camel:from uri="direct:logQueries"/>
        <camel:log message="After the call : \n ${body}"></camel:log>
    </camel:route>

    <camel:route>
        <camel:from uri="direct:aggegateQueries"/>
        <camel:aggregate strategyRef="aggrTask" completionInterval="8000">
            <camel:correlationExpression>
                <camel:xpath>//te:Order</camel:xpath>
            </camel:correlationExpression>
            <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
        </camel:aggregate>
    </camel:route>

</camel:camelContext>

My aggregation strategy class looks like this:
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        // First message of a correlation group: nothing to merge with yet.
        if (oldExchange == null) {
            System.out.println("Returning new exchange");
            return newExchange;
        }

        // Later messages: append the new body to the accumulated one.
        String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        oldExchange.getIn().setBody(oldBody + "+" + newBody);
        return oldExchange;
    }
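
To check the strategy in isolation, the same null-handling logic can be sketched on plain String bodies without Camel. This is only an illustration under assumptions: the class StrategyFoldSketch and the method aggregateAll are hypothetical names, and the loop stands in for a single correlation group that receives every message in turn.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class StrategyFoldSketch {
    // Stand-in for aggregate(oldExchange, newExchange), working on String bodies only.
    static final BinaryOperator<String> STRATEGY = (oldBody, newBody) ->
            oldBody == null ? newBody : oldBody + "+" + newBody;

    // Feeds the bodies one at a time, as one correlation group would see them.
    static String aggregateAll(List<String> bodies) {
        String current = null; // there is no previous result before the first message
        for (String body : bodies) {
            current = STRATEGY.apply(current, body);
        }
        return current;
    }

    public static void main(String[] args) {
        // prints 1+2+3+5+6
        System.out.println(aggregateAll(Arrays.asList("1", "2", "3", "5", "6")));
    }
}
```

In this sketch the old value is null only on the first call, so a strategy that sees null on every call is probably being invoked for a fresh group each time.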


The problem is that when the aggregated result is saved to output.xml, the file contains only the last record read from orders.xml,

i.e.
<Order xmlns="http://some/schema/Order">
        <orderNum>6</orderNum>
</Order>



I looked into it further and found the cause: after the first call oldExchange should hold a value, but it is always null. I suspect that because everything is read from a single file and then split, there is only one exchange.

Any suggestions?

Re: Split and Aggregate - Old Exchange is null everytime in AggregationStrategy

Claus Ibsen-2
It looks like your correlation expression starts a new group for
each message, i.e. each XPath result is different.
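
The effect of the correlation key can be sketched without Camel. The following is a simplified model, not Camel's implementation: the class CorrelationSketch and its aggregate method are hypothetical, the bodies are plain strings, and the merge rule copies the strategy from the original post.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class CorrelationSketch {
    // Groups bodies by correlation key and merges each group with "+".
    static Map<String, String> aggregate(List<String> bodies,
                                         Function<String, String> correlationKey) {
        Map<String, String> groups = new LinkedHashMap<>();
        for (String body : bodies) {
            String key = correlationKey.apply(body);
            String old = groups.get(key); // null the first time a key is seen
            groups.put(key, old == null ? body : old + "+" + body);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> bodies = Arrays.asList("1", "2", "3", "5", "6");
        // Key derived from the message content: every message opens its own group.
        System.out.println(aggregate(bodies, body -> body));    // {1=1, 2=2, 3=3, 5=5, 6=6}
        // Constant key: all messages land in one group and are merged.
        System.out.println(aggregate(bodies, body -> "order")); // {order=1+2+3+5+6}
    }
}
```

With a content-derived key the "old" value is null for every message, which matches the behaviour described in the question. If each of those single-message groups is then written to the same output file, it would also be consistent with only the last Order surviving.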

If you want to split and join the same messages, then see this EIP:
http://camel.apache.org/composed-message-processor.html

Also see the example there that uses only the splitter. That is much easier to do.

On Wed, Jan 22, 2014 at 9:36 PM, dinesh19aug <[hidden email]> wrote:

> [...]
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Split-and-Aggregate-Old-Exchange-is-null-everytime-in-AggregationStrategy-tp5746365.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



--
Claus Ibsen
-----------------
Red Hat, Inc.
Email: [hidden email]
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Make your Camel applications look hawt, try: http://hawt.io

Re: Split and Aggregate - Old Exchange is null everytime in AggregationStrategy

dinesh19aug
Claus,
  I tested the XPath expression using an XPath evaluator tool and also printed out the correlation expression results, and all my messages are the same. For example:
Group 1:
<Order> 
     <orderNum>1</orderNum> 
</Order> 

Group 2:
<Order> 
     <orderNum>2</orderNum> 
</Order> 

I am trying to keep things clean, and hence thought that Aggregator2 would be a good way to keep the code in small chunks:
Step 1: Split
Step 2: Aggregate


I will try the splitter-only example, but is there a way I can create an oldExchange (when it is null) in my aggregation strategy? Is that a good idea?

Re: Split and Aggregate - Old Exchange is null everytime in AggregationStrategy

dinesh19aug
Claus,
    I tried adding the aggregation strategy to the Splitter itself, and that works: I get the aggregated XML messages. However, I am still looking for a way to use Aggregator2. Is there no way to implement this using the Aggregator component?

BTW, if anyone is following this post, here's how I did it using the Splitter. All other code remains the same.

<camel:route>
    <camel:from uri="file:src/data/catask/test?noop=true"/>
    <camel:log message="${body}"></camel:log>
    <camel:split strategyRef="aggrTask">
        <camel:xpath>//te:Orders/*</camel:xpath>
        <camel:to uri="direct:logQueries"/>
    </camel:split>
</camel:route>

<camel:route>
    <camel:from uri="direct:logQueries"/>
    <camel:log message="After the call : \n ${body}"></camel:log>
</camel:route>


Re: Split and Aggregate - Old Exchange is null everytime in AggregationStrategy

dinesh19aug
Claus,
   I think I figured out how to aggregate the messages using the Aggregator. I added a header named id and use it as my correlation id.

<camel:route>
    <camel:from uri="file:src/data/catask/test?noop=true"/>
    <camel:log message="${body}"></camel:log>
    <camel:split>
        <camel:xpath>//te:Orders/*</camel:xpath>
        <camel:to uri="direct:addHeaders"/>
        <camel:to uri="direct:aggegateQueries"/>
    </camel:split>
</camel:route>

<camel:route>
    <camel:from uri="direct:addHeaders"/>
    <camel:setHeader headerName="id">
        <camel:constant>order</camel:constant>
    </camel:setHeader>
</camel:route>

<camel:route>
    <camel:from uri="direct:aggegateQueries"/>
    <camel:aggregate strategyRef="aggrTask" completionInterval="8000">
        <camel:correlationExpression>
            <simple>header.id</simple>
        </camel:correlationExpression>
        <camel:to uri="file:src/data/catask/output?fileName=output.xml"/>
        <camel:log message="MERGED:: \n ${body}"></camel:log>
    </camel:aggregate>
</camel:route>


This aggregates my messages. However, I am still not sure why, despite the XPath being correct, Camel treats each one as a different type of message.