svn commit: r702571 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/aggregator/

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r702571 - in /activemq/camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ test/java/org/apache/camel/processor/aggregator/

davsclaus-2
Author: davsclaus
Date: Tue Oct  7 11:14:19 2008
New Revision: 702571

URL: http://svn.apache.org/viewvc?rev=702571&view=rev
Log:
CAMEL-951: Added unit test for custom aggregation collection. Fixed a problem with logging the collection: logging it invokes toString(), which had the side effect of calling the .iterator() method on the aggregation collection twice.

Added:
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java   (contents, props changed)
      - copied, changed from r702557, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
Modified:
    activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java   (contents, props changed)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java   (contents, props changed)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java   (contents, props changed)
    activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java   (contents, props changed)

Modified: activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
+++ activemq/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Tue Oct  7 11:14:19 2008
@@ -156,10 +156,9 @@
             collection.add(exchange);
         }
 
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Finished batch size: " + batchSize + " timeout: " + batchTimeout + " so sending set: "
-                      + collection);
-        }
+        // we should NOT log the collection directly as it will invoke a toString() on collection
+        // and it will call collection.iterator() where end-users might do stuff that would break
+        // calling the iterator a 2nd time as below
 
         // lets send the batch
         Iterator<Exchange> iter = collection.iterator();

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java Tue Oct  7 11:14:19 2008
@@ -1,154 +1,154 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-
-/**
- * Unit test for the batch size options on aggregator.
- */
-public class AggregatorBatchOptionsTest extends ContextTestSupport {
-
-    public boolean isUseRouteBuilder() {
-        return false;
-    }
-
-    public void testAggregateOutBatchSize() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            public void configure() throws Exception {
-                // START SNIPPET: e1
-                // our route is aggregating from the direct queue and sending the response to the mock
-                from("direct:start")
-                    // aggregated by header id
-                    // as we have not configured more on the aggregator it will default to aggregate the
-                    // latest exchange only
-                    .aggregator().header("id")
-                    // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
-                    // batch size in is the limit of number of exchanges recieved, so when we have received 100
-                    // exchanges then whatever we have in the collection will be sent
-                    .batchSize(100)
-                    // limit the out batch size to 3 so when we have aggregated 3 exchanges
-                    // and we reach this limit then the exchanges is send
-                    .outBatchSize(3)
-                    .to("mock:result");
-                // END SNIPPET: e1
-            }
-        });
-        startCamelContext();
-
-        // START SNIPPET: e2
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we expect 3 messages grouped by the latest message only
-        result.expectedMinimumMessageCount(3);
-        result.expectedBodiesReceived("Message 1c", "Message 2b", "Message 3a");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
-        // when we send message 4 then we will reach the collection batch size limit and the
-        // exchanges above is the ones we have aggregated in the first batch
-        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
-        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-
-        assertMockEndpointsSatisfied();
-        // END SNIPPET: e2
-    }
-
-    public void testAggregateBatchSize() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            public void configure() throws Exception {
-                // START SNIPPET: e3
-                // our route is aggregating from the direct queue and sending the response to the mock
-                from("direct:start")
-                    // aggregated by header id
-                    // as we have not configured more on the aggregator it will default to aggregate the
-                    // latest exchange only
-                    .aggregator().header("id")
-                    // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
-                    // batch size in is the limit of number of exchanges recieved, so when we have received 100
-                    // exchanges then whatever we have in the collection will be sent
-                    .batchSize(5)
-                    .to("mock:result");
-                // END SNIPPET: e3
-            }
-        });
-        startCamelContext();
-
-        // START SNIPPET: e4
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we expect 3 messages grouped by the latest message only
-        result.expectedMinimumMessageCount(2);
-        result.expectedBodiesReceived("Message 1c", "Message 2b");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
-        // when we sent the next message we have reached the in batch size limit and the current
-        // aggregated exchanges will be sent
-        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
-        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-
-        assertMockEndpointsSatisfied();
-        // END SNIPPET: e4
-    }
-
-    public void testAggregateBatchTimeout() throws Exception {
-        context.addRoutes(new RouteBuilder() {
-            public void configure() throws Exception {
-                // START SNIPPET: e5
-                // our route is aggregating from the direct queue and sending the response to the mock
-                from("direct:start")
-                    // aggregated by header id
-                    // as we have not configured more on the aggregator it will default to aggregate the
-                    // latest exchange only
-                    .aggregator().header("id")
-                    // wait for 0.5 seconds to aggregate
-                    .batchTimeout(500L)
-                    .to("mock:result");
-                // END SNIPPET: e5
-            }
-        });
-        startCamelContext();
-
-        // START SNIPPET: e6
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we expect 3 messages grouped by the latest message only
-        result.expectedMinimumMessageCount(3);
-        result.expectedBodiesReceived("Message 1c", "Message 2b", "Message 3a");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
-        Thread.sleep(600L);
-        // these messages are not aggregated as the timeout should have accoured
-        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
-        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-
-        assertMockEndpointsSatisfied();
-        // END SNIPPET: e6
-    }
-
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for the batch size options on aggregator.
+ */
+public class AggregatorBatchOptionsTest extends ContextTestSupport {
+
+    public boolean isUseRouteBuilder() {
+        return false;
+    }
+
+    public void testAggregateOutBatchSize() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    // batch size in is the limit of number of exchanges recieved, so when we have received 100
+                    // exchanges then whatever we have in the collection will be sent
+                    .batchSize(100)
+                    // limit the out batch size to 3 so when we have aggregated 3 exchanges
+                    // and we reach this limit then the exchanges is send
+                    .outBatchSize(3)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        });
+        startCamelContext();
+
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 3 messages grouped by the latest message only
+        result.expectedMinimumMessageCount(3);
+        result.expectedBodiesReceived("Message 1c", "Message 2b", "Message 3a");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        // when we send message 4 then we will reach the collection batch size limit and the
+        // exchanges above is the ones we have aggregated in the first batch
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    public void testAggregateBatchSize() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e3
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    // batch size in is the limit of number of exchanges recieved, so when we have received 100
+                    // exchanges then whatever we have in the collection will be sent
+                    .batchSize(5)
+                    .to("mock:result");
+                // END SNIPPET: e3
+            }
+        });
+        startCamelContext();
+
+        // START SNIPPET: e4
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 3 messages grouped by the latest message only
+        result.expectedMinimumMessageCount(2);
+        result.expectedBodiesReceived("Message 1c", "Message 2b");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        // when we sent the next message we have reached the in batch size limit and the current
+        // aggregated exchanges will be sent
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e4
+    }
+
+    public void testAggregateBatchTimeout() throws Exception {
+        context.addRoutes(new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e5
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    .to("mock:result");
+                // END SNIPPET: e5
+            }
+        });
+        startCamelContext();
+
+        // START SNIPPET: e6
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 3 messages grouped by the latest message only
+        result.expectedMinimumMessageCount(3);
+        result.expectedBodiesReceived("Message 1c", "Message 2b", "Message 3a");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        Thread.sleep(600L);
+        // these messages are not aggregated as the timeout should have accoured
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e6
+    }
+
 }
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorBatchOptionsTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java Tue Oct  7 11:14:19 2008
@@ -187,7 +187,7 @@
                         from("direct:joinBrothers").aggregator(header(TYPE_HEADER),
                                 brothersAggregator);
 
