svn commit: r935508 - in /camel/trunk/components/camel-hawtdb/src: main/java/org/apache/camel/component/hawtdb/ test/java/org/apache/camel/component/hawtdb/

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r935508 - in /camel/trunk/components/camel-hawtdb/src: main/java/org/apache/camel/component/hawtdb/ test/java/org/apache/camel/component/hawtdb/

davsclaus-2
Author: davsclaus
Date: Mon Apr 19 10:04:38 2010
New Revision: 935508

URL: http://svn.apache.org/viewvc?rev=935508&view=rev
Log:
CAMEL-2656: Added completionInterval to the Aggregator and implemented it in camel-hawtdb.

Added:
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateCompletionIntervalTest.java
      - copied, changed from r935469, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java
Modified:
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
    camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
    camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java?rev=935508&r1=935507&r2=935508&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBAggregationRepository.java Mon Apr 19 10:04:38 2010
@@ -18,8 +18,10 @@ package org.apache.camel.component.hawtd
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -33,6 +35,7 @@ import org.apache.camel.util.ServiceHelp
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.fusesource.hawtdb.api.Index;
+import org.fusesource.hawtdb.api.IndexVisitor;
 import org.fusesource.hawtdb.api.Transaction;
 import org.fusesource.hawtdb.util.buffer.Buffer;
 
@@ -48,7 +51,7 @@ public class HawtDBAggregationRepository
     private Integer bufferSize;
     private boolean sync = true;
     private boolean returnOldExchange;
-    private HawtDBCamelMarshaller<String> marshaller = new HawtDBCamelMarshaller<String>();
+    private HawtDBCamelMarshaller marshaller = new HawtDBCamelMarshaller();
     private long recoveryInterval = 5000;
     private boolean useRecovery = true;
     private int maximumRedeliveries;
@@ -74,7 +77,7 @@ public class HawtDBAggregationRepository
      * Creates an aggregation repository using a new {@link org.apache.camel.component.hawtdb.HawtDBFile}
      * that persists using the provided file.
      *
