[camel] branch master updated: CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core

Previous Topic Next Topic
 
classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch master updated: CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core

onders-2
This is an automated email from the ASF dual-hosted git repository.

onders pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 1b5040f  CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core
     new 338d058  CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core
1b5040f is described below

commit 1b5040f2b4288d4b1b042d2d7c5456aead8975d7
Author: onders <[hidden email]>
AuthorDate: Tue Jul 31 22:37:21 2018 +0300

    CAMEL-6840 - add more regression which is similar to already existing ThrottlerTests where grouping is added and fix some CS errors in camel-core
---
 .../apache/camel/builder/ExpressionBuilder.java    |   2 +-
 .../org/apache/camel/impl/FileStateRepository.java |   2 +-
 .../java/org/apache/camel/processor/Splitter.java  |   2 -
 .../idempotent/FileIdempotentRepository.java       |   2 +-
 .../camel/support/TokenPairExpressionIterator.java |   2 +-
 .../support/TokenXMLPairExpressionIterator.java    |   2 +-
 .../org/apache/camel/util/GroupTokenIterator.java  |   2 +-
 .../apache/camel/util/IntrospectionSupport.java    |   6 +-
 .../main/java/org/apache/camel/util/Scanner.java   |  10 +-
 .../java/org/apache/camel/util/SkipIterator.java   |   1 -
 .../camel/processor/ThrottlingGroupingTest.java    | 134 +++++++++++++++++++++
 .../apache/camel/util/GroupTokenIteratorTest.java  |   1 -
 .../processor/SpringThrottlerGroupingTest.java     |   2 -
 .../spring/processor/ThrottlerGroupingTest.xml     |  39 ++++++
 14 files changed, 187 insertions(+), 20 deletions(-)

diff --git a/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java b/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
index 11f640b..36c4ced 100644
--- a/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
+++ b/camel-core/src/main/java/org/apache/camel/builder/ExpressionBuilder.java
@@ -27,7 +27,6 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Random;
-import org.apache.camel.util.Scanner;
 import java.util.Set;
 import java.util.TimeZone;
 import java.util.concurrent.atomic.AtomicReference;
@@ -71,6 +70,7 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.MessageHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.OgnlHelper;
+import org.apache.camel.util.Scanner;
 import org.apache.camel.util.SkipIterator;
 import org.apache.camel.util.StringHelper;
 
diff --git a/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
index bac7f95..feda49c 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/FileStateRepository.java
@@ -21,7 +21,6 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
-import org.apache.camel.util.Scanner;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.api.management.ManagedAttribute;
@@ -32,6 +31,7 @@ import org.apache.camel.support.ServiceSupport;
 import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
index 6eb6833..1833cd5 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
@@ -23,7 +23,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
-import org.apache.camel.util.Scanner;
 import java.util.concurrent.ExecutorService;
 
 import org.apache.camel.AsyncCallback;