-                agg.setBatchTimeout(5000L);
+                agg.setBatchTimeout(1000L);
                 agg.removeHeader(SURNAME_HEADER)
                         .removeHeader(TYPE_HEADER)
                         .to("mock:result");

Copied: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java (from r702557, activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java)
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java?p2=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java&p1=activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java&r1=702557&r2=702571&rev=702571&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java Tue Oct  7 11:14:19 2008
@@ -1,61 +1,106 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-
-/**
- * Unit test for using our own aggregation strategy.
- */
-public class CustomAggregationStrategyTest extends ContextTestSupport {
-
-    public void testCustomAggregationStrategy() throws Exception {
-        // START SNIPPET: e2
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we expect 4 messages as they have different header id
-        result.expectedMessageCount(2);
-        result.expectedBodiesReceived("200", "150");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "100", "id", "1");
-        template.sendBodyAndHeader("direct:start", "150", "id", "2");
-        template.sendBodyAndHeader("direct:start", "130", "id", "2");
-        template.sendBodyAndHeader("direct:start", "200", "id", "1");
-        template.sendBodyAndHeader("direct:start", "190", "id", "1");
-
-        assertMockEndpointsSatisfied();
-        // END SNIPPET: e2
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() throws Exception {
-                // START SNIPPET: e1
-                // our route is aggregating from the direct queue and sending the response to the mock
-                from("direct:start")
-                    // aggregated by header id and use our own strategy how to aggregate
-                    .aggregator(new MyAggregationStrategy()).header("id")
-                    // wait for 2 seconds to aggregate
-                    .batchTimeout(2000L)
-                    .to("mock:result");
-                // END SNIPPET: e1
-            }
-        };
-    }
-
-    // START SNIPPET: e3
-    private static class MyAggregationStrategy implements AggregationStrategy {
-
-        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-            int oldPrice = oldExchange.getIn().getBody(Integer.class);
-            int newPrice = newExchange.getIn().getBody(Integer.class);
-            // return the "winner" that has the highest price
-            return newPrice > oldPrice ? newExchange : oldExchange;
-        }
-    }
-    // END SNIPPET: e3
+package org.apache.camel.processor.aggregator;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.AbstractCollection;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Expression;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
+
+/**
+ * Unit test for using our own aggregation collection.
+ */
+public class CustomAggregationCollectionTest extends ContextTestSupport {
+
+    public void testCustomAggregationCollection() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 5 messages since our custom aggregation collection just gets it all
+        // but returns them in reverse order
+        result.expectedMessageCount(5);
+        result.expectedBodiesReceived("190", "200", "130", "150", "100");
+
+        // then we sent all the message at once
+        template.sendBodyAndHeader("direct:start", "100", "id", "1");
+        template.sendBodyAndHeader("direct:start", "150", "id", "2");
+        template.sendBodyAndHeader("direct:start", "130", "id", "2");
+        template.sendBodyAndHeader("direct:start", "200", "id", "1");
+        template.sendBodyAndHeader("direct:start", "190", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // use our own collection for aggregation
+                    .aggregator(new MyReverseAggregationCollection())
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    // START SNIPPET: e3
+    private static class MyReverseAggregationCollection extends AbstractCollection<Exchange> implements AggregationCollection {
+
+        private List<Exchange> collection = new ArrayList<Exchange>();
+        private Expression<Exchange> correlation;
+        private AggregationStrategy strategy;
+
+        public Expression<Exchange> getCorrelationExpression() {
+            return correlation;
+        }
+
+        public void setCorrelationExpression(Expression<Exchange> correlationExpression) {
+            this.correlation = correlationExpression;
+        }
+
+        public AggregationStrategy getAggregationStrategy() {
+            return strategy;
+        }
+
+        public void setAggregationStrategy(AggregationStrategy aggregationStrategy) {
+            this.strategy = aggregationStrategy;
+        }
+
+        public boolean add(Exchange exchange) {
+            return collection.add(exchange);
+        }
+
+        public Iterator<Exchange> iterator() {
+            // demonstrate the we can do something with this collection, so we reverse it
+            Collections.reverse(collection);
+
+            return collection.iterator();
+        }
+
+        public int size() {
+            return collection.size();
+        }
+
+        public void clear() {
+            collection.clear();
+        }
+
+        public void onAggregation(Object correlationKey, Exchange newExchange) {
+            add(newExchange);
+        }
+    }
+    // END SNIPPET: e3
 }
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationCollectionTest.java
------------------------------------------------------------------------------
    svn:mergeinfo =

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java Tue Oct  7 11:14:19 2008
@@ -1,61 +1,61 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.AggregationStrategy;
-
-/**
- * Unit test for using our own aggregation strategy.
- */
-public class CustomAggregationStrategyTest extends ContextTestSupport {
-
-    public void testCustomAggregationStrategy() throws Exception {
-        // START SNIPPET: e2
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we expect 4 messages as they have different header id
-        result.expectedMessageCount(2);
-        result.expectedBodiesReceived("200", "150");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "100", "id", "1");
-        template.sendBodyAndHeader("direct:start", "150", "id", "2");
-        template.sendBodyAndHeader("direct:start", "130", "id", "2");
-        template.sendBodyAndHeader("direct:start", "200", "id", "1");
-        template.sendBodyAndHeader("direct:start", "190", "id", "1");
-
-        assertMockEndpointsSatisfied();
-        // END SNIPPET: e2
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() throws Exception {
-                // START SNIPPET: e1
-                // our route is aggregating from the direct queue and sending the response to the mock
-                from("direct:start")
-                    // aggregated by header id and use our own strategy how to aggregate
-                    .aggregator(new MyAggregationStrategy()).header("id")
-                    // wait for 2 seconds to aggregate
-                    .batchTimeout(2000L)
-                    .to("mock:result");
-                // END SNIPPET: e1
-            }
-        };
-    }
-
-    // START SNIPPET: e3
-    private static class MyAggregationStrategy implements AggregationStrategy {
-
-        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-            int oldPrice = oldExchange.getIn().getBody(Integer.class);
-            int newPrice = newExchange.getIn().getBody(Integer.class);
-            // return the "winner" that has the highest price
-            return newPrice > oldPrice ? newExchange : oldExchange;
-        }
-    }
-    // END SNIPPET: e3
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * Unit test for using our own aggregation strategy.
+ */
+public class CustomAggregationStrategyTest extends ContextTestSupport {
+
+    public void testCustomAggregationStrategy() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect to find the two winners with the highest bid
+        result.expectedMessageCount(2);
+        result.expectedBodiesReceived("200", "150");
+
+        // then we send all the messages at once
+        template.sendBodyAndHeader("direct:start", "100", "id", "1");
+        template.sendBodyAndHeader("direct:start", "150", "id", "2");
+        template.sendBodyAndHeader("direct:start", "130", "id", "2");
+        template.sendBodyAndHeader("direct:start", "200", "id", "1");
+        template.sendBodyAndHeader("direct:start", "190", "id", "1");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id and use our own strategy how to aggregate
+                    .aggregator(new MyAggregationStrategy()).header("id")
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+
+    // START SNIPPET: e3
+    private static class MyAggregationStrategy implements AggregationStrategy {
+
+        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            int oldPrice = oldExchange.getIn().getBody(Integer.class);
+            int newPrice = newExchange.getIn().getBody(Integer.class);
+            // return the "winner" that has the highest price
+            return newPrice > oldPrice ? newExchange : oldExchange;
+        }
+    }
+    // END SNIPPET: e3
 }
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java Tue Oct  7 11:14:19 2008
@@ -1,54 +1,54 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-
-/**
- * Unit test for DefaultAggregatorCollection.
- */
-public class DefaultAggregatorCollectionTest extends ContextTestSupport {
-
-    public void testDefaultAggregateCollection() throws Exception {
-        // START SNIPPET: e2
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we expect 4 messages grouped by the latest message only
-        result.expectedMessageCount(4);
-        result.expectedBodiesReceived("Message 1d", "Message 2b", "Message 3c", "Message 4");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
-
-        assertMockEndpointsSatisfied();
-        // END SNIPPET: e2
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() throws Exception {
-                // START SNIPPET: e1
-                // our route is aggregating from the direct queue and sending the response to the mock
-                from("direct:start")
-                    // aggregated by header id
-                    // as we have not configured more on the aggregator it will default to aggregate the
-                    // latest exchange only
-                    .aggregator().header("id")
-                    // wait for 2 seconds to aggregate
-                    .batchTimeout(2000L)
-                    .to("mock:result");
-                // END SNIPPET: e1
-            }
-        };
-    }
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+/**
+ * Unit test for DefaultAggregatorCollection.
+ */
+public class DefaultAggregatorCollectionTest extends ContextTestSupport {
+
+    public void testDefaultAggregateCollection() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we expect 4 messages grouped by the latest message only
+        result.expectedMessageCount(4);
+        result.expectedBodiesReceived("Message 1d", "Message 2b", "Message 3c", "Message 4");
+
+        // then we send all the messages at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // aggregated by header id
+                    // as we have not configured more on the aggregator it will default to aggregate the
+                    // latest exchange only
+                    .aggregator().header("id")
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
 }
