svn commit: r935492 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/test/java/org/apache/camel/processor/ag...

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r935492 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/aggregate/ camel-core/src/main/java/org/apache/camel/spi/ camel-core/src/test/java/org/apache/camel/processor/ag...

davsclaus-2
Author: davsclaus
Date: Mon Apr 19 09:33:55 2010
New Revision: 935492

URL: http://svn.apache.org/viewvc?rev=935492&view=rev
Log:
CAMEL-2656: Added completionInterval to Aggregator.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java
      - copied, changed from r935469, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java
    camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java
      - copied, changed from r935469, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml
      - copied, changed from r935469, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/AggregateDefinition.java Mon Apr 19 09:33:55 2010
@@ -77,6 +77,8 @@ public class AggregateDefinition extends
     @XmlAttribute
     private Integer completionSize;
     @XmlAttribute
+    private Long completionInterval;
+    @XmlAttribute
     private Long completionTimeout;
     @XmlAttribute
     private Boolean completionFromBatchConsumer;
@@ -180,6 +182,9 @@ public class AggregateDefinition extends
         if (getCompletionTimeout() != null) {
             answer.setCompletionTimeout(getCompletionTimeout());
         }
+        if (getCompletionInterval() != null) {
+            answer.setCompletionInterval(getCompletionInterval());
+        }
         if (getCompletionSizeExpression() != null) {
             Expression expression = getCompletionSizeExpression().createExpression(routeContext);
             answer.setCompletionSizeExpression(expression);
@@ -258,6 +263,14 @@ public class AggregateDefinition extends
         this.completionSize = completionSize;
     }
 
+    public Long getCompletionInterval() {
+        return completionInterval;
+    }
+
+    public void setCompletionInterval(Long completionInterval) {
+        this.completionInterval = completionInterval;
+    }
+
     public Long getCompletionTimeout() {
         return completionTimeout;
     }
