svn commit: r1057133 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/concurrent/ test/java/org/apache/camel/processor/async/ test/java/org/apache/camel/util/concurrent/

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r1057133 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/concurrent/ test/java/org/apache/camel/processor/async/ test/java/org/apache/camel/util/concurrent/

davsclaus-2
Author: davsclaus
Date: Mon Jan 10 09:03:05 2011
New Revision: 1057133

URL: http://svn.apache.org/viewvc?rev=1057133&view=rev
Log:
CAMEL-3497: Fixed timeout issue when using parallel processing on multicast/splitter etc., so that any already completed tasks are still fetched and aggregated after a timeout occurs.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Jan 10 09:03:05 2011
@@ -61,6 +61,7 @@ import org.apache.camel.util.ServiceHelp
 import org.apache.camel.util.StopWatch;
 import org.apache.camel.util.concurrent.AtomicException;
 import org.apache.camel.util.concurrent.AtomicExchange;
+import org.apache.camel.util.concurrent.ExecutorServiceHelper;
 import org.apache.camel.util.concurrent.SubmitOrderedCompletionService;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -115,12 +116,10 @@ public class MulticastProcessor extends
 
         public void begin() {
             // noop
-            LOG.trace("ProcessorExchangePair #" + index + " begin: " + exchange);
         }
 
         public void done() {
             // noop
-            LOG.trace("ProcessorExchangePair #" + index + " done: " + exchange);
         }
 
     }
@@ -412,6 +411,9 @@ public class MulticastProcessor extends
                     // we are timed out but try to grab if some tasks has been completed
                     // poll will return null if no tasks is present
                     future = completion.poll();
+                    if (LOG.isTraceEnabled()) {
+                        LOG.trace("Polled completion task #" + aggregated + " after timeout to grab already completed tasks: " + future);
+                    }
                 } else if (timeout > 0) {
                     long left = timeout - watch.taken();
                     if (left < 0) {
@@ -451,7 +453,14 @@ public class MulticastProcessor extends
                         // log a WARN we timed out since it will not be aggregated and the Exchange will be lost
                         LOG.warn("Parallel processing timed out after " + timeout + " millis for number " + aggregated + ". This task will be cancelled and will not be aggregated.");
                     }
+                    if (LOG.isDebugEnabled()) {
+                        LOG.debug("Timeout occurred after " + timeout + " millis for number " + aggregated + " task.");
+                    }
                     timedOut = true;
+
+                    // mark that index as timed out, which allows us to try to retrieve
+                    // any already completed tasks in the next loop
+                    ExecutorServiceHelper.timeoutTask(completion);
                 } else {
                     // there is a result to aggregate
                     Exchange subExchange = future.get();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Mon Jan 10 09:03:05 2011
@@ -17,6 +17,7 @@
 package org.apache.camel.util.concurrent;
 
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -347,4 +348,18 @@ public final class ExecutorServiceHelper
         return null;
     }
 
+    /**
+     * Times out the current task of the completion service.
+     * <p/>
+     * This can be used to mark the completion service as timed out, allowing you to poll any already completed tasks.
+     * This applies when using the {@link SubmitOrderedCompletionService}.
+     *
+     * @param completionService the completion service.
+     */
+    public static void timeoutTask(CompletionService completionService) {
+        if (completionService instanceof SubmitOrderedCompletionService) {
+            ((SubmitOrderedCompletionService) completionService).timeoutTask();
+        }
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionService.java Mon Jan 10 09:03:05 2011
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.util.concurrent;
 
+import java.util.ArrayList;
+import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.DelayQueue;
@@ -65,7 +67,8 @@ public class SubmitOrderedCompletionServ
             // if the answer is 0 then this task is ready to be taken
             return id - index.get();
         }
-        
+
+        @SuppressWarnings("unchecked")
         public int compareTo(Delayed o) {
             SubmitOrderFutureTask other = (SubmitOrderFutureTask) o;
             return (int) (this.id - other.id);
@@ -76,6 +79,12 @@ public class SubmitOrderedCompletionServ
             // when we are done add to the completion queue
             completionQueue.add(this);
         }
+
+        @Override
+        public String toString() {
+            // output using zero-based index
+            return "SubmitOrderedFutureTask[" + (id - 1) + "]";
+        }
     }
 
     public SubmitOrderedCompletionService(Executor executor) {
@@ -89,7 +98,7 @@ public class SubmitOrderedCompletionServ
         }
         SubmitOrderFutureTask f = new SubmitOrderFutureTask(id.incrementAndGet(), task);
         executor.execute(f);
