svn commit: r1057086 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/test/resources/org/apache/camel/spring/processor/

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

svn commit: r1057086 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/model/ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/ components/camel-spring/src/test/resources/org/apache/camel/spring/processor/

ningjiang
Author: ningjiang
Date: Mon Jan 10 05:24:23 2011
New Revision: 1057086

URL: http://svn.apache.org/viewvc?rev=1057086&view=rev
Log:
CAMEL-3514 allow sampling throttler to sample based on message frequency

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java
    camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java?rev=1057086&r1=1057085&r2=1057086&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/ProcessorDefinition.java Mon Jan 10 05:24:23 2011
@@ -1538,6 +1538,23 @@ public abstract class ProcessorDefinitio
     }
 
     /**
+     * <a href="http://camel.apache.org/sampling.html">Sampling Throttler</a>
+     * Creates a sampling throttler allowing you to extract a sample of exchanges
+     * from the traffic through a route. It is configured with a sampling message frequency
+     * during which only a single exchange is allowed to pass through.
+     * All other exchanges will be stopped.
+     *
+     * @param messageFrequency this is the sample message frequency, only one exchange is
+     *              allowed through for this many messages received
+     * @return the builder
+     */
+    public SamplingDefinition sample(long messageFrequency) {
+        SamplingDefinition answer = new SamplingDefinition(messageFrequency);
+        addOutput(answer);
+        return answer;
+    }
+
+    /**
      * <a href="http://camel.apache.org/splitter.html">Splitter EIP:</a>
      * Creates a splitter allowing you split a message into a number of pieces and process them individually.
      * <p>

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java?rev=1057086&r1=1057085&r2=1057086&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/model/SamplingDefinition.java Mon Jan 10 05:24:23 2011
@@ -43,6 +43,9 @@ public class SamplingDefinition extends
     private Long samplePeriod;
 
     @XmlAttribute()
+    private Long messageFrequency;
+
+    @XmlAttribute()
     @XmlJavaTypeAdapter(TimeUnitAdapter.class)
     private TimeUnit units;
 
@@ -54,9 +57,17 @@ public class SamplingDefinition extends
         this.units = units;
     }
 
+    public SamplingDefinition(long messageFrequency) {
+        this.messageFrequency = messageFrequency;
+    }
+    
     @Override
     public String toString() {
-        return "Sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + " -> " + getOutputs() + "]";
+        if (messageFrequency != null) {
+            return "Sample[1 Exchange per " + getMessageFrequency() + " messages received -> " + getOutputs() + "]";
+        } else {
+            return "Sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + " -> " + getOutputs() + "]";
+        }
     }
 
     @Override
@@ -66,23 +77,43 @@ public class SamplingDefinition extends
 
     @Override
     public String getLabel() {
-        return "sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + "]";
+        if (messageFrequency != null) {
+            return "sample[1 Exchange per " + getMessageFrequency() + " messages received]";
+        } else {
+            return "sample[1 Exchange per " + getSamplePeriod() + " " + getUnits().toString().toLowerCase() + "]";
+        }
     }
 
     @Override
     public Processor createProcessor(RouteContext routeContext) throws Exception {
         Processor childProcessor = this.createChildProcessor(routeContext, true);
-        // should default be 1 sample period
-        long time = getSamplePeriod() != null ? getSamplePeriod() : 1L;
-        // should default be in seconds
-        TimeUnit tu = getUnits() != null ? getUnits() : TimeUnit.SECONDS;
-        return new SamplingThrottler(childProcessor, time, tu);
+        
+        if (messageFrequency != null) {
+            return new SamplingThrottler(childProcessor, messageFrequency);
+        } else {
+            // should default be 1 sample period
+            long time = getSamplePeriod() != null ? getSamplePeriod() : 1L;
+            // should default be in seconds
+            TimeUnit tu = getUnits() != null ? getUnits() : TimeUnit.SECONDS;
+            return new SamplingThrottler(childProcessor, time, tu);
+        }
     }
 
     // Fluent API
     // -------------------------------------------------------------------------
 
     /**
+     * Sets the sample message count which only a single {@link org.apache.camel.Exchange} will pass through after this many received.
+     *
+     * @param messageFrequency
+     * @return the builder
+     */
+    public SamplingDefinition sampleMessageFrequency(long messageFrequency) {
+        setMessageFrequency(messageFrequency);
+        return this;
+    }
+    
+    /**
      * Sets the sample period during which only a single {@link org.apache.camel.Exchange} will pass through.
      *
      * @param samplePeriod the period
@@ -115,6 +146,14 @@ public class SamplingDefinition extends
         this.samplePeriod = samplePeriod;
     }
 
+    public Long getMessageFrequency() {
+        return messageFrequency;
+    }
+
+    public void setMessageFrequency(Long messageFrequency) {
+        this.messageFrequency = messageFrequency;
+    }
+    
     public void setUnits(String units) {
         this.units = TimeUnit.valueOf(units);
     }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java?rev=1057086&r1=1057085&r2=1057086&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/SamplingThrottler.java Mon Jan 10 05:24:23 2011
@@ -40,6 +40,8 @@ import org.apache.commons.logging.LogFac
 public class SamplingThrottler extends DelegateAsyncProcessor {
 
     protected final transient Log log = LogFactory.getLog(getClass());
+    private long messageFrequency;
+    private long currentMessageCount;
     private long samplePeriod;
     private long periodInNanos;
     private TimeUnit units;
@@ -48,6 +50,15 @@ public class SamplingThrottler extends D
     private Object calculationLock = new Object();
     private SampleStats sampled = new SampleStats();
 
+    public SamplingThrottler(Processor processor, long messageFrequency) {
+        super(processor);
+
+        if (messageFrequency <= 0) {
+            throw new IllegalArgumentException("A positive value is required for the sampling message frequency");
+        }
+        this.messageFrequency = messageFrequency;
+    }
+
     public SamplingThrottler(Processor processor, long samplePeriod, TimeUnit units) {
         super(processor);
 
@@ -64,11 +75,19 @@ public class SamplingThrottler extends D
 
     @Override
     public String toString() {
-        return "SamplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + " -> " + getProcessor() + "]";
+        if (messageFrequency > 0) {
+            return "SamplingThrottler[1 exchange per: " + messageFrequency + " messages received -> " + getProcessor() + "]";
+        } else {
+            return "SamplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + " -> " + getProcessor() + "]";
+        }
     }
 
     public String getTraceLabel() {
-        return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + "]";
+        if (messageFrequency > 0) {
+            return "samplingThrottler[1 exchange per: " + messageFrequency + " messages received]";
+        } else {
+            return "samplingThrottler[1 exchange per: " + samplePeriod + " " + units.toString().toLowerCase() + "]";
+        }
     }
 
     @Override
@@ -76,16 +95,24 @@ public class SamplingThrottler extends D
         boolean doSend = false;
 
         synchronized (calculationLock) {
-            long now = System.nanoTime();
-            if (now >= timeOfLastExchange + periodInNanos) {
-                doSend = true;
-                if (log.isTraceEnabled()) {
-                    log.trace(sampled.sample());
+            
+            if (messageFrequency > 0) {
+                currentMessageCount++;
+                if (currentMessageCount % messageFrequency == 0) {
+                    doSend = true;
                 }
-                timeOfLastExchange = now;
             } else {
-                if (log.isTraceEnabled()) {
-                    log.trace(sampled.drop());
+                long now = System.nanoTime();
+                if (now >= timeOfLastExchange + periodInNanos) {
+                    doSend = true;
+                    if (log.isTraceEnabled()) {
+                        log.trace(sampled.sample());
+                    }
+                    timeOfLastExchange = now;
+                } else {
+                    if (log.isTraceEnabled()) {
+                        log.trace(sampled.drop());
+                    }
                 }
             }
         }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java?rev=1057086&r1=1057085&r2=1057086&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SamplingThrottlerTest.java Mon Jan 10 05:24:23 2011
@@ -87,6 +87,32 @@ public class SamplingThrottlerTest exten
         mock.assertIsSatisfied();
     }
 
+    public void testSamplingUsingmessageFrequency() throws Exception {
+        long totalMessages = 100;
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(10);
+        mock.setResultWaitTime(100);
+
+        for (int i = 0; i < totalMessages; i++) {
+            template.sendBody("direct:sample-messageFrequency", "<message>" + i + "</message>");
+        }
+        
+        mock.assertIsSatisfied();
+    }
+    
+    public void testSamplingUsingmessageFrequencyViaDSL() throws Exception {
+        long totalMessages = 50;
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(10);
+        mock.setResultWaitTime(100);
+
+        for (int i = 0; i < totalMessages; i++) {
+            template.sendBody("direct:sample-messageFrequency-via-dsl", "<message>" + i + "</message>");
+        }
+        
+        mock.assertIsSatisfied();
+    }
+    
     private void sendExchangesThroughDroppingThrottler(List<Exchange> sentExchanges, int messages) throws Exception {
         ProducerTemplate myTemplate = context.createProducerTemplate();
 
@@ -130,6 +156,15 @@ public class SamplingThrottlerTest exten
                 from("direct:sample-configured-via-dsl")
                     .sample().samplePeriod(1).timeUnits(TimeUnit.SECONDS)
                     .to("mock:result");
+                
+                from("direct:sample-messageFrequency")
+                    .sample(10)
+                    .to("mock:result");
+                
+                from("direct:sample-messageFrequency-via-dsl")
+                    .sample().sampleMessageFrequency(5)
+                    .to("mock:result");
+
                 // END SNIPPET: e1
             }
         };

Modified: camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml?rev=1057086&r1=1057085&r2=1057086&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml (original)
+++ camel/trunk/components/camel-spring/src/test/resources/org/apache/camel/spring/processor/samplingThrottler.xml Mon Jan 10 05:24:23 2011
@@ -21,7 +21,6 @@
        http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
        http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd
     ">
-
     <camelContext xmlns="http://camel.apache.org/schema/spring">
         <!-- START SNIPPET: e1 -->
         <route>
@@ -29,6 +28,18 @@
             <sample samplePeriod="1" units="seconds">
                 <to uri="mock:result"/>
             </sample>
+        </route>            
+        <route>
+         <from uri="direct:sample-messageFrequency"/>
+            <sample messageFrequency="10">
+                <to uri="mock:result"/>
+            </sample>
+        </route>
+        <route>
+            <from uri="direct:sample-messageFrequency-via-dsl"/>
+            <sample messageFrequency="10">
+                <to uri="mock:result"/>
+            </sample>
         </route>
         <!-- END SNIPPET: e1 -->