svn commit: r702247 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/ test/java/org...

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r702247 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/model/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/processor/aggregate/ test/java/org/apache/camel/ test/java/org...

davsclaus-2
Author: davsclaus
Date: Mon Oct  6 13:06:50 2008
New Revision: 702247

URL: http://svn.apache.org/viewvc?rev=702247&view=rev
Log:
CAMEL-951: Aggregator EIP improvements: added unit test. easier configuration of aggregator collection. added out batch size option to batch processor. AggregatorCollection is now an interface to let end-users provide their own implementation. Polished javadoc.

Added:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java   (contents, props changed)
      - copied, changed from r701171, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/Exchange.java Mon Oct  6 13:06:50 2008
@@ -47,6 +47,8 @@
      * This typically won't be required as an exchange can be created with a specific MEP
      * by calling {@link Endpoint#createExchange(ExchangePattern)} but it is here just in case
      * it is needed.
+     *
+     * @param pattern  the pattern
      */
     void setPattern(ExchangePattern pattern);
 
@@ -122,6 +124,7 @@
      * Returns the outbound message; optionally lazily creating one if one has
      * not been associated with this exchange
      *
+     * @param lazyCreate <tt>true</tt> will lazy create the out message
      * @return the response
      */
     Message getOut(boolean lazyCreate);
@@ -145,7 +148,6 @@
      * not been associated with this exchange
      *
      * @param lazyCreate <tt>true</tt> will lazy create the fault message
-     *
      * @return the fault
      */
     Message getFault(boolean lazyCreate);
@@ -216,16 +218,12 @@
     void setUnitOfWork(UnitOfWork unitOfWork);
 
     /**
-     * Returns the exchange id
-     *
-     * @return the unique id of the exchange
+     * Returns the exchange id (unique)
      */
     String getExchangeId();
 
     /**
      * Set the exchange id
-     *
-     * @param id
      */
     void setExchangeId(String id);
 

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregatorType.java Mon Oct  6 13:06:50 2008
@@ -54,6 +54,8 @@
     @XmlAttribute(required = false)
     private Integer batchSize;
     @XmlAttribute(required = false)
+    private Integer outBatchSize;
+    @XmlAttribute(required = false)
     private Long batchTimeout;
     @XmlAttribute(required = false)
     private String strategyRef;
@@ -120,15 +122,21 @@
 
         final Aggregator aggregator;
         if (aggregationCollection != null) {
-            aggregator = new Aggregator(from, processor, aggregationCollection);
-        } else {
-            AggregationStrategy strategy = getAggregationStrategy();
-            if (strategy == null && strategyRef != null) {
-                strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
+            // create the aggregator using the collection
+            // pre configure the collection if its expression and strategy is not set, then
+            // use the ones that is pre configured with this type
+            if (aggregationCollection.getCorrelationExpression() == null) {
+                aggregationCollection.setCorrelationExpression(getExpression());
             }
-            if (strategy == null) {
-                strategy = new UseLatestAggregationStrategy();
+            if (aggregationCollection.getAggregationStrategy() == null) {
+                AggregationStrategy strategy = createAggregationStrategy(routeContext);
+                aggregationCollection.setAggregationStrategy(strategy);
             }
+            aggregator = new Aggregator(from, processor, aggregationCollection);
+        } else {
+            // create the aggregator using a default collection
+            AggregationStrategy strategy = createAggregationStrategy(routeContext);
+
             Expression aggregateExpression = getExpression().createExpression(routeContext);
 
             Predicate predicate = null;
@@ -149,9 +157,26 @@
         if (batchTimeout != null) {
             aggregator.setBatchTimeout(batchTimeout);
         }
+
+        if (outBatchSize != null) {
+            aggregator.setOutBatchSize(outBatchSize);
+        }
         
         return aggregator;
     }
+
+    private AggregationStrategy createAggregationStrategy(RouteContext routeContext) {
+        AggregationStrategy strategy = getAggregationStrategy();
+        if (strategy == null && strategyRef != null) {
+            strategy = routeContext.lookup(strategyRef, AggregationStrategy.class);
+        }
+        if (strategy == null) {
+            // fallback to use latest
+            strategy = new UseLatestAggregationStrategy();
+        }
+        return strategy;
+    }
+
     public AggregationCollection getAggregationCollection() {
         return aggregationCollection;
     }
@@ -176,6 +201,14 @@
         this.batchSize = batchSize;
     }
 
+    public Integer getOutBatchSize() {
+        return outBatchSize;
+    }
+
+    public void setOutBatchSize(Integer outBatchSize) {
+        this.outBatchSize = outBatchSize;
+    }
+
     public Long getBatchTimeout() {
         return batchTimeout;
     }
@@ -207,11 +240,31 @@
         return this;
     }
 
+    public AggregatorType outBatchSize(int batchSize) {
+        setOutBatchSize(batchSize);
+        return this;
+    }
+
     public AggregatorType batchTimeout(long batchTimeout) {
         setBatchTimeout(batchTimeout);
         return this;
     }
 
+    public AggregatorType aggregationCollection(AggregationCollection aggregationCollection) {
+        setAggregationCollection(aggregationCollection);
+        return this;
+    }
+
+    public AggregatorType aggregationStrategy(AggregationStrategy aggregationStrategy) {
+        setAggregationStrategy(aggregationStrategy);
+        return this;
+    }
+
+    public AggregatorType strategyRef(String strategyRef) {
+        setStrategyRef(strategyRef);
+        return this;
+    }
+
     /**
      * Sets the predicate used to determine if the aggregation is completed
      *

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorType.java Mon Oct  6 13:06:50 2008
@@ -39,8 +39,6 @@
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
 import org.apache.camel.Route;
-import org.apache.camel.RuntimeCamelException;
-import org.apache.camel.builder.Builder;
 import org.apache.camel.builder.DataFormatClause;
 import org.apache.camel.builder.DeadLetterChannelBuilder;
 import org.apache.camel.builder.ErrorHandlerBuilder;
@@ -56,8 +54,8 @@
 import org.apache.camel.processor.ConvertBodyProcessor;
 import org.apache.camel.processor.DelegateProcessor;
 import org.apache.camel.processor.Pipeline;
-import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
 import org.apache.camel.processor.idempotent.MessageIdRepository;
 import org.apache.camel.spi.DataFormat;
 import org.apache.camel.spi.ErrorHandlerWrappingStrategy;
@@ -702,15 +700,17 @@
     /**
      * Creates an <a
      * href="http://activemq.apache.org/camel/aggregator.html">Aggregator</a>
-     * pattern using a custom aggregation collection implementation.
+     * pattern using a custom aggregation collection implementation. The aggregation collection must
+     * be configured with the strategy and correlation expression that this aggregator should use.
+     * This avoids duplicating this configuration on both the collection and the aggregator itself.
      *
      * @param aggregationCollection the collection used to perform the aggregation
      */
-    public ExpressionClause<AggregatorType> aggregator(AggregationCollection aggregationCollection) {
+    public AggregatorType aggregator(AggregationCollection aggregationCollection) {
         AggregatorType answer = new AggregatorType();
         answer.setAggregationCollection(aggregationCollection);
         addOutput(answer);
-        return ExpressionClause.createAndSetExpression(answer);
+        return answer;
     }
 
     /**

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Aggregator.java Mon Oct  6 13:06:50 2008
@@ -20,9 +20,10 @@
 import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
 import org.apache.camel.Processor;
-import org.apache.camel.processor.aggregate.AggregationCollection;
+import org.apache.camel.processor.aggregate.DefaultAggregationCollection;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
+import org.apache.camel.processor.aggregate.AggregationCollection;
 
 /**
  * An implementation of the <a
@@ -46,7 +47,7 @@
 
     public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
                       AggregationStrategy aggregationStrategy) {
-        this(endpoint, processor, new AggregationCollection(correlationExpression, aggregationStrategy));
+        this(endpoint, processor, new DefaultAggregationCollection(correlationExpression, aggregationStrategy));
     }
 
     public Aggregator(Endpoint endpoint, Processor processor, Expression correlationExpression,
@@ -67,10 +68,12 @@
     @Override
     protected boolean isBatchCompleted(int index) {
         if (aggregationCompletedPredicate != null) {
+            // TODO: (davsclaus) What is the point with this code? I think its wrong
             if (getCollection().size() > 0) {
                 return true;
             }
         }
+        
         return super.isBatchCompleted(index);
     }
 }

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Mon Oct  6 13:06:50 2008
@@ -46,6 +46,7 @@
     private Collection<Exchange> collection;
     private long batchTimeout = DEFAULT_BATCH_TIMEOUT;
     private int batchSize = DEFAULT_BATCH_SIZE;
+    private int outBatchSize;
     private PollingConsumer consumer;
     private ExceptionHandler exceptionHandler;
 
@@ -89,10 +90,31 @@
         return batchSize;
     }
 
+    /**
+     * Sets the <b>in</b> batch size. This is the number of incoming exchanges that this batch processor
+     * will process before it is completed. The default value is {@link #DEFAULT_BATCH_SIZE}.
+     *
+     * @param batchSize the size
+     */
     public void setBatchSize(int batchSize) {
         this.batchSize = batchSize;
     }
 
+    public int getOutBatchSize() {
+        return outBatchSize;
+    }
+
+    /**
+     * Sets the <b>out</b> batch size. If the batch processor holds more exchanges than this out size then
+     * the completion is triggered. Can for instance be used to ensure that this batch is completed when
+     * a certain number of exchanges has been collected. By default this feature is <b>not</b> used.
+     *
+     * @param outBatchSize the size
+     */
+    public void setOutBatchSize(int outBatchSize) {
+        this.outBatchSize = outBatchSize;
+    }
+
     public long getBatchTimeout() {
         return batchTimeout;
     }
@@ -119,12 +141,16 @@
         for (int i = 0; !isBatchCompleted(i); i++) {
             long timeout = end - System.currentTimeMillis();
             if (timeout < 0L) {                
-                LOG.debug("batch timeout expired at batch index:"  + i);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("batch timeout expired at batch index: " + i);
+                }
                 break;
             }
             Exchange exchange = consumer.receive(timeout);
             if (exchange == null) {
-                LOG.debug("receive with timeout: " + timeout + " expired at batch index:"  + i);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("receive with timeout: " + timeout + " expired at batch index: " + i);
+                }
                 break;
             }
             collection.add(exchange);
@@ -148,6 +174,11 @@
      * A strategy method to decide if the batch is completed the resulting exchanges should be sent
      */
     protected boolean isBatchCompleted(int index) {
+        // out batch size is optional and we should only check if its enabled (> 0)
+        if (outBatchSize > 0 && collection.size() >= outBatchSize) {
+            return true;
+        }
+        // fallback to regular batch size check
         return index >= batchSize;
     }
 

Added: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java?rev=702247&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (added)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java Mon Oct  6 13:06:50 2008
@@ -0,0 +1,60 @@
+package org.apache.camel.processor.aggregate;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+
+/**
+ * A {@link Collection} which aggregates exchanges together,
+ * using a correlation {@link Expression} and a {@link AggregationStrategy}.
+ * <p/>
+ * The Default Implementation will group messages based on the correlation expression.
+ * Other implementations could for instance just add all exchanges as a batch.
+ *
+ * @version $Revision$
+ */
+public interface AggregationCollection extends Collection<Exchange> {
+
+    /**
+     * Gets the correlation expression
+     */
+    Expression<Exchange> getCorrelationExpression();
+
+    /**
+     * Sets the correlation expression to be used
+     */
+    void setCorrelationExpression(Expression<Exchange> correlationExpression);
+
+    /**
+     * Gets the aggregation strategy
+     */
+    AggregationStrategy getAggregationStrategy();
+
+    /**
+     * Sets the aggregation strategy to be used
+     */
+    void setAggregationStrategy(AggregationStrategy aggregationStrategy);
+
+    /**
+     * Adds the given exchange to this collection
+     */
+    boolean add(Exchange exchange);
+
+    /**
+     * Gets the iterator to iterate this collection.
+     */
+    Iterator<Exchange> iterator();
+
+    /**
+     * Gets the size of this collection
+     */
+    int size();
+
+    /**
+     * Clears this collection
+     */
+    void clear();
+
+}

Copied: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java (from r701171, activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java?p2=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java&p1=activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java&r1=701171&r2=702247&rev=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java Mon Oct  6 13:06:50 2008
@@ -34,14 +34,17 @@
  *
  * @version $Revision$
  */
-public class AggregationCollection extends AbstractCollection<Exchange> {
-    private static final transient Log LOG = LogFactory.getLog(AggregationCollection.class);
-    private final Expression<Exchange> correlationExpression;
-    private final AggregationStrategy aggregationStrategy;
+public class DefaultAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {
+
+    private static final transient Log LOG = LogFactory.getLog(DefaultAggregationCollection.class);
+    private Expression<Exchange> correlationExpression;
+    private AggregationStrategy aggregationStrategy;
     private Map<Object, Exchange> map = new LinkedHashMap<Object, Exchange>();
 
-    public AggregationCollection(Expression<Exchange> correlationExpression,
-                                 AggregationStrategy aggregationStrategy) {
+    public DefaultAggregationCollection() {
+    }
+
+    public DefaultAggregationCollection(Expression<Exchange> correlationExpression, AggregationStrategy aggregationStrategy) {
         this.correlationExpression = correlationExpression;
         this.aggregationStrategy = aggregationStrategy;
     }
@@ -96,4 +99,20 @@
      */
     protected void onAggregation(Object correlationKey, Exchange newExchange) {
     }
+
+    public Expression<Exchange> getCorrelationExpression() {
+        return correlationExpression;
+    }
+
+    public void setCorrelationExpression(Expression<Exchange> correlationExpression) {
+        this.correlationExpression = correlationExpression;
+    }
+
+    public AggregationStrategy getAggregationStrategy() {
+        return aggregationStrategy;
+    }
+
+    public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+        this.aggregationStrategy = aggregationStrategy;
+    }
 }

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/DefaultAggregationCollection.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/PredicateAggregationCollection.java Mon Oct  6 13:06:50 2008
@@ -30,7 +30,7 @@
  *
  * @version $Revision$
  */
-public class PredicateAggregationCollection extends AggregationCollection {
+public class PredicateAggregationCollection extends DefaultAggregationCollection {
     private Predicate aggregationCompletedPredicate;
     private List<Exchange> collection = new ArrayList<Exchange>();
 
@@ -42,8 +42,7 @@
     @Override
     protected void onAggregation(Object correlationKey, Exchange newExchange) {
         if (aggregationCompletedPredicate.matches(newExchange)) {
-            // this exchange has now aggregated so lets add it to the collection of things
-            // to send
+            // this exchange has now aggregated so lets add it to the collection of things to send
             super.getMap().remove(correlationKey);
             collection.add(newExchange);
         }

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/ContextTestSupport.java Mon Oct  6 13:06:50 2008
@@ -70,19 +70,17 @@
 
         template = context.createProducerTemplate();
 
-        if (useRouteBuilder) {
+        if (isUseRouteBuilder()) {
             RouteBuilder[] builders = createRouteBuilders();
             for (RouteBuilder builder : builders) {
                 log.debug("Using created route builder: " + builder);
                 context.addRoutes(builder);
             }
+            startCamelContext();
+            log.debug("Routing Rules are: " + context.getRoutes());
         } else {
             log.debug("Using route builder from the created context: " + context);
         }
-
-        startCamelContext();
-
-        log.debug("Routing Rules are: " + context.getRoutes());
     }
 
     @Override

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java?rev=702247&r1=702246&r2=702247&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/RouteBuilderTest.java Mon Oct  6 13:06:50 2008
@@ -51,12 +51,6 @@
     protected DelegateProcessor interceptor1;
     protected DelegateProcessor interceptor2;
 
-    @Override
-    protected void setUp() throws Exception {
-
-        super.setUp();
-    }
-
     protected List<Route> buildSimpleRoute() throws Exception {
         // START SNIPPET: e1
         RouteBuilder builder = new RouteBuilder() {

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java?rev=702247&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java Mon Oct  6 13:06:50 2008
@@ -0,0 +1,154 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for the batch size options on aggregator.
+ */
+public class AggregatorBatchOptionsTest extends ContextTestSupport {
+
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testAggregateOutBatchSize() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 2 seconds to aggregate
+                    .batchTimeout(2000L)
+                    // batch size in is the limit of number of exchanges received, so when we have received 100
+                    // exchanges then whatever we have in the collection will be sent
+                    .batchSize(100)
+                    // limit the out batch size to 3 so when we have aggregated 3 exchanges
+                    // and we reach this limit then the exchanges are sent
+                    .outBatchSize(3)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        });
+        startCamelContext();
+
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 3 messages grouped by the latest message only
+        result.expectedMessageCount(3);
+        result.expectedBodiesReceived("Message 1c", "Message 2b", "Message 3a");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        // when we send message 4 then we will reach the collection batch size limit and the
+        // exchanges above is the ones we have aggregated in the first batch
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    public void testAggregateBatchSize() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e3
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 2 seconds to aggregate
+                    .batchTimeout(2000L)
+                    // batch size in is the limit of number of exchanges received, so when we have received 5
+                    // exchanges then whatever we have in the collection will be sent
+                    .batchSize(5)
+                    .to("mock:result");
+                // END SNIPPET: e3
+            }
+        });
+        startCamelContext();
+
+        // START SNIPPET: e4
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 2 messages grouped by the latest message only
+        result.expectedMessageCount(2);
+        result.expectedBodiesReceived("Message 1c", "Message 2b");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        // when we sent the next message we have reached the in batch size limit and the current
+        // aggregated exchanges will be sent
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e4
+    }
+
+    public void testAggregateBatchTimeout() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e5
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    .to("mock:result");
+                // END SNIPPET: e5
+            }
+        });
+        startCamelContext();
+
+        // START SNIPPET: e6
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 3 messages grouped by the latest message only
+        result.expectedMessageCount(3);
+        result.expectedBodiesReceived("Message 1c", "Message 2b", "Message 3a");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        Thread.sleep(600L);
+        // these messages are not aggregated as the timeout should have occurred
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e6
+    }
+
+}
\ No newline at end of file

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=702247&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java Mon Oct  6 13:06:50 2008
@@ -0,0 +1,61 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test for using our own aggregation strategy.
+ */
+public class CustomAggregationStrategyTest extends ContextTestSupport {
+
+    public void testCustomAggregationStrategy() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 2 messages, one per distinct header id, each holding the highest price for that id
+        result.expectedMessageCount(2);
+        result.expectedBodiesReceived("200", "150");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "100", "id", "1");
+        template.sendBodyAndHeader("direct:start", "150", "id", "2");
+        template.sendBodyAndHeader("direct:start", "130", "id", "2");
+        template.sendBodyAndHeader("direct:start", "200", "id", "1");
+        template.sendBodyAndHeader("direct:start", "190", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregate by header id, using our own strategy to decide how to aggregate
+                    .aggregator(new MyAggregationStrategy()).header("id")
+                    // wait for 2 seconds to aggregate
+                    .batchTimeout(2000L)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    // START SNIPPET: e3
+    private static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            int oldPrice = oldExchange.getIn().getBody(Integer.class);
+            int newPrice = newExchange.getIn().getBody(Integer.class);
+            // return the "winner" that has the highest price
+            return newPrice > oldPrice ? newExchange : oldExchange;
+        }
+    }
+    // END SNIPPET: e3
+}
\ No newline at end of file

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java?rev=702247&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java Mon Oct  6 13:06:50 2008
@@ -0,0 +1,54 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for DefaultAggregationCollection.
+ */
+public class DefaultAggregatorCollectionTest extends ContextTestSupport {
+
+    public void testDefaultAggregateCollection() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 4 messages grouped by the latest message only
+        result.expectedMessageCount(4);
+        result.expectedBodiesReceived("Message 1d", "Message 2b", "Message 3c", "Message 4");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 2 seconds to aggregate
+                    .batchTimeout(2000L)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}
\ No newline at end of file

Added: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java?rev=702247&view=auto
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java (added)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java Mon Oct  6 13:06:50 2008
@@ -0,0 +1,65 @@
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
+
+/**
+ * Unit test for PredicateAggregationCollection.
+ */
+public class PredicateAggregatorCollectionTest extends ContextTestSupport {
+
+    public void testPredicateAggregateCollection() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we only expect two messages, as only two groups satisfy the completion predicate,
+        // which requires 3 messages that have the same header id
+        result.expectedMessageCount(2);
+        result.expectedBodiesReceived("Message 1c", "Message 3c");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // create the aggregation collection we will use.
+                // - we will correlate the received message based on the id header
+                // - as we will just keep the latest message we use the latest strategy
+                // - and finally we stop aggregating once the aggregated count header equals 3
+                AggregationCollection ag = new PredicateAggregationCollection(header("id"),
+                    new UseLatestAggregationStrategy(),
+                    header(Exchange.AGGREGATED_COUNT).isEqualTo(3));
+
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // we use the collection based aggregator we already have configured
+                    .aggregator(ag)
+                    // wait for 2 seconds to aggregate
+                    .batchTimeout(2000L)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}