@@ -42,7 +41,6 @@ import org.apache.camel.spi.RouteContext;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
-
 import static org.apache.camel.util.ObjectHelper.notNull;
 
 /**
diff --git a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
index eef5c6d..c6758b9 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/idempotent/FileIdempotentRepository.java
@@ -22,7 +22,6 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import org.apache.camel.util.Scanner;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.api.management.ManagedAttribute;
@@ -35,6 +34,7 @@ import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.LRUCache;
 import org.apache.camel.util.LRUCacheFactory;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
diff --git a/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java b/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
index 0ccfdd3..d9964b9 100644
--- a/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/support/TokenPairExpressionIterator.java
@@ -20,13 +20,13 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
-import org.apache.camel.util.Scanner;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.InvalidPayloadException;
 import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
 
 /**
  * {@link org.apache.camel.Expression} to walk a {@link org.apache.camel.Message} body
diff --git a/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java b/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java
index 39664c9..23b3b2f 100644
--- a/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/support/TokenXMLPairExpressionIterator.java
@@ -20,13 +20,13 @@ import java.io.InputStream;
 import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import org.apache.camel.util.Scanner;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.language.simple.SimpleLanguage;
 import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.Scanner;
 
 /**
  * {@link org.apache.camel.Expression} to walk a {@link org.apache.camel.Message} XML body
diff --git a/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java b/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java
index 0fa0a5c..f6349a9 100644
--- a/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/util/GroupTokenIterator.java
@@ -21,9 +21,9 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.Iterator;
-import org.apache.camel.util.Scanner;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
 import org.apache.camel.NoTypeConversionAvailableException;
diff --git a/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java b/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java
index 008c6e1..f1738db 100644
--- a/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java
+++ b/camel-core/src/main/java/org/apache/camel/util/IntrospectionSupport.java
@@ -44,9 +44,9 @@ import org.apache.camel.Component;
 import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.TypeConverter;
 import org.apache.camel.component.properties.PropertiesComponent;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-
 import static org.apache.camel.util.ObjectHelper.isAssignableFrom;
 
 /**
@@ -133,8 +133,8 @@ public final class IntrospectionSupport {
      */
     public static void stop() {
         if (LOG.isDebugEnabled() && CACHE instanceof LRUCache) {
-            LRUCache CACHE = (LRUCache) IntrospectionSupport.CACHE;
-            LOG.debug("Clearing cache[size={}, hits={}, misses={}, evicted={}]", new Object[]{CACHE.size(), CACHE.getHits(), CACHE.getMisses(), CACHE.getEvicted()});
+            LRUCache localCache = (LRUCache) IntrospectionSupport.CACHE;
+            LOG.debug("Clearing cache[size={}, hits={}, misses={}, evicted={}]", new Object[]{localCache.size(), localCache.getHits(), localCache.getMisses(), localCache.getEvicted()});
         }
         CACHE.clear();
 
diff --git a/camel-core/src/main/java/org/apache/camel/util/Scanner.java b/camel-core/src/main/java/org/apache/camel/util/Scanner.java
index a042c57..43919eb 100644
--- a/camel-core/src/main/java/org/apache/camel/util/Scanner.java
+++ b/camel-core/src/main/java/org/apache/camel/util/Scanner.java
@@ -55,11 +55,11 @@ public final class Scanner implements Iterator<String>, Closeable {
     private Matcher matcher;
     private CharBuffer buf;
     private int position;
-    private boolean inputExhausted = false;
-    private boolean needInput = false;
-    private boolean skipped = false;
+    private boolean inputExhausted;
+    private boolean needInput;
+    private boolean skipped;
     private int savedPosition = -1;
-    private boolean closed = false;
+    private boolean closed;
     private IOException lastIOException;
 
     public Scanner(InputStream source, String charsetName, String pattern) {
@@ -194,7 +194,7 @@ public final class Scanner implements Iterator<String>, Closeable {
 
     private void throwFor() {
         skipped = false;
-        if ((inputExhausted) && (position == buf.limit())) {
+        if (inputExhausted && position == buf.limit()) {
             throw new NoSuchElementException();
         } else {
             throw new InputMismatchException();
diff --git a/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java b/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java
index 9c9b622..dc43715 100644
--- a/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java
+++ b/camel-core/src/main/java/org/apache/camel/util/SkipIterator.java
@@ -19,7 +19,6 @@ package org.apache.camel.util;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Iterator;
-import org.apache.camel.util.Scanner;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.camel.CamelContext;
diff --git a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
index 09f1160..9a39781 100644
--- a/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
+++ b/camel-core/src/test/java/org/apache/camel/processor/ThrottlingGroupingTest.java
@@ -18,14 +18,21 @@ package org.apache.camel.processor;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
 
 /**
  * @version
  */
 public class ThrottlingGroupingTest extends ContextTestSupport {
+    private static final int INTERVAL = 500;
+    private static final int MESSAGE_COUNT = 9;
+    private static final int TOLERANCE = 50;
 
     public void testGroupingWithSingleConstant() throws Exception {
         getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Bye World");
@@ -60,6 +67,127 @@ public class ThrottlingGroupingTest extends ContextTestSupport {
         
         assertMockEndpointsSatisfied();
     }
+    
+    public void testSendLotsOfMessagesButOnly3GetThroughWithin2Seconds() throws Exception {
+
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(3);
+        resultEndpoint.setResultWaitTime(2000);
+
+        Map<String, Object> headers = new HashMap<String, Object>();
+        for (int i = 0; i < 9; i++) {
+            if (i % 2 == 0) {
+                headers.put("key", "1");
+            } else {
+                headers.put("key", "2");
+            }
+            template.sendBodyAndHeaders("seda:ga", "<message>" + i + "</message>", headers);
+        }
+
+        // lets pause to give the requests time to be processed
+        // to check that the throttle really does kick in
+        resultEndpoint.assertIsSatisfied();
+    }
+    
+    private void assertThrottlerTiming(final long elapsedTimeMs, final int throttle, final int intervalMs, final int messageCount) {
+        // now assert that they have actually been throttled (use +/- 50 as slack)
+        long minimum = calculateMinimum(intervalMs, throttle, messageCount) - 50;
+        long maximum = calculateMaximum(intervalMs, throttle, messageCount) + 50;
+        // add 500 in case running on slow CI boxes
+        maximum += 500;
+        log.info("Sent {} exchanges in {}ms, with throttle rate of {} per {}ms. Calculated min {}ms and max {}ms", new Object[]{messageCount, elapsedTimeMs, throttle, intervalMs, minimum, maximum});
+
+        assertTrue("Should take at least " + minimum + "ms, was: " + elapsedTimeMs, elapsedTimeMs >= minimum);
+        assertTrue("Should take at most " + maximum + "ms, was: " + elapsedTimeMs, elapsedTimeMs <= maximum + TOLERANCE);
+    }
+
+    private long sendMessagesAndAwaitDelivery(final int messageCount, final String endpointUri, final int threadPoolSize, final MockEndpoint receivingEndpoint) throws InterruptedException {
+        ExecutorService executor = Executors.newFixedThreadPool(threadPoolSize);
+        try {
+            if (receivingEndpoint != null) {
+                receivingEndpoint.expectedMessageCount(messageCount);
+            }
+
+            long start = System.nanoTime();
+            for (int i = 0; i < messageCount; i++) {
+                executor.execute(new Runnable() {
+                    public void run() {
+                        Map<String, Object> headers = new HashMap<String, Object>();
+                        if (messageCount % 2 == 0) {
+                            headers.put("key", "1");
+                        } else {
+                            headers.put("key", "2");
+                        }
+                        template.sendBodyAndHeaders(endpointUri, "<message>payload</message>", headers);
+                    }
+                });
+            }
+
+            // let's wait for the exchanges to arrive
+            if (receivingEndpoint != null) {
+                receivingEndpoint.assertIsSatisfied();
+            }
+            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+    
+    public void testSendLotsOfMessagesSimultaneouslyButOnlyGetThroughAsConstantThrottleValue() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
+        long elapsed = sendMessagesAndAwaitDelivery(MESSAGE_COUNT, "direct:ga", MESSAGE_COUNT, resultEndpoint);
+        assertThrottlerTiming(elapsed, 5, INTERVAL, MESSAGE_COUNT);
+    }
+    
+    public void testConfigurationWithHeaderExpression() throws Exception {
+        MockEndpoint resultEndpoint = resolveMandatoryEndpoint("mock:gresult", MockEndpoint.class);
+        resultEndpoint.expectedMessageCount(MESSAGE_COUNT);
+
+        ExecutorService executor = Executors.newFixedThreadPool(MESSAGE_COUNT);
+        try {
+            sendMessagesWithHeaderExpression(executor, resultEndpoint, 5, INTERVAL, MESSAGE_COUNT);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
+    
+    private long calculateMinimum(final long periodMs, final long throttleRate, final long messageCount) {
+        if (messageCount % throttleRate > 0) {
+            return (long) Math.floor((double)messageCount / (double)throttleRate) * periodMs;
+        } else {
+            return (long) (Math.floor((double)messageCount / (double)throttleRate) * periodMs) - periodMs;
+        }
+    }
+
+    private long calculateMaximum(final long periodMs, final long throttleRate, final long messageCount) {
+        return ((long)Math.ceil((double)messageCount / (double)throttleRate)) * periodMs;
+    }
+    
+    private void sendMessagesWithHeaderExpression(final ExecutorService executor, final MockEndpoint resultEndpoint, final int throttle, final int intervalMs, final int messageCount)
+            throws InterruptedException {
+        resultEndpoint.expectedMessageCount(messageCount);
+
+        long start = System.nanoTime();
+        for (int i = 0; i < messageCount; i++) {
+            executor.execute(new Runnable() {
+                public void run() {
+                    Map<String, Object> headers = new HashMap<String, Object>();
+                    headers.put("throttleValue", throttle);
+                    if (messageCount % 2 == 0) {
+                        headers.put("key", "1");
+                    } else {
+                        headers.put("key", "2");
+                    }
+                    template.sendBodyAndHeaders("direct:gexpressionHeader", "<message>payload</message>", headers);
+                }
+            });
+        }
+
+        // let's wait for the exchanges to arrive
+        resultEndpoint.assertIsSatisfied();
+        long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+        assertThrottlerTiming(elapsed, throttle, intervalMs, messageCount);
+    }
 
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
@@ -71,6 +199,12 @@ public class ThrottlingGroupingTest extends ContextTestSupport {
                 from("seda:a").throttle(header("max"), 1).to("mock:result");
                 from("seda:b").throttle(header("max"), 2).to("mock:result2");
                 from("seda:c").throttle(header("max"), header("key")).to("mock:resultdynamic");
+                
+                from("seda:ga").throttle(constant(3), header("key")).timePeriodMillis(1000).to("log:gresult", "mock:gresult");
+                
+                from("direct:ga").throttle(constant(5), header("key")).timePeriodMillis(INTERVAL).to("log:gresult", "mock:gresult");
+                
+                from("direct:gexpressionHeader").throttle(header("throttleValue"), header("key")).timePeriodMillis(INTERVAL).to("log:gresult", "mock:gresult");
             }
         };
     }
diff --git a/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java b/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java
index dfff48c..a36bf1f 100644
--- a/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java
+++ b/camel-core/src/test/java/org/apache/camel/util/GroupTokenIteratorTest.java
@@ -19,7 +19,6 @@ package org.apache.camel.util;
 import java.io.ByteArrayInputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
-import org.apache.camel.util.Scanner;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.Exchange;
diff --git a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
index e321838..7d82398 100644
--- a/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
+++ b/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringThrottlerGroupingTest.java
@@ -18,8 +18,6 @@ package org.apache.camel.spring.processor;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.processor.ThrottlingGroupingTest;
-import org.junit.Ignore;
-
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
 public class SpringThrottlerGroupingTest extends ThrottlingGroupingTest {
diff --git a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
index c3019cd..d383782 100644
--- a/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
+++ b/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/ThrottlerGroupingTest.xml
@@ -60,6 +60,45 @@
         <to uri="mock:resultdynamic"/>
       </throttle>
     </route>
+    
+    <route errorHandlerRef="dlc">
+      <from uri="seda:ga"/>
+      <!-- throttle 3 messages per 1 sec -->
+      <throttle timePeriodMillis="1000">
+        <constant>3</constant>
+        <correlationExpression>
+          <header>key</header>
+        </correlationExpression>
+        <to uri="log:gresult"/>
+        <to uri="mock:gresult"/>
+      </throttle>
+    </route>
+    
+    <route errorHandlerRef="dlc">
+      <from uri="direct:ga"/>
+      <!-- throttle 5 messages per 0.5 sec -->
+      <throttle timePeriodMillis="500">
+        <constant>5</constant>
+        <correlationExpression>
+          <header>key</header>
+        </correlationExpression>
+        <to uri="log:gresult"/>
+        <to uri="mock:gresult"/>
+      </throttle>
+    </route>
+    
+    <route errorHandlerRef="dlc">
+      <from uri="direct:gexpressionHeader"/>
+      <throttle timePeriodMillis="500">
+        <!-- use a header to determine how many messages to throttle per 0.5 sec -->
+        <header>throttleValue</header>
+        <correlationExpression>
+          <header>key</header>
+        </correlationExpression>
+        <to uri="log:gresult"/>
+        <to uri="mock:gresult"/>
+      </throttle>
+    </route>
 
   </camelContext>