svn commit: r1057141 - /camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r1057141 - /camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

davsclaus-2
Author: davsclaus
Date: Mon Jan 10 10:18:58 2011
New Revision: 1057141

URL: http://svn.apache.org/viewvc?rev=1057141&view=rev
Log:
CAMEL-3497: Fixed a rare potential deadlock in which the aggregate task was not given time to run because the thread pool was overloaded when running in parallel mode on multicast/splitter.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1057141&r1=1057140&r2=1057141&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Mon Jan 10 10:18:58 2011
@@ -882,10 +882,13 @@ public class MulticastProcessor extends
             throw new IllegalArgumentException("Timeout is used but ParallelProcessing has not been enabled");
         }
         if (isParallelProcessing() && aggregateExecutorService == null) {
-            // use cached thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread
+            // use unbounded thread pool so we ensure the aggregate on-the-fly task always will have assigned a thread
             // and run the tasks when the task is submitted. If not then the aggregate task may not be able to run
             // and signal completion during processing, which would lead to a dead-lock
-            aggregateExecutorService = camelContext.getExecutorServiceStrategy().newCachedThreadPool(this, "AggregateTask");
+            // keep at least one thread in the pool so we re-use the thread avoiding to create new threads because
+            // the pool shrank to zero.
+            String name = getClass().getSimpleName() + "-AggregateTask";
+            aggregateExecutorService = camelContext.getExecutorServiceStrategy().newThreadPool(this, name, 1, Integer.MAX_VALUE);
         }
         ServiceHelper.startServices(processors);
     }