[camel] branch master updated: [CAMEL-12809] Threading issues with the throttler when using a correlation key

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch master updated: [CAMEL-12809] Threading issues with the throttler when using a correlation key

gnodet
This is an automated email from the ASF dual-hosted git repository.

gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 8517fdf  [CAMEL-12809] Threading issues with the throttler when using a correlation key
8517fdf is described below

commit 8517fdfc2d934bc4bd1254dcd983e02b3ef4f5e8
Author: Guillaume Nodet <[hidden email]>
AuthorDate: Fri Sep 14 15:47:50 2018 +0200

    [CAMEL-12809] Threading issues with the throttler when using a correlation key
---
 .../org/apache/camel/model/ThrottleDefinition.java |   4 +-
 .../java/org/apache/camel/processor/Throttler.java | 301 ++++++++-------------
 .../camel/management/ManagedThrottlerTest.java     |   3 +-
 3 files changed, 110 insertions(+), 198 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
index eca9a3a..e4a616f 100644
--- a/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/ThrottleDefinition.java
@@ -105,8 +105,8 @@ public class ThrottleDefinition extends ExpressionNode implements ExecutorServic
         Processor childProcessor = this.createChildProcessor(routeContext, true);
 
         boolean async = getAsyncDelayed() != null && getAsyncDelayed();
-        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, async);
-        ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, async);
+        boolean shutdownThreadPool = ProcessorDefinitionHelper.willCreateNewThreadPool(routeContext, this, true);
+        ScheduledExecutorService threadPool = ProcessorDefinitionHelper.getConfiguredScheduledExecutorService(routeContext, "Throttle", this, true);
         
         // should be default 1000 millis
         long period = getTimePeriodMillis() != null ? getTimePeriodMillis() : 1000L;
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
index 4191c06..b57ff1b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Throttler.java
@@ -16,16 +16,15 @@
  */
 package org.apache.camel.processor;
 
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.DelayQueue;
 import java.util.concurrent.Delayed;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelContext;
@@ -36,11 +35,7 @@ import org.apache.camel.RuntimeExchangeException;
 import org.apache.camel.Traceable;
 import org.apache.camel.spi.IdAware;
 import org.apache.camel.util.AsyncProcessorHelper;
-import org.apache.camel.util.CamelContextHelper;
-import org.apache.camel.util.LRUCache;
-import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
-import org.apache.camel.util.ServiceHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -68,6 +63,8 @@ import org.slf4j.LoggerFactory;
  */
 public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAware {
 
+    private static final String DEFAULT_KEY = "CamelThrottlerDefaultKey";
+
     private static final String PROPERTY_EXCHANGE_QUEUED_TIMESTAMP = "CamelThrottlerExchangeQueuedTimestamp";
     private static final String PROPERTY_EXCHANGE_STATE = "CamelThrottlerExchangeState";
 
@@ -75,24 +72,21 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     private final Logger log = LoggerFactory.getLogger(Throttler.class);
     private final CamelContext camelContext;
-    private final ExecutorService asyncExecutor;
+    private final ScheduledExecutorService asyncExecutor;
     private final boolean shutdownAsyncExecutor;
 
     private volatile long timePeriodMillis;
+    private volatile long cleanPeriodMillis;
     private String id;
-    private volatile Integer throttleRate = new Integer(0);
     private Expression maxRequestsPerPeriodExpression;
     private boolean rejectExecution;
     private boolean asyncDelayed;
     private boolean callerRunsWhenRejected = true;
-    private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
-    // below 3 fields added for (throttling grouping)
     private Expression correlationExpression;
-    private Map<Integer, DelayQueue<ThrottlePermit>> delayQueueCache;
-    private Map<Integer, Integer> throttleRatesMap = new HashMap<>();    
+    private Map<String, ThrottlingState> states = new ConcurrentHashMap<>();
 
     public Throttler(final CamelContext camelContext, final Processor processor, final Expression maxRequestsPerPeriodExpression, final long timePeriodMillis,
-                     final ExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
+                     final ScheduledExecutorService asyncExecutor, final boolean shutdownAsyncExecutor, final boolean rejectExecution, Expression correlation) {
         super(processor);
         this.camelContext = camelContext;
         this.rejectExecution = rejectExecution;
@@ -105,6 +99,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
             throw new IllegalArgumentException("TimePeriodMillis should be a positive number, was: " + timePeriodMillis);
         }
         this.timePeriodMillis = timePeriodMillis;
+        this.cleanPeriodMillis = timePeriodMillis * 10;
         this.asyncExecutor = asyncExecutor;
         this.correlationExpression = correlation;
     }