-     * @param repositoryName the repository name
+     * @param repositoryName     the repository name
      * @param persistentFileName the persistent store filename
      */
     public HawtDBAggregationRepository(String repositoryName, String persistentFileName) {
@@ -88,7 +91,7 @@ public class HawtDBAggregationRepository
      * Creates an aggregation repository using the provided {@link org.apache.camel.component.hawtdb.HawtDBFile}.
      *
      * @param repositoryName the repository name
-     * @param hawtDBFile the hawtdb file to use as persistent store
+     * @param hawtDBFile     the hawtdb file to use as persistent store
      */
     public HawtDBAggregationRepository(String repositoryName, HawtDBFile hawtDBFile) {
         ObjectHelper.notEmpty(repositoryName, "repositoryName");
@@ -169,7 +172,7 @@ public class HawtDBAggregationRepository
         }
         try {
             final Buffer keyBuffer = marshaller.marshallKey(key);
-            final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchange.getExchangeId());
+            final Buffer confirmKeyBuffer = marshaller.marshallKey(exchange.getExchangeId());
             final Buffer exchangeBuffer = marshaller.marshallExchange(camelContext, exchange);
             hawtDBFile.execute(new Work<Buffer>() {
                 public Buffer execute(Transaction tx) {
@@ -199,7 +202,7 @@ public class HawtDBAggregationRepository
             LOG.debug("Confirming exchangeId [" + exchangeId + "]");
         }
         try {
-            final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+            final Buffer confirmKeyBuffer = marshaller.marshallKey(exchangeId);
             hawtDBFile.execute(new Work<Buffer>() {
                 public Buffer execute(Transaction tx) {
                     Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());
@@ -217,6 +220,50 @@ public class HawtDBAggregationRepository
         }
     }
 
+    public Set<String> getKeys() {
+        final Set<String> keys = new LinkedHashSet<String>();
+
+        hawtDBFile.execute(new Work<Buffer>() {
+            public Buffer execute(Transaction tx) {
+                // interval task could potentially be running while we are shutting down so check for that
+                if (!isRunAllowed()) {
+                    return null;
+                }
+
+                Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryName());
+
+                Iterator<Map.Entry<Buffer, Buffer>> it = indexCompleted.iterator();
+                // scan could potentially be running while we are shutting down so check for that
+                while (it.hasNext() && isRunAllowed()) {
+                    Map.Entry<Buffer, Buffer> entry = it.next();
+                    Buffer keyBuffer = entry.getKey();
+
+                    String key;
+                    try {
+                        key  = marshaller.unmarshallKey(keyBuffer);
+                    } catch (IOException e) {
+                        throw new RuntimeException("Error unmarshalling key: " + keyBuffer, e);
+                    }
+                    if (key != null) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("getKey [" + key + "]");
+                        }
+                        keys.add(key);
+                    }
+                }
+                return null;
+
+            }
+
+            @Override
+            public String toString() {
+                return "getKeys";
+            }
+        });
+
+        return Collections.unmodifiableSet(keys);
+    }
+
     public Set<String> scan(CamelContext camelContext) {
         final Set<String> answer = new LinkedHashSet<String>();
         hawtDBFile.execute(new Work<Buffer>() {
@@ -236,11 +283,14 @@ public class HawtDBAggregationRepository
 
                     String exchangeId;
                     try {
-                        exchangeId = marshaller.unmarshallConfirmKey(keyBuffer);
+                        exchangeId = marshaller.unmarshallKey(keyBuffer);
                     } catch (IOException e) {
                         throw new RuntimeException("Error unmarshalling confirm key: " + keyBuffer, e);
                     }
                     if (exchangeId != null) {
+                        if (LOG.isTraceEnabled()) {
+                            LOG.trace("Scan exchangeId [" + exchangeId + "]");
+                        }
                         answer.add(exchangeId);
                     }
                 }
@@ -267,7 +317,7 @@ public class HawtDBAggregationRepository
     public Exchange recover(CamelContext camelContext, final String exchangeId) {
         Exchange answer = null;
         try {
-            final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+            final Buffer confirmKeyBuffer = marshaller.marshallKey(exchangeId);
             Buffer rc = hawtDBFile.execute(new Work<Buffer>() {
                 public Buffer execute(Transaction tx) {
                     Index<Buffer, Buffer> indexCompleted = hawtDBFile.getRepositoryIndex(tx, getRepositoryNameCompleted());

Modified: camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java?rev=935508&r1=935507&r2=935508&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java (original)
+++ camel/trunk/components/camel-hawtdb/src/main/java/org/apache/camel/component/hawtdb/HawtDBCamelMarshaller.java Mon Apr 19 10:04:38 2010
@@ -34,27 +34,20 @@ import org.fusesource.hawtdb.util.marsha
 /**
  * @version $Revision$
  */
-public final class HawtDBCamelMarshaller<K> {
+public final class HawtDBCamelMarshaller {
 
-    private Marshaller<K> keyMarshaller = new ObjectMarshaller<K>();
-    private Marshaller<String> confirmKeyMarshaller = new StringMarshaller();
+    private Marshaller<String> keyMarshaller = new StringMarshaller();
     private Marshaller<DefaultExchangeHolder> exchangeMarshaller = new ObjectMarshaller<DefaultExchangeHolder>();
 
-    public Buffer marshallKey(K key) throws IOException {
+    public Buffer marshallKey(String key) throws IOException {
         DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
         keyMarshaller.writePayload(key, baos);
         return baos.toBuffer();
     }
 
-    public Buffer marshallConfirmKey(String exchangeId) throws IOException {
-        DataByteArrayOutputStream baos = new DataByteArrayOutputStream();
-        confirmKeyMarshaller.writePayload(exchangeId, baos);
-        return baos.toBuffer();
-    }
-
-    public String unmarshallConfirmKey(Buffer buffer) throws IOException {
+    public String unmarshallKey(Buffer buffer) throws IOException {
         DataByteArrayInputStream bais = new DataByteArrayInputStream(buffer);
-        String key = confirmKeyMarshaller.readPayload(bais);
+        String key = keyMarshaller.readPayload(bais);
         return key;
     }
 

Copied: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateCompletionIntervalTest.java (from r935469, camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateCompletionIntervalTest.java?p2=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateCompletionIntervalTest.java&p1=camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java&r1=935469&r2=935508&rev=935508&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateCompletionIntervalTest.java Mon Apr 19 10:04:38 2010
@@ -23,7 +23,7 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.test.junit4.CamelTestSupport;
 import org.junit.Test;
 
-public class HawtDBAggregateTest extends CamelTestSupport {
+public class HawtDBAggregateCompletionIntervalTest extends CamelTestSupport {
 
     @Override
     public void setUp() throws Exception {
@@ -32,14 +32,20 @@ public class HawtDBAggregateTest extends
     }
 
     @Test
-    public void testHawtDBAggregate() throws Exception {
+    public void testHawtDBAggregateCompletionInterval() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:aggregated");
-        mock.expectedBodiesReceived("ABCDE");
+        mock.expectedBodiesReceived("ABCD", "E");
+
+        // wait a bit so we complete on the next poll
+        Thread.sleep(2000);
 
         template.sendBodyAndHeader("direct:start", "A", "id", 123);
         template.sendBodyAndHeader("direct:start", "B", "id", 123);
         template.sendBodyAndHeader("direct:start", "C", "id", 123);
         template.sendBodyAndHeader("direct:start", "D", "id", 123);
+
+        Thread.sleep(6000);
+
         template.sendBodyAndHeader("direct:start", "E", "id", 123);
 
         assertMockEndpointsSatisfied();
@@ -60,8 +66,8 @@ public class HawtDBAggregateTest extends
                 // here is the Camel route where we aggregate
                 from("direct:start")
                     .aggregate(header("id"), new MyAggregationStrategy())
-                        // use our created hawtdb repo as aggregation repository
-                        .completionSize(5).aggregationRepository(repo)
+                        // complete every 5 seconds
+                        .completionInterval(5000).aggregationRepository(repo)
                         .to("mock:aggregated");
             }
             // END SNIPPET: e1

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java?rev=935508&r1=935507&r2=935508&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostRemovedWhenConfirmedTest.java Mon Apr 19 10:04:38 2010
@@ -52,8 +52,8 @@ public class HawtDBAggregateNotLostRemov
 
         // the exchange should NOT be in the completed repo as it was confirmed
         final HawtDBFile hawtDBFile = repo.getHawtDBFile();
-        final HawtDBCamelMarshaller<Object> marshaller = new HawtDBCamelMarshaller<Object>();
-        final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+        final HawtDBCamelMarshaller marshaller = new HawtDBCamelMarshaller();
+        final Buffer confirmKeyBuffer = marshaller.marshallKey(exchangeId);
         Buffer bf = hawtDBFile.execute(new Work<Buffer>() {
             public Buffer execute(Transaction tx) {
                 Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, "repo1-completed");

Modified: camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java?rev=935508&r1=935507&r2=935508&view=diff
==============================================================================
--- camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java (original)
+++ camel/trunk/components/camel-hawtdb/src/test/java/org/apache/camel/component/hawtdb/HawtDBAggregateNotLostTest.java Mon Apr 19 10:04:38 2010
@@ -53,8 +53,8 @@ public class HawtDBAggregateNotLostTest
 
         // the exchange should be in the completed repo where we should be able to find it
         final HawtDBFile hawtDBFile = repo.getHawtDBFile();
-        final HawtDBCamelMarshaller<Object> marshaller = new HawtDBCamelMarshaller<Object>();
-        final Buffer confirmKeyBuffer = marshaller.marshallConfirmKey(exchangeId);
+        final HawtDBCamelMarshaller marshaller = new HawtDBCamelMarshaller();
+        final Buffer confirmKeyBuffer = marshaller.marshallKey(exchangeId);
         Buffer bf = hawtDBFile.execute(new Work<Buffer>() {
             public Buffer execute(Transaction tx) {
                 Index<Buffer, Buffer> index = hawtDBFile.getRepositoryIndex(tx, "repo1-completed");