svn commit: r802532 - /camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java

View: Classic | List | Threaded
1 message | Options
Reply | Threaded (open this post in threaded view)

svn commit: r802532 - /camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java

davsclaus-2
Author: davsclaus
Date: Sun Aug  9 12:36:40 2009
New Revision: 802532

URL: http://svn.apache.org/viewvc?rev=802532&view=rev
Log:
CAMEL-1896: data set using throughput logger for both sender and receiver

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java?rev=802532&r1=802531&r2=802532&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/component/dataset/DataSetEndpoint.java Sun Aug  9 12:36:40 2009
@@ -25,6 +25,7 @@
 import org.apache.camel.Processor;
 import org.apache.camel.Service;
 import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.ThroughputLogger;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.commons.logging.Log;
@@ -36,26 +37,29 @@
  * @version $Revision$
  */
 public class DataSetEndpoint extends MockEndpoint implements Service {
-    private static final transient Log LOG = LogFactory.getLog(DataSetEndpoint.class);
+    private final transient Log log;
     private DataSet dataSet;
     private AtomicInteger receivedCounter = new AtomicInteger();
     private int minRate;
     private long produceDelay;
     private long consumeDelay;
-    private long startTime;
     private long preloadSize;
+    private Processor reporter;
 
     public DataSetEndpoint() {
+        this.log = LogFactory.getLog(DataSetEndpoint.class);
     }
 
     public DataSetEndpoint(String endpointUri, Component component, DataSet dataSet) {
         super(endpointUri, component);
         this.dataSet = dataSet;
+        this.log = LogFactory.getLog(endpointUri);
     }
 
     public DataSetEndpoint(String endpointUri, DataSet dataSet) {
         super(endpointUri);
         this.dataSet = dataSet;
+        this.log = LogFactory.getLog(endpointUri);
     }
 
     public static void assertEquals(String description, Object expected, Object actual, Exchange exchange) {
@@ -159,44 +163,40 @@
         this.produceDelay = produceDelay;
     }
 
+    /**
+     * Sets a custom progress reporter
+     */
+    public void setReporter(Processor reporter) {
+        this.reporter = reporter;
+    }
+
+
     // Implementation methods
     //-------------------------------------------------------------------------
 
     @Override
     protected void performAssertions(Exchange actual) throws Exception {
-        if (startTime == 0) {
-            startTime = System.currentTimeMillis();
-        }
         int receivedCount = receivedCounter.incrementAndGet();
         long index = receivedCount - 1;
         Exchange expected = createExchange(index);
 
         // now lets assert that they are the same
-        if (LOG.isDebugEnabled()) {
+        if (log.isDebugEnabled()) {
             Integer dsi = actual.getIn().getHeader(Exchange.DATASET_INDEX, Integer.class);
-            LOG.debug("Received message: " + index + " (DataSet index=" + dsi + ") = " + actual);
+            log.debug("Received message: " + index + " (DataSet index=" + dsi + ") = " + actual);
         }
 
         assertMessageExpected(index, expected, actual);
 
-        if (consumeDelay > 0) {
-            Thread.sleep(consumeDelay);
+        if (reporter != null) {
+            reporter.process(actual);
         }
 
-        if (receivedCount % getDataSet().getReportCount() == 0) {
-            reportProgress(actual, receivedCount);
+        if (consumeDelay > 0) {
+            Thread.sleep(consumeDelay);
         }
     }
 
-    protected void reportProgress(Exchange actual, int receivedCount) {
-        long time = System.currentTimeMillis();
-        long elapsed = time - startTime;
-        startTime = time;
-
-        LOG.info("Received: " + receivedCount + " messages so far. Last group of "
-            + getDataSet().getReportCount() + " took: " + elapsed + " millis");
-    }
-
     protected void assertMessageExpected(long index, Exchange expected, Exchange actual) throws Exception {
         long actualCounter = ExchangeHelper.getMandatoryHeader(actual, Exchange.DATASET_INDEX, Long.class);
         assertEquals("Header: " + Exchange.DATASET_INDEX, index, actualCounter, actual);
@@ -204,13 +204,23 @@
         getDataSet().assertMessageExpected(this, expected, actual, index);
     }
 
+    protected ThroughputLogger createReporter() {
+        ThroughputLogger answer = new ThroughputLogger(this.getEndpointUri(), (int) this.getDataSet().getReportCount());
+        answer.setAction("Received");
+        return answer;
+    }
+
     public void start() throws Exception {
         long size = getDataSet().getSize();
         expectedMessageCount((int) size);
-        LOG.info("Start: " + this + " expecting " + size + " messages");
+        if (reporter == null) {
+            reporter = createReporter();
+        }
+        log.info("Start: " + this + " expecting " + size + " messages");
     }
 
     public void stop() throws Exception {
-        LOG.info("Stop: " + this);
+        log.info("Stop: " + this);
     }
+
 }