@@ -125,28 +120,24 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                 throw new RejectedExecutionException("Run is not allowed");
             }
 
-            DelayQueue<ThrottlePermit> delayQ = null;
-            Integer key = null;
+            String key = DEFAULT_KEY;
             if (correlationExpression != null) {
-                key = correlationExpression.evaluate(exchange, Integer.class);
-                delayQ = locateDelayQueue(key, doneSync);
-                calculateAndSetMaxRequestsPerPeriod(delayQ, exchange, key);
-            } else {
-                delayQ = delayQueue;
-                calculateAndSetMaxRequestsPerPeriod(exchange);
+                key = correlationExpression.evaluate(exchange, String.class);
             }
+            ThrottlingState throttlingState = states.computeIfAbsent(key, ThrottlingState::new);
+            throttlingState.calculateAndSetMaxRequestsPerPeriod(exchange);
 
-            ThrottlePermit permit = delayQ.poll();
+            ThrottlePermit permit = throttlingState.poll();
 
             if (permit == null) {
                 if (isRejectExecution()) {
                     throw new ThrottlerRejectedExecutionException("Exceeded the max throttle rate of "
-                            + ((correlationExpression != null) ? throttleRatesMap.get(key) : throttleRate) + " within " + timePeriodMillis + "ms");
+                            + throttlingState.getThrottleRate() + " within " + timePeriodMillis + "ms");
                 } else {
                     // delegate to async pool
                     if (isAsyncDelayed() && !exchange.isTransacted() && state == State.SYNC) {
                         log.debug("Throttle rate exceeded but AsyncDelayed enabled, so queueing for async processing, exchangeId: {}", exchange.getExchangeId());
-                        return processAsynchronously(exchange, callback);
+                        return processAsynchronously(exchange, callback, throttlingState);
                     }
 
                     // block waiting for a permit
@@ -155,11 +146,11 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     if (log.isTraceEnabled()) {
                         start = System.currentTimeMillis();
                     }
-                    permit = delayQ.take();
+                    permit = throttlingState.take();
                     if (log.isTraceEnabled()) {
                         elapsed = System.currentTimeMillis() - start;
                     }
-                    enqueuePermit(permit, exchange, delayQ);
+                    throttlingState.enqueue(permit, exchange);
 
                     if (state == State.ASYNC) {
                         if (log.isTraceEnabled()) {
@@ -171,7 +162,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                     }
                 }
             } else {
-                enqueuePermit(permit, exchange, delayQ);
+                throttlingState.enqueue(permit, exchange);
 
                 if (state == State.ASYNC) {
                     if (log.isTraceEnabled()) {
@@ -217,57 +208,18 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
     }
 
     /**
-     *
-     * Finds the right Delay Queue to put a permit into with the exchanges time arrival timestamp +  timePeriodInMillis
-     * In case of asynchronous routing there may be cases where we create new group whose correlationExpression
-     * might first hit after long series of exchanges with a different correlationExpression and are to be on hold in
-     * their delayQueue so we need to find delay queue to add new ones while we create a new empty delay
-     * queue for the new group hit for the first time. that's why locating delay queues for those frequently
-     * hitting exchanges for the group during asynchronous routing would be better be asynchronous with asyncExecutor
-     *
-     * @param key is evaluated value of correlationExpression
-     * @param doneSync is a flag indicating if the exchange is routed asynchronously or not
-     * @return DelayQueue in which the exchange with permit expiry to be put into
-     */
-    private DelayQueue<ThrottlePermit> locateDelayQueue(final Integer key, final boolean doneSync) throws InterruptedException, ExecutionException {        
-        CompletableFuture<DelayQueue<ThrottlePermit>> futureDelayQueue = new CompletableFuture<>();
-
-        if (!doneSync) {
-            asyncExecutor.submit(() -> {
-                futureDelayQueue.complete(findDelayQueue(key));
-            });
-        }
-        DelayQueue<ThrottlePermit> currentQueue = (!doneSync) ? futureDelayQueue.get() : findDelayQueue(key);  
-        return currentQueue;
-    }
-
-    private DelayQueue<ThrottlePermit> findDelayQueue(Integer key) {
-        DelayQueue<ThrottlePermit> currentDelayQueue = delayQueueCache.get(key);
-        if (currentDelayQueue == null) {
-            currentDelayQueue = new DelayQueue<>();
-            throttleRatesMap.put(key, 0);
-            delayQueueCache.put(key, currentDelayQueue);
-        }
-        return currentDelayQueue;
-    }
-
-    /**
      * Delegate blocking on the DelayQueue to an asyncExecutor. Except if the executor rejects the submission
      * and isCallerRunsWhenRejected() is enabled, then this method will delegate back to process(), but not
      * before changing the exchange state to stop any recursion.
      */
-    protected boolean processAsynchronously(final Exchange exchange, final AsyncCallback callback) {
+    protected boolean processAsynchronously(final Exchange exchange, final AsyncCallback callback, ThrottlingState throttlingState) {
         try {
             if (log.isTraceEnabled()) {
                 exchange.setProperty(PROPERTY_EXCHANGE_QUEUED_TIMESTAMP, System.currentTimeMillis());
             }
             exchange.setProperty(PROPERTY_EXCHANGE_STATE, State.ASYNC);
-            asyncExecutor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    process(exchange, callback);
-                }
-            });
+            long delay = throttlingState.peek().getDelay(TimeUnit.NANOSECONDS);
+            asyncExecutor.schedule(() -> process(exchange, callback), delay, TimeUnit.NANOSECONDS);
             return false;
         } catch (final RejectedExecutionException e) {
             if (isCallerRunsWhenRejected()) {
@@ -279,38 +231,96 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
         }
     }
 
-    /**
-     * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
-     * @throws ExecutionException
-     * @throws InterruptedException
-     */
-    protected void enqueuePermit(final ThrottlePermit permit, final Exchange exchange, DelayQueue<ThrottlePermit> delayQueue) throws InterruptedException, ExecutionException {
-        permit.setDelayMs(getTimePeriodMillis());
-        delayQueue.put(permit);
-        // try and incur the least amount of overhead while releasing permits back to the queue
-        if (log.isTraceEnabled()) {
-            log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
+    @SuppressWarnings("unchecked")
+    @Override
+    protected void doStart() throws Exception {
+        if (isAsyncDelayed()) {
+            ObjectHelper.notNull(asyncExecutor, "executorService", this);
         }
+        super.doStart();
     }
 
-    /**
-     * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
-     */
-    protected void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
-        Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
+    @SuppressWarnings("rawtypes")
+    @Override
+    protected void doShutdown() throws Exception {
+        if (shutdownAsyncExecutor && asyncExecutor != null) {
+            camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
+        }
+        states.clear();
+        super.doShutdown();
+    }
+
+    private class ThrottlingState {
+        private final String key;
+        private final DelayQueue<ThrottlePermit> delayQueue = new DelayQueue<>();
+        private final AtomicReference<ScheduledFuture<?>> cleanFuture = new AtomicReference<>();
+        private volatile int throttleRate = 0;
+
+        ThrottlingState(String key) {
+            this.key = key;
+        }
+
+        public String getKey() {
+            return key;
+        }
+
+        public int getThrottleRate() {
+            return throttleRate;
+        }
+
+        public ThrottlePermit poll() {
+            return delayQueue.poll();
+        }
+
+        public ThrottlePermit peek() {
+            return delayQueue.peek();
+        }
+
+        public ThrottlePermit take() throws InterruptedException {
+            return delayQueue.take();
+        }
 
-        if (newThrottle != null && newThrottle < 0) {
-            throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
+        public void clean() {
+            states.remove(key);
+        }
+
+        /**
+         * Returns a permit to the DelayQueue, first resetting it's delay to be relative to now.
+         */
+        public void enqueue(final ThrottlePermit permit, final Exchange exchange) {
+            permit.setDelayMs(getTimePeriodMillis());
+            delayQueue.put(permit);
+            try {
+                ScheduledFuture<?> next = asyncExecutor.schedule(this::clean, cleanPeriodMillis, TimeUnit.MILLISECONDS);
+                ScheduledFuture<?> prev = cleanFuture.getAndSet(next);
+                if (prev != null) {
+                    prev.cancel(false);
+                }
+                // try and incur the least amount of overhead while releasing permits back to the queue
+                if (log.isTraceEnabled()) {
+                    log.trace("Permit released, for exchangeId: {}", exchange.getExchangeId());
+                }
+            } catch (RejectedExecutionException e) {
+                log.debug("Throttling queue cleaning rejected", e);
+            }
         }
 
-        synchronized (this) {
+        /**
+         * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
+         */
+        public synchronized void calculateAndSetMaxRequestsPerPeriod(final Exchange exchange) throws Exception {
+            Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
+
+            if (newThrottle != null && newThrottle < 0) {
+                throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
+            }
+
             if (newThrottle == null && throttleRate == 0) {
                 throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
             }
 
             if (newThrottle != null) {
-                if (!newThrottle.equals(throttleRate)) {
-                    // get the queue from the cache
+                if (newThrottle != throttleRate) {
                     // decrease
                     if (throttleRate > newThrottle) {
                         int delta = throttleRate - newThrottle;
@@ -323,7 +333,7 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
                         }
                         log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRate, newThrottle, exchange.getExchangeId());
 
-                    // increase
+                        // increase
                     } else if (newThrottle > throttleRate) {
                         int delta = newThrottle - throttleRate;
                         for (int i = 0; i < delta; i++) {
@@ -340,102 +350,6 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
             }
         }
     }
-    
-    /**
-     * Evaluates the maxRequestsPerPeriodExpression and adjusts the throttle rate up or down.
-     */
-    protected void calculateAndSetMaxRequestsPerPeriod(DelayQueue<ThrottlePermit> delayQueue, final Exchange exchange, final Integer key) throws Exception {
-        Integer newThrottle = maxRequestsPerPeriodExpression.evaluate(exchange, Integer.class);
-
-        if (newThrottle != null && newThrottle < 0) {
-            throw new IllegalStateException("The maximumRequestsPerPeriod must be a positive number, was: " + newThrottle);
-        }
-
-        synchronized (key) {
-            if (newThrottle == null && throttleRatesMap.get(key) == 0) {
-                throw new RuntimeExchangeException("The maxRequestsPerPeriodExpression was evaluated as null: " + maxRequestsPerPeriodExpression, exchange);
-            }
-
-            if (newThrottle != null) {
-                if (!newThrottle.equals(throttleRatesMap.get(key))) {
-                    // get the queue from the cache
-                    // decrease
-                    if (throttleRatesMap.get(key) > newThrottle) {
-                        int delta = throttleRatesMap.get(key) - newThrottle;
-
-                        // discard any permits that are needed to decrease throttling
-                        while (delta > 0) {
-                            delayQueue.take();
-                            delta--;
-                            log.trace("Permit discarded due to throttling rate decrease, triggered by ExchangeId: {}", exchange.getExchangeId());
-                        }
-                        log.debug("Throttle rate decreased from {} to {}, triggered by ExchangeId: {}", throttleRatesMap.get(key), newThrottle, exchange.getExchangeId());
-
-                    // increase
-                    } else if (newThrottle > throttleRatesMap.get(key)) {
-                        int delta = newThrottle - throttleRatesMap.get(key);
-                        for (int i = 0; i < delta; i++) {
-                            delayQueue.put(new ThrottlePermit(-1));
-                        }
-                        if (throttleRatesMap.get(key) == 0) {
-                            log.debug("Initial throttle rate set to {}, triggered by ExchangeId: {}", newThrottle, exchange.getExchangeId());
-                        } else {
-                            log.debug("Throttle rate increase from {} to {}, triggered by ExchangeId: {}", throttleRatesMap.get(key), newThrottle, exchange.getExchangeId());
-                        }
-                    }
-                    throttleRatesMap.put(key, newThrottle);
-                }
-            }
-        }
-    }
-
-    @SuppressWarnings("unchecked")
-    @Override
-    protected void doStart() throws Exception {
-        if (isAsyncDelayed()) {
-            ObjectHelper.notNull(asyncExecutor, "executorService", this);
-        }
-        if (correlationExpression != null) {
-            if (camelContext != null) {
-                int maxSize = CamelContextHelper.getMaximumSimpleCacheSize(camelContext);
-                if (maxSize > 0) {
-                    delayQueueCache = LRUCacheFactory.newLRUCache(16, maxSize, false);
-                    log.debug("DelayQueues cache size: {}", maxSize);
-                } else {
-                    delayQueueCache = LRUCacheFactory.newLRUCache(100);
-                    log.debug("Defaulting DelayQueues cache size: {}", 100);
-                }
-            }
-            if (delayQueueCache != null) {
-                ServiceHelper.startService(delayQueueCache);
-            }
-        }
-        super.doStart();
-    }
-
-    @SuppressWarnings("rawtypes")
-    @Override
-    protected void doShutdown() throws Exception {
-        if (shutdownAsyncExecutor && asyncExecutor != null) {
-            camelContext.getExecutorServiceManager().shutdownNow(asyncExecutor);
-        }
-        if (correlationExpression != null) {
-            if (delayQueueCache != null) {
-                ServiceHelper.stopService(delayQueueCache);
-                if (log.isDebugEnabled()) {
-                    if (delayQueueCache instanceof LRUCache) {
-                        log.debug("Clearing deleay queues cache[size={}, hits={}, misses={}, evicted={}]",
-                                delayQueueCache.size(), ((LRUCache) delayQueueCache).getHits(), ((LRUCache) delayQueueCache).getMisses(), ((LRUCache) delayQueueCache).getEvicted());
-                    }
-                }
-                delayQueueCache.clear();
-            }
-            if (throttleRatesMap != null && throttleRatesMap.size() > 0) {
-                throttleRatesMap.clear();
-            }
-        }
-        super.doShutdown();
-    }
 
     /**
      * Permit that implements the Delayed interface needed by DelayQueue.
@@ -507,14 +421,11 @@ public class Throttler extends DelegateAsyncProcessor implements Traceable, IdAw
 
     /**
      * Gets the current maximum request per period value.
-     * If it is grouped throttling applied with correlationExpression
+     * If it is grouped throttling applied with correlationExpression
      * than the max per period within the group will return
      */
     public int getCurrentMaximumRequestsPerPeriod() {
-        if (correlationExpression == null) {
-            return throttleRate;
-        }
-        return Collections.max(throttleRatesMap.entrySet(), (entry1, entry2) -> entry1.getValue() - entry2.getValue()).getValue();
+        return states.values().stream().mapToInt(ThrottlingState::getThrottleRate).max().orElse(0);
     }
 
     /**
diff --git a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
index 62faa05..d95b534 100644
--- a/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/ManagedThrottlerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel.management;
 
+import java.util.concurrent.Callable;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
@@ -293,7 +294,7 @@ public class ManagedThrottlerTest extends ManagementTestSupport {
     protected RouteBuilder createRouteBuilder() throws Exception {
         final ScheduledExecutorService badService = new ScheduledThreadPoolExecutor(1) {
             @Override
-            public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
+            public <V> ScheduledFuture<V> schedule(Callable<V> command, long delay, TimeUnit unit) {
                 throw new RejectedExecutionException();
             }
         };