\ No newline at end of file

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/DefaultAggregatorCollectionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
URL: http://svn.apache.org/viewvc/activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java?rev=702571&r1=702570&r2=702571&view=diff
==============================================================================
--- activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java (original)
+++ activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java Tue Oct  7 11:14:19 2008
@@ -1,65 +1,65 @@
-package org.apache.camel.processor.aggregator;
-
-import org.apache.camel.ContextTestSupport;
-import org.apache.camel.Exchange;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
-import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
-import org.apache.camel.processor.aggregate.AggregationCollection;
-
-/**
- * Unit test for PredicateAggregatorCollection.
- */
-public class PredicateAggregatorCollectionTest extends ContextTestSupport {
-
-    public void testPredicateAggregateCollection() throws Exception {
-        // START SNIPPET: e2
-        MockEndpoint result = getMockEndpoint("mock:result");
-
-        // we only expect two messages as they have reached the completed predicate
-        // that we want 3 messages that has the same header id
-        result.expectedMessageCount(2);
-        result.expectedBodiesReceived("Message 1c", "Message 3c");
-
-        // then we sent all the message at once
-        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
-        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
-        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
-        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
-
-        assertMockEndpointsSatisfied();
-        // END SNIPPET: e2
-    }
-
-    @Override
-    protected RouteBuilder createRouteBuilder() throws Exception {
-        return new RouteBuilder() {
-            public void configure() throws Exception {
-                // START SNIPPET: e1
-                // create the aggregation collection we will use.
-                // - we will correlate the recieved message based on the id header
-                // - as we will just keep the latest message we use the latest strategy
-                // - and finally we stop aggregate if we recieve 2 or more messages
-                AggregationCollection ag = new PredicateAggregationCollection(header("id"),
-                    new UseLatestAggregationStrategy(),
-                    header(Exchange.AGGREGATED_COUNT).isEqualTo(3));
-
-                // our route is aggregating from the direct queue and sending the response to the mock
-                from("direct:start")
-                    // we use the collection based aggregator we already have configued
-                    .aggregator(ag)
-                    // wait for 2 seconds to aggregate
-                    .batchTimeout(2000L)
-                    .to("mock:result");
-                // END SNIPPET: e1
-            }
-        };
-    }
-}
+package org.apache.camel.processor.aggregator;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.PredicateAggregationCollection;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.apache.camel.processor.aggregate.AggregationCollection;
+
+/**
+ * Unit test for PredicateAggregatorCollection.
+ */
+public class PredicateAggregatorCollectionTest extends ContextTestSupport {
+
+    public void testPredicateAggregateCollection() throws Exception {
+        // START SNIPPET: e2
+        MockEndpoint result = getMockEndpoint("mock:result");
+
+        // we only expect two messages, as only they have satisfied the completion predicate
+        // which requires 3 messages that have the same header id
+        result.expectedMessageCount(2);
+        result.expectedBodiesReceived("Message 1c", "Message 3c");
+
+        // then we send all the messages at once
+        template.sendBodyAndHeader("direct:start", "Message 1a", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 2a", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 3a", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1b", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3b", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 1c", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 3c", "id", "3");
+        template.sendBodyAndHeader("direct:start", "Message 2b", "id", "2");
+        template.sendBodyAndHeader("direct:start", "Message 1d", "id", "1");
+        template.sendBodyAndHeader("direct:start", "Message 4", "id", "4");
+
+        assertMockEndpointsSatisfied();
+        // END SNIPPET: e2
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                // START SNIPPET: e1
+                // create the aggregation collection we will use.
+                // - we will correlate the received messages based on the id header
+                // - as we will just keep the latest message we use the latest strategy
+                // - and finally we stop aggregating once 3 messages with the same id have been received
+                AggregationCollection ag = new PredicateAggregationCollection(header("id"),
+                    new UseLatestAggregationStrategy(),
+                    header(Exchange.AGGREGATED_COUNT).isEqualTo(3));
+
+                // our route is aggregating from the direct queue and sending the response to the mock
+                from("direct:start")
+                    // we use the collection based aggregator we have already configured
+                    .aggregator(ag)
+                    // wait for 0.5 seconds to aggregate
+                    .batchTimeout(500L)
+                    .to("mock:result");
+                // END SNIPPET: e1
+            }
+        };
+    }
+}

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/PredicateAggregatorCollectionTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date