svn commit: r1057139 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: component/seda/SedaEndpoint.java processor/MulticastProcessor.java spi/ExecutorServiceStrategy.java util/concurrent/ExecutorServiceHelper.java

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r1057139 - in /camel/trunk/camel-core/src/main/java/org/apache/camel: component/seda/SedaEndpoint.java processor/MulticastProcessor.java spi/ExecutorServiceStrategy.java util/concurrent/ExecutorServiceHelper.java

davsclaus-2
Author: davsclaus
Date: Mon Jan 10 09:59:22 2011
New Revision: 1057139

URL: http://svn.apache.org/viewvc?rev=1057139&view=rev
Log:
CAMEL-3497: Fixed rare potential deadlock issue with aggregate task not being given time to run due to the thread pool being overloaded when running in parallel mode on multicast/splitter.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/seda/SedaEndpoint.java Mon Jan 10 09:59:22 2011
@@ -35,6 +35,7 @@ import org.apache.camel.WaitForTaskToCom
 import org.apache.camel.impl.DefaultEndpoint;
 import org.apache.camel.processor.MulticastProcessor;
 import org.apache.camel.spi.BrowsableEndpoint;
+import org.apache.camel.util.ServiceHelper;
 
 /**
  * An implementation of the <a
@@ -103,7 +104,11 @@ public class SedaEndpoint extends Defaul
         return conumserMulticastProcessor;
     }
     
-    protected synchronized void updateMulticastProcessor() {
+    protected synchronized void updateMulticastProcessor() throws Exception {
+        if (conumserMulticastProcessor != null) {
+            ServiceHelper.stopService(conumserMulticastProcessor);
+        }
+
         int size = getConsumers().size();
         if (size == 0 && multicastExecutor != null) {
             // stop the multicastExecutor
@@ -118,7 +123,7 @@ public class SedaEndpoint extends Defaul
             processors.add(consumer.getProcessor());
         }
         conumserMulticastProcessor = new MulticastProcessor(getCamelContext(), processors, null, true, multicastExecutor, false, false, 0);
-  
+        ServiceHelper.startService(conumserMulticastProcessor);
     }
 
     public void setQueue(BlockingQueue<Exchange> queue) {
@@ -203,12 +208,12 @@ public class SedaEndpoint extends Defaul
         producers.remove(producer);
     }
 
-    void onStarted(SedaConsumer consumer) {
+    void onStarted(SedaConsumer consumer) throws Exception {
         consumers.add(consumer);
         updateMulticastProcessor();
     }
 
-    void onStopped(SedaConsumer consumer) {
+    void onStopped(SedaConsumer consumer) throws Exception {
         consumers.remove(consumer);
         updateMulticastProcessor();
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Jan 10 09:59:22 2011
@@ -144,6 +144,7 @@ public class MulticastProcessor extends
     private final boolean streaming;
     private final boolean stopOnException;
     private final ExecutorService executorService;
+    private ExecutorService aggregateExecutorService;
     private final long timeout;
     private final ConcurrentMap<PreparedErrorHandler, Processor> errorHandlers = new ConcurrentHashMap<PreparedErrorHandler, Processor>();
 
@@ -233,6 +234,7 @@ public class MulticastProcessor extends
                                      final boolean streaming, final AsyncCallback callback) throws Exception {
 
         ObjectHelper.notNull(executorService, "ExecutorService", this);
+        ObjectHelper.notNull(aggregateExecutorService, "AggregateExecutorService", this);
 
         final CompletionService<Exchange> completion;
         if (streaming) {
@@ -260,7 +262,7 @@ public class MulticastProcessor extends
                     aggregationOnTheFlyDone, allTasksSubmitted, executionException);
 
             // and start the aggregation task so we can aggregate on-the-fly
-            executorService.submit(task);
+            aggregateExecutorService.submit(task);
         }
 
         LOG.trace("Starting to submit parallel tasks");
@@ -383,11 +385,10 @@ public class MulticastProcessor extends
                 }
             } finally {
                 // must signal we are done so the latch can open and let the other thread continue processing
-                LOG.trace("Signaling we are done aggregating on the fly");
+                LOG.debug("Signaling we are done aggregating on the fly");
+                LOG.trace("Aggregate on the fly task +++ done +++");
                 aggregationOnTheFlyDone.countDown();
             }
-
-            LOG.trace("Aggregate on the fly task +++ done +++");
         }
 
         private void aggregateOnTheFly() throws InterruptedException, ExecutionException {
@@ -880,6 +881,12 @@ public class MulticastProcessor extends
         if (timeout > 0 && !isParallelProcessing()) {
             throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
         }
+        if (isParallelProcessing() && aggregateExecutorService == null) {
+            // use cached thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread
+            // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run
+            // and signal completion during processing, which would lead to a dead-lock
+            aggregateExecutorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "AggregateTask");
+        }
         ServiceHelper.startServices(processors);
     }
 

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/ExecutorServiceStrategy.java Mon Jan 10 09:59:22 2011
@@ -144,13 +144,13 @@ public interface ExecutorServiceStrategy
 
     /**
      * Creates a new cached thread pool.
+     * <p/>
+     * <b>Important:</b> Using cached thread pool is discouraged as they have no upper bound and can overload the JVM.
      *
      * @param source      the source object, usually it should be <tt>this</tt> passed in as parameter
      * @param name        name which is appended to the thread name
      * @return the created thread pool
-     * @deprecated using cached thread pool is discouraged as they have no upper bound and can overload the JVM
      */