@@ -454,6 +467,18 @@ public class AggregateDefinition extends
     }
 
     /**
+     * Sets the completion interval, which would cause the aggregate to consider the group as complete
+     * and send out the aggregated exchange.
+     *
+     * @param completionInterval  the interval in millis
+     * @return the builder
+     */
+    public AggregateDefinition completionInterval(long completionInterval) {
+        setCompletionInterval(completionInterval);
+        return this;
+    }
+
+    /**
      * Sets the completion timeout, which would cause the aggregate to consider the group as complete
      * and send out the aggregated exchange.
      *

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java Mon Apr 19 09:33:55 2010
@@ -109,6 +109,7 @@ public class AggregateProcessor extends
     private Predicate completionPredicate;
     private long completionTimeout;
     private Expression completionTimeoutExpression;
+    private long completionInterval;
     private int completionSize;
     private Expression completionSizeExpression;
     private boolean completionFromBatchConsumer;
@@ -420,6 +421,14 @@ public class AggregateProcessor extends
         this.completionTimeoutExpression = completionTimeoutExpression;
     }
 
+    public long getCompletionInterval() {
+        return completionInterval;
+    }
+
+    public void setCompletionInterval(long completionInterval) {
+        this.completionInterval = completionInterval;
+    }
+
     public int getCompletionSize() {
         return completionSize;
     }
@@ -568,6 +577,46 @@ public class AggregateProcessor extends
     }
 
     /**
+     * Background task that triggers completion based on interval.
+     */
+    private final class AggregationIntervalTask implements Runnable {
+
+        public void run() {
+            // only run if CamelContext has been fully started
+            if (!camelContext.getStatus().isStarted()) {
+                if (LOG.isTraceEnabled()) {
+                    LOG.trace("Completion interval task cannot start due CamelContext(" + camelContext.getName() + ") has not been started yet");
+                }
+                return;
+            }
+
+            LOG.trace("Starting completion interval task");
+
+            // trigger completion for all in the repository
+            try {
+                lock.lock();
+
+                Set<String> keys = aggregationRepository.getKeys();
+                for (String key : keys) {
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Completion interval triggered for correlation key: " + key);
+                    }
+                    Exchange exchange = aggregationRepository.get(camelContext, key);
+
+                    // indicate it was completed by interval
+                    exchange.setProperty(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+
+                    onCompletion(key, exchange, false);
+                }
+            } finally {
+                lock.unlock();
+            }
+
+            LOG.trace("Completion interval task complete");
+        }
+    }
+
+    /**
      * Background task that looks for aggregated exchanges to recover.
      */
     private final class RecoverTask implements Runnable {
@@ -670,11 +719,11 @@ public class AggregateProcessor extends
 
     @Override
     protected void doStart() throws Exception {
-        if (getCompletionTimeout() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
+        if (getCompletionTimeout() <= 0 && getCompletionInterval() <= 0 && getCompletionSize() <= 0 && getCompletionPredicate() == null
                 && !isCompletionFromBatchConsumer() && getCompletionTimeoutExpression() == null
                 && getCompletionSizeExpression() == null) {
             throw new IllegalStateException("At least one of the completions options"
-                    + " [completionTimeout, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
+                    + " [completionTimeout, completionInterval, completionSize, completionPredicate, completionFromBatchConsumer] must be set");
         }
 
         if (getCloseCorrelationKeyOnCompletion() != null) {
@@ -717,8 +766,19 @@ public class AggregateProcessor extends
             }
         }
 
+        if (getCompletionInterval() > 0 && getCompletionTimeout() > 0) {
+            throw new IllegalArgumentException("Only one of completionInterval or completionTimeout can be used, not both.");
+        }
+        if (getCompletionInterval() > 0) {
+            LOG.info("Using CompletionInterval to run every " + getCompletionInterval() + " millis.");
+            ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
+            // trigger completion based on interval
+            scheduler.scheduleAtFixedRate(new AggregationIntervalTask(), 1000L, getCompletionInterval(), TimeUnit.MILLISECONDS);
+        }
+
         // start timeout service if its in use
         if (getCompletionTimeout() > 0 || getCompletionTimeoutExpression() != null) {
+            LOG.info("Using CompletionTimeout to trigger after " + getCompletionInterval() + " millis of inactivity.");
             ScheduledExecutorService scheduler = camelContext.getExecutorServiceStrategy().newScheduledThreadPool(this, "AggregateTimeoutChecker", 1);
             // check for timed out aggregated messages once every second
             timeoutMap = new AggregationTimeoutMap(scheduler, 1000L);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/aggregate/MemoryAggregationRepository.java Mon Apr 19 09:33:55 2010
@@ -16,7 +16,9 @@
  */
 package org.apache.camel.processor.aggregate;
 
+import java.util.Collections;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.camel.CamelContext;
@@ -48,6 +50,11 @@ public class MemoryAggregationRepository
         // noop
     }
 
+    public Set<String> getKeys() {
+        // do not allow edits to the set
+        return Collections.unmodifiableSet(cache.keySet());
+    }
+
     @Override
     protected void doStart() throws Exception {
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java Mon Apr 19 09:33:55 2010
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.spi;
 
+import java.util.Set;
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 
@@ -65,4 +67,11 @@ public interface AggregationRepository {
      */
     void confirm(CamelContext camelContext, String exchangeId);
 
+    /**
+     * Gets the keys currently in the repository.
+     *
+     * @return the keys
+     */
+    Set<String> getKeys();
+
 }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java (from r935469, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java&r1=935469&r2=935492&rev=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateTimeoutOnlyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionIntervalTest.java Mon Apr 19 09:33:55 2010
@@ -22,19 +22,20 @@ import org.apache.camel.component.mock.M
 import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
 
 /**
- * Unit test to verify that aggregate by timeout only also works.
- *
+ * Unit test to verify that aggregate by interval only also works.
+ *
  * @version $Revision$
  */
-public class AggregateTimeoutOnlyTest extends ContextTestSupport {
+public class AggregateCompletionIntervalTest extends ContextTestSupport {
 
-    public void testAggregateTimeoutOnly() throws Exception {
+    public void testAggregateInterval() throws Exception {
         MockEndpoint result = getMockEndpoint("mock:result");
         // by default the use latest aggregation strategy is used so we get message 9
         result.expectedBodiesReceived("Message 9");
-        // should take 3 seconds to complete this one
-        result.setMinimumResultWaitTime(2500);
 
+        // ensure messages are send after the 1s
+        Thread.sleep(2000);
+        
         for (int i = 0; i < 10; i++) {
             template.sendBodyAndHeader("direct:start", "Message " + i, "id", "1");
         }
@@ -49,11 +50,12 @@ public class AggregateTimeoutOnlyTest ex
             public void configure() throws Exception {
                 // START SNIPPET: e1
                 from("direct:start")
-                    // aggregate every 3th second and disable the batch size so we set it to 0
-                    .aggregate(header("id"), new UseLatestAggregationStrategy()).completionTimeout(3000).completionSize(0)
+                    .aggregate(header("id"), new UseLatestAggregationStrategy())
+                        // trigger completion every 5th second
+                        .completionInterval(5000)
                     .to("mock:result");
                 // END SNIPPET: e1
             }
         };
     }
-}
+}
\ No newline at end of file

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java?rev=935492&r1=935491&r2=935492&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateProcessorTest.java Mon Apr 19 09:33:55 2010
@@ -236,6 +236,50 @@ public class AggregateProcessorTest exte
         ap.stop();
     }
 
+    public void testAggregateCompletionInterval() throws Exception {
+        // camel context must be started
+        context.start();
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("A+B+C", "D");
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_COMPLETED_BY, "interval");
+
+        Processor done = new SendProcessor(context.getEndpoint("mock:result"));
+        Expression corr = header("id");
+        AggregationStrategy as = new BodyInAggregatingStrategy();
+
+        AggregateProcessor ap = new AggregateProcessor(context, done, corr, as, executorService);
+        ap.setCompletionInterval(3000);
+        ap.start();
+
+        Exchange e1 = new DefaultExchange(context);
+        e1.getIn().setBody("A");
+        e1.getIn().setHeader("id", 123);
+
+        Exchange e2 = new DefaultExchange(context);
+        e2.getIn().setBody("B");
+        e2.getIn().setHeader("id", 123);
+
+        Exchange e3 = new DefaultExchange(context);
+        e3.getIn().setBody("C");
+        e3.getIn().setHeader("id", 123);
+
+        Exchange e4 = new DefaultExchange(context);
+        e4.getIn().setBody("D");
+        e4.getIn().setHeader("id", 123);
+
+        ap.process(e1);
+        ap.process(e2);
+        ap.process(e3);
+
+        Thread.sleep(5000);
+        ap.process(e4);
+
+        assertMockEndpointsSatisfied();
+
+        ap.stop();
+    }
+
     public void testAggregateIgnoreInvalidCorrelationKey() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("A+C+END");

Copied: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java (from r935469, camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java?p2=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java&p1=camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java&r1=935469&r2=935492&rev=935492&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.java Mon Apr 19 09:33:55 2010
@@ -17,17 +17,17 @@
 package org.apache.camel.spring.processor.aggregator;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.aggregator.AggregateSimpleTimeoutTest;
+import org.apache.camel.processor.aggregator.AggregateCompletionIntervalTest;
 
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 /**
  * @version $Revision$
  */
-public class SpringAggregateSimpleTimeoutTest extends AggregateSimpleTimeoutTest {
+public class SpringAggregateCompletionIntervalTest extends AggregateCompletionIntervalTest {
 
     protected CamelContext createCamelContext() throws Exception {
-        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml");
+        return createSpringCamelContext(this, "org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml");
     }
 
-}
+}
\ No newline at end of file

Copied: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml (from r935469, camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml?p2=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml&p1=camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml&r1=935469&r2=935492&rev=935492&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateSimpleTimeoutTest.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/aggregator/SpringAggregateCompletionIntervalTest.xml Mon Apr 19 09:33:55 2010
@@ -26,16 +26,17 @@
     <camelContext id="camel" xmlns="http://camel.apache.org/schema/spring">
         <route>
             <from uri="direct:start"/>
-            <aggregate strategyRef="aggregatorStrategy" completionTimeout="3000">
+            <!-- trigger completion of all current aggregated exchanges every 5th second -->
+            <aggregate strategyRef="aggregatorStrategy" completionInterval="5000">
                 <correlationExpression>
                     <simple>header.id</simple>
                 </correlationExpression>
-                <to uri="mock:aggregated"/>
+                <to uri="mock:result"/>
             </aggregate>
         </route>
     </camelContext>
 
-    <bean id="aggregatorStrategy" class="org.apache.camel.processor.BodyInAggregatingStrategy"/>
+    <bean id="aggregatorStrategy" class="org.apache.camel.processor.aggregate.UseLatestAggregationStrategy"/>
     <!-- END SNIPPET: e1 -->
 
 </beans>