[CONF] Apache Camel: Aggregator (page edited)

Dhiraj Bokde (Confluence)

Aggregator has been edited by Claus Ibsen (Oct 07, 2008).

Change summary:

CAMEL-951

Content:

Aggregator

The Aggregator from the EIP patterns allows you to combine a number of messages together into a single message.

A correlation Expression is used to determine which messages should be aggregated together. If you want to aggregate all messages into a single message, just use a constant expression. An AggregationStrategy is used to combine all the message exchanges for a single correlation key into a single message exchange. The default strategy just chooses the latest message, so it's ideal for throttling messages.

For example, imagine a stock market data system that receives 30,000 messages per second. You may want to throttle the updates because, say, a GUI cannot cope with such massive update rates. To do so, you can aggregate the messages so that within a window (defined by a maximum number of messages or a timeout) messages for the same stock are combined by choosing the latest message and discarding the older prices. (You could apply a delta processing algorithm if you prefer to capture some of the history.)
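The throttling idea can be illustrated without Camel at all. The following is a minimal plain-Java sketch, not Camel code: the class LatestPriceWindow and its methods are hypothetical names chosen for illustration, the stock symbol stands in for the correlation key, and flush() stands in for the moment the window closes.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of Camel: keeps only the newest price per symbol.
class LatestPriceWindow {
    private final Map<String, Double> latest = new LinkedHashMap<>();

    // The symbol acts as the correlation key; a newer price replaces the older one.
    void add(String symbol, double price) {
        latest.put(symbol, price);
    }

    // Called when the window closes (message limit or timeout reached).
    Map<String, Double> flush() {
        Map<String, Double> out = new LinkedHashMap<>(latest);
        latest.clear();
        return out;
    }

    public static void main(String[] args) {
        LatestPriceWindow window = new LatestPriceWindow();
        window.add("ACME", 10.0);
        window.add("INIT", 5.0);
        window.add("ACME", 10.5); // replaces the earlier ACME price
        System.out.println(window.flush()); // prints {ACME=10.5, INIT=5.0}
    }
}
```

Three incoming updates leave the window as two outgoing ones, which is the reduction in update rate the GUI needs.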

Using the Fluent Builders

The following example shows how to aggregate messages so that only the latest message for a specific value of the cheese header is sent.

from("direct:start").aggregator(header("cheese")).to("mock:result");

//from("seda:header").setHeader("visited", constant(true)).aggregator(header("cheese")).to("mock:result");
from("seda:header").setHeader("visited", constant(true)).to("direct:temp");

from("direct:temp").aggregator(header("cheese")).to("mock:result");

from("direct:predicate").aggregator(header("cheese"), new MyAggregationStrategy()).
    completedPredicate(header("aggregated").isEqualTo(5)).to("mock:result");

If you were using JMS then you may wish to use the JMSDestination header as the correlation key; or some custom header for the stock symbol (using the above stock market example).

from("activemq:someReallyFastTopic").aggregator(header("JMSDestination")).to("activemq:someSlowTopicForGuis");

You can of course use many different Expression languages such as XPath, XQuery, SQL or various Scripting Languages.

For further examples of this pattern in use, see the JUnit test case.

Using the Spring XML Extensions

The following example shows how to create a simple aggregator using the XML notation, with an Expression supplying the correlation value used to aggregate messages together.

<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregator>
      <simple>header.cheese</simple>
      <to uri="mock:result"/>
    </aggregator>
  </route>

  <route>
    <from uri="seda:header"/>
    <process ref="setHeaderProcessor"/>
    <to uri="direct:temp"/>
  </route>

  <route>
    <from uri="direct:temp"/>
    <aggregator>
      <simple>header.cheese</simple>
      <to uri="mock:result"/>
    </aggregator>
  </route>

  <route>
    <from uri="direct:predicate"/>
    <aggregator strategyRef="myAggregatorStrategy">
      <simple>header.cheese</simple>
      <to uri="mock:result"/>
      <completedPredicate>
        <methodCall bean="myAggregatorStrategy" method="isCompleted"/>
      </completedPredicate>
    </aggregator>
  </route>
</camelContext>

You can specify your own AggregationStrategy if you prefer, as shown in the following example:

<camelContext id="camel" xmlns="http://activemq.apache.org/camel/schema/spring">
  <route>
    <from uri="direct:start"/>
    <aggregator strategyRef="aggregatorStrategy">
      <simple>header.cheese</simple>
      <to uri="mock:result"/>
    </aggregator>
  </route>
</camelContext>

<bean id="aggregatorStrategy" class="org.apache.camel.spring.processor.MyAggregator"/>

Notice how the strategyRef attribute is used on the <aggregator> element to refer to the custom strategy in Spring.

Batch options

The aggregator supports the following batch options:

Option | Default | Description
batchSize | 100 | The in batch size. This is the number of incoming exchanges processed by the aggregator. When this threshold is reached the batch is completed and sent.
batchOutSize | 0 | The out batch size. This is the number of exchanges currently aggregated in the AggregationCollection. When this threshold is reached the batch is completed and sent. By default this option is disabled. Unlike batchSize, this option applies to outgoing exchanges, so setting it to e.g. 50 ensures that the batch contains at most 50 exchanges when it is sent.
batchTimeout | 1000L | Timeout in milliseconds. How long the aggregator waits before it completes the batch and sends whatever it has aggregated so far.
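To make the difference between batchSize and batchOutSize concrete, here is a plain-Java sketch of one reading of the table above. It is not Camel's implementation: BatchThresholdSketch is a hypothetical class, it keeps the latest body per correlation key, and it leaves out batchTimeout entirely.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of the two size thresholds; the timeout is not modelled.
class BatchThresholdSketch {
    private final int batchSize;    // number of incoming exchanges that completes a batch
    private final int batchOutSize; // number of aggregated exchanges that completes a batch; 0 disables it
    private final Map<String, String> aggregated = new LinkedHashMap<>();
    private int incoming = 0;

    BatchThresholdSketch(int batchSize, int batchOutSize) {
        this.batchSize = batchSize;
        this.batchOutSize = batchOutSize;
    }

    // Returns the completed batch, or null while the batch is still open.
    List<String> add(String correlationKey, String body) {
        incoming++;
        aggregated.put(correlationKey, body); // latest body wins per key
        boolean inReached = incoming >= batchSize;
        boolean outReached = batchOutSize > 0 && aggregated.size() >= batchOutSize;
        if (!inReached && !outReached) {
            return null;
        }
        List<String> batch = new ArrayList<>(aggregated.values());
        aggregated.clear();
        incoming = 0;
        return batch;
    }
}
```

With new BatchThresholdSketch(3, 0) the batch completes on the third incoming exchange regardless of how many distinct keys were seen. With new BatchThresholdSketch(100, 2) it completes as soon as two distinct keys are held.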

AggregationCollection and AggregationStrategy

This aggregator uses an AggregationCollection to store the exchanges that are currently aggregated. The AggregationCollection uses a correlation Expression and an AggregationStrategy.

  • The correlation Expression is used to correlate the incoming exchanges. The default implementation will group messages based on the correlation expression. Other implementations could for instance just add all exchanges as a batch.
  • The strategy is used to aggregate the old exchange (looked up by its correlation id) and the new exchange into a single exchange. Possible implementations include some kind of combining or delta processing, such as adding line items together into an invoice, or just using the newest exchange and removing old exchanges, as for state tracking or market data prices where old values are of little use.
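The two roles described above can be sketched in plain Java. This is an illustration only, not Camel's AggregationCollection: CombiningSketch is a hypothetical class, the first array element plays the correlation key (for instance an invoice id), and a BinaryOperator stands in for the strategy that merges the old and new values.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical illustration: correlate items by key, then fold them with a strategy.
class CombiningSketch {
    static Map<String, Integer> aggregate(String[][] items, BinaryOperator<Integer> strategy) {
        Map<String, Integer> byKey = new LinkedHashMap<>();
        for (String[] item : items) {
            String key = item[0];                     // correlation key
            Integer value = Integer.valueOf(item[1]); // amount carried by this item
            // First item for a key is stored as-is; later ones go through strategy(old, new).
            byKey.merge(key, value, strategy);
        }
        return byKey;
    }

    public static void main(String[] args) {
        String[][] lineItems = {{"inv-1", "10"}, {"inv-2", "5"}, {"inv-1", "7"}};
        // Combining strategy: add line items into an invoice total.
        System.out.println(aggregate(lineItems, Integer::sum)); // prints {inv-1=17, inv-2=5}
        // Latest-wins strategy: keep only the newest value per key.
        System.out.println(aggregate(lineItems, (oldV, newV) -> newV)); // prints {inv-1=7, inv-2=5}
    }
}
```

Swapping the strategy changes the result while the correlation step stays the same.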

Camel provides these implementations:

  • DefaultAggregationCollection
  • PredicateAggregationCollection
  • UseLatestAggregationStrategy

Examples

Default example

By default Camel uses DefaultAggregationCollection and UseLatestAggregationStrategy, so this simple example will just keep the latest received exchange for the given correlation Expression:

// our route is aggregating from the direct queue and sending the response to the mock
from("direct:start")
    // aggregated by header id
    // as we have not configured more on the aggregator it will default to aggregate the
    // latest exchange only
    .aggregator().header("id")
    // wait for 0.5 seconds to aggregate
    .batchTimeout(500L)
    .to("mock:result");

Using PredicateAggregationCollection

The PredicateAggregationCollection is an extension to DefaultAggregationCollection that also uses a Predicate to determine completion. For instance, the Predicate can test for a special header value or for the number of exchanges aggregated so far. Using it makes the routing a bit more complex, as we need to create our own AggregationCollection object as follows:

// create the aggregation collection we will use.
// - we will correlate the received messages based on the id header
// - as we will just keep the latest message we use the latest strategy
// - and finally we stop aggregating once 3 messages have been aggregated
AggregationCollection ag = new PredicateAggregationCollection(header("id"),
    new UseLatestAggregationStrategy(),
    header(Exchange.AGGREGATED_COUNT).isEqualTo(3));

// our route is aggregating from the direct queue and sending the response to the mock
from("direct:start")
    // we use the collection based aggregator we have already configured
    .aggregator(ag)
    // wait for 2 seconds to aggregate
    .batchTimeout(2000L)
    .to("mock:result");

In this sample the predicate completes the aggregation once 3 exchanges with the same correlation id have been aggregated. It is defined as:

header(Exchange.AGGREGATED_COUNT).isEqualTo(3)

Using this the aggregator will complete if we receive 3 exchanges with the same correlation id.
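The effect of such a count-based predicate can be traced with a small plain-Java model. This is a sketch under assumptions, not Camel's PredicateAggregationCollection: PredicateCompletionSketch is a hypothetical class, an IntPredicate stands in for the Camel Predicate, and the batch timeout is not modelled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.IntPredicate;

// Hypothetical model: count exchanges per correlation id and complete when the predicate matches.
class PredicateCompletionSketch {
    private final Map<String, Integer> counts = new HashMap<>();
    private final Map<String, String> latest = new HashMap<>();
    private final IntPredicate completed;

    PredicateCompletionSketch(IntPredicate completed) {
        this.completed = completed;
    }

    // Returns the aggregated (latest) body once the predicate matches, otherwise null.
    String add(String id, String body) {
        int count = counts.merge(id, 1, Integer::sum); // aggregated count for this id
        latest.put(id, body);                          // latest-wins strategy
        if (!completed.test(count)) {
            return null;
        }
        counts.remove(id); // start a fresh aggregation for this id
        return latest.remove(id);
    }
}
```

With new PredicateCompletionSketch(count -> count == 3), the third exchange for a given id returns the latest body, and exchanges for other ids are counted separately.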

Using custom aggregation strategy

In this example we aggregate incoming bids and keep only the highest bid. To do so we provide our own strategy that implements this logic:

private static class MyAggregationStrategy implements AggregationStrategy {

    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        int oldPrice = oldExchange.getIn().getBody(Integer.class);
        int newPrice = newExchange.getIn().getBody(Integer.class);
        // return the "winner" that has the highest price
        return newPrice > oldPrice ? newExchange : oldExchange;
    }
}

Then we set up the routing as follows:

// our route is aggregating from the direct queue and sending the response to the mock
from("direct:start")
    // aggregated by header id and use our own strategy how to aggregate
    .aggregator(new MyAggregationStrategy()).header("id")
    // wait for 2 seconds to aggregate
    .batchTimeout(2000L)
    .to("mock:result");

Since this is based on a unit test, we show the test code that sends the bids and states which bids are expected as the winners:

MockEndpoint result = getMockEndpoint("mock:result");

// we expect 2 messages as there are two different header ids
result.expectedMessageCount(2);
result.expectedBodiesReceived("200", "150");

// then we send all the messages at once
template.sendBodyAndHeader("direct:start", "100", "id", "1");
template.sendBodyAndHeader("direct:start", "150", "id", "2");
template.sendBodyAndHeader("direct:start", "130", "id", "2");
template.sendBodyAndHeader("direct:start", "200", "id", "1");
template.sendBodyAndHeader("direct:start", "190", "id", "1");

assertMockEndpointsSatisfied();
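To see why "200" and "150" are the expected winners, the five bids can be replayed outside Camel. The following plain-Java trace is a sketch only: HighestBidTrace is a hypothetical class that applies the same comparison as MyAggregationStrategy to plain integers, grouped by the id value.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical trace of the unit test data; no Camel types involved.
class HighestBidTrace {
    // Same comparison as MyAggregationStrategy: the new price wins only if it is strictly higher.
    static int winner(int oldPrice, int newPrice) {
        return newPrice > oldPrice ? newPrice : oldPrice;
    }

    // Each bid is {price, id}, matching the body and header value sent in the test.
    static Map<String, Integer> replay(String[][] bids) {
        Map<String, Integer> best = new TreeMap<>();
        for (String[] bid : bids) {
            int price = Integer.parseInt(bid[0]);
            String id = bid[1];
            Integer old = best.get(id);
            best.put(id, old == null ? price : winner(old, price));
        }
        return best;
    }

    public static void main(String[] args) {
        String[][] bids = {
            {"100", "1"}, {"150", "2"}, {"130", "2"}, {"200", "1"}, {"190", "1"}
        };
        System.out.println(replay(bids)); // prints {1=200, 2=150}
    }
}
```

Id 1 sees 100, 200 and 190, so 200 wins. Id 2 sees 150 and 130, so 150 wins.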

Using This Pattern

If you would like to use this EIP Pattern, please read Getting Started first. You may also find the Architecture useful, particularly the description of Endpoint and URIs. Then try out some of the Examples before trying this pattern.