-    @Deprecated
     ExecutorService newCachedThreadPool(Object source, String name);
 
     /**

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java?rev=1057139&r1=1057138&r2=1057139&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/concurrent/ExecutorServiceHelper.java Mon Jan 10 09:59:22 2011
@@ -27,7 +27,7 @@ import java.util.concurrent.SynchronousQ
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.model.ExecutorServiceAwareDefinition;
 import org.apache.camel.spi.ExecutorServiceStrategy;
@@ -49,12 +49,12 @@ import org.apache.camel.util.ObjectHelpe
 public final class ExecutorServiceHelper {
 
     public static final String DEFAULT_PATTERN = "Camel Thread ${counter} - ${name}";
-    private static AtomicInteger threadCounter = new AtomicInteger();
+    private static AtomicLong threadCounter = new AtomicLong();
 
     private ExecutorServiceHelper() {
     }
 
-    private static int nextThreadCounter() {
+    private static long nextThreadCounter() {
         return threadCounter.getAndIncrement();
     }
 
@@ -152,15 +152,15 @@ public final class ExecutorServiceHelper
     }
 
     /**
-     * Creates a new cached thread pool
+     * Creates a new cached thread pool.
+     * <p/>
+     * <b>Important:</b> Using cached thread pool is discouraged as they have no upper bound and can overload the JVM.
      *
      * @param pattern pattern of the thread name
      * @param name    ${name} in the pattern name
      * @param daemon  whether the threads is daemon or not
      * @return the created pool
-     * @deprecated using cached thread pool is discouraged as they have no upper bound and can overload the JVM
      */
-    @Deprecated
     public static ExecutorService newCachedThreadPool(final String pattern, final String name, final boolean daemon) {
         return Executors.newCachedThreadPool(new ThreadFactory() {
             public Thread newThread(Runnable r) {
@@ -187,7 +187,7 @@ public final class ExecutorServiceHelper
      *
      * @param pattern      pattern of the thread name
      * @param name         ${name} in the pattern name
-     * @param corePoolSize the core size
+     * @param corePoolSize the core pool size
      * @param maxPoolSize  the maximum pool size
      * @return the created pool
      */
@@ -201,7 +201,7 @@ public final class ExecutorServiceHelper
      *
      * @param pattern      pattern of the thread name
      * @param name         ${name} in the pattern name
-     * @param corePoolSize the core size
+     * @param corePoolSize the core pool size
      * @param maxPoolSize  the maximum pool size
      * @param maxQueueSize the maximum number of tasks in the queue, use <tt>Integer.MAX_VALUE</tt> or <tt>-1</tt> to indicate unbounded
      * @return the created pool
@@ -216,7 +216,7 @@ public final class ExecutorServiceHelper
      *
      * @param pattern                  pattern of the thread name
      * @param name                     ${name} in the pattern name
-     * @param corePoolSize             the core size
+     * @param corePoolSize             the core pool size
      * @param maxPoolSize              the maximum pool size
      * @param keepAliveTime            keep alive time
      * @param timeUnit                 keep alive time unit