-        return (Future<V>) f;
+        return f;
     }
 
     public Future<V> submit(Runnable task, Object result) {
@@ -104,18 +113,37 @@ public class SubmitOrderedCompletionServ
     @SuppressWarnings("unchecked")
     public Future<V> take() throws InterruptedException {
         index.incrementAndGet();
-        return (Future) completionQueue.take();
+        return completionQueue.take();
     }
 
     @SuppressWarnings("unchecked")
     public Future<V> poll() {
         index.incrementAndGet();
-        return (Future) completionQueue.poll();
+        Future answer = completionQueue.poll();
+        if (answer == null) {
+            // decrease counter if we didn't get any data
+            index.decrementAndGet();
+        }
+        return answer;
     }
 
     @SuppressWarnings("unchecked")
     public Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException {
         index.incrementAndGet();
-        return (Future) completionQueue.poll(timeout, unit);
+        Future answer = completionQueue.poll(timeout, unit);
+        if (answer == null) {
+            // decrease counter if we didn't get any data
+            index.decrementAndGet();
+        }
+        return answer;
     }
+
+    /**
+     * Marks the current task as timed out, which allows you to poll the next
+     * tasks, which may already have been completed.
+     */
+    public void timeoutTask() {
+        index.incrementAndGet();
+    }
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/async/AsyncEndpointRecipientListParallel3Test.java Mon Jan 10 09:03:05 2011
@@ -57,7 +57,7 @@ public class AsyncEndpointRecipientListP
                                 beforeThreadName = Thread.currentThread().getName();
                             }
                         })
-                        .recipientList(constant("async:Hi Camel,direct:foo")).parallelProcessing();
+                        .recipientList(constant("async:Hi Camel?delay=2000,direct:foo")).parallelProcessing();
 
                 from("direct:foo")
                         .process(new Processor() {

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java?rev=1057133&r1=1057132&r2=1057133&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/util/concurrent/SubmitOrderedCompletionServiceTest.java Mon Jan 10 09:03:05 2011
@@ -19,6 +19,7 @@ package org.apache.camel.util.concurrent
 import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import junit.framework.TestCase;
 
@@ -85,6 +86,63 @@ public class SubmitOrderedCompletionServ
         assertEquals("B", b);
     }
 
+    public void testSubmitOrderedFirstTaskIsSlowUsingPollTimeout() throws Exception {
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                // this task should be slower than B but we should still get it first
+                Thread.sleep(200);
+                return "A";
+            }
+        });
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                return "B";
+            }
+        });
+
+        Object a = service.poll(5, TimeUnit.SECONDS).get();
+        Object b = service.poll(5, TimeUnit.SECONDS).get();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    public void testSubmitOrderedFirstTaskIsSlowUsingPoll() throws Exception {
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                // this task should be slower than B but we should still get it first
+                Thread.sleep(1000);
+                return "A";
+            }
+        });
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                return "B";
+            }
+        });
+
+        // poll should not get it the first time
+        Object a = service.poll();
+        assertNull(a);
+
+        Thread.sleep(100);
+
+        // and neither the 2nd time
+        a = service.poll();
+        assertNull(a);
+
+        // okay take them
+        a = service.take().get();
+        Object b = service.take().get();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
     public void testSubmitOrderedSecondTaskIsSlow() throws Exception {
 
         service.submit(new Callable<Object>() {
@@ -106,4 +164,63 @@ public class SubmitOrderedCompletionServ
         assertEquals("A", a);
         assertEquals("B", b);
     }
+
+    public void testSubmitOrderedSecondTaskIsSlowUsingPollTimeout() throws Exception {
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                return "A";
+            }
+        });
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                Thread.sleep(200);
+                return "B";
+            }
+        });
+
+        Object a = service.poll(5, TimeUnit.SECONDS).get();
+        Object b = service.poll(5, TimeUnit.SECONDS).get();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
+    public void testSubmitOrderedLastTaskIsSlowUsingPoll() throws Exception {
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                return "A";
+            }
+        });
+
+        service.submit(new Callable<Object>() {
+            public Object call() throws Exception {
+                Thread.sleep(1000);
+                return "B";
+            }
+        });
+
+        // take a
+        Object a = service.take().get();
+        assertNotNull(a);
+
+        // poll should not get it the first time
+        Object b = service.poll();
+        assertNull(b);
+
+        Thread.sleep(100);
+
+        // and neither the 2nd time
+        b = service.poll();
+        assertNull(b);
+
+        // okay take it
+        b = service.take().get();
+
+        assertEquals("A", a);
+        assertEquals("B", b);
+    }
+
 }