[camel] branch exchange-factory updated (0dc3992 -> 0245ed7)

View: Classic | List | Threaded
3 messages | Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch exchange-factory updated (0dc3992 -> 0245ed7)

davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 0dc3992  CAMEL-16222: PooledExchangeFactory experiment
     new 23172db  CAMEL-16222: PooledExchangeFactory experiment
     new 0245ed7  CAMEL-16222: PooledExchangeFactory experiment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/camel/component/sjms/SjmsEndpoint.java  |  8 +++---
 .../apache/camel/component/sjms/SjmsMessage.java   |  8 ++++++
 .../sjms/consumer/EndpointMessageListener.java     | 29 ++++++++++++++++++++--
 .../sjms/consumer/InOnlyConsumerQueueTest.java     | 11 ++++++++
 ...toryTest.java => InOnlyPooledExchangeTest.java} | 25 ++++++++++++-------
 .../java/org/apache/camel/ExtendedExchange.java    |  8 ++++++
 .../org/apache/camel/support/AbstractExchange.java | 12 +++++++++
 7 files changed, 86 insertions(+), 15 deletions(-)
 copy components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/{AutowiredConnectionFactoryTest.java => InOnlyPooledExchangeTest.java} (75%)

Reply | Threaded
Open this post in threaded view
|

[camel] 01/02: CAMEL-16222: PooledExchangeFactory experiment

davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 23172db0bf5083bc54290dc315f509e881ea6cbf
Author: Claus Ibsen <[hidden email]>
AuthorDate: Tue Feb 23 10:31:55 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/sjms/SjmsEndpoint.java    |  8 ++++----
 .../sjms/consumer/EndpointMessageListener.java       | 20 ++++++++++++++++++--
 2 files changed, 22 insertions(+), 6 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
index d8e1bee..c65df4f 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsEndpoint.java
@@ -318,13 +318,13 @@ public class SjmsEndpoint extends DefaultEndpoint
 
     @Override
     public Consumer createConsumer(Processor processor) throws Exception {
-        EndpointMessageListener listener = new EndpointMessageListener(this, processor);
-        configureMessageListener(listener);
-
         MessageListenerContainer container = createMessageListenerContainer(this);
+        SjmsConsumer consumer = new SjmsConsumer(this, processor, container);
+
+        EndpointMessageListener listener = new EndpointMessageListener(consumer, this, processor);
+        configureMessageListener(listener);
         container.setMessageListener(listener);
 
-        SjmsConsumer consumer = new SjmsConsumer(this, processor, container);
         configureConsumer(consumer);
         return consumer;
     }
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index 8c7b9d3..b3d9f11 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -35,7 +35,9 @@ import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.component.sjms.SessionCallback;
 import org.apache.camel.component.sjms.SessionMessageListener;
 import org.apache.camel.component.sjms.SjmsConstants;
+import org.apache.camel.component.sjms.SjmsConsumer;
 import org.apache.camel.component.sjms.SjmsEndpoint;
+import org.apache.camel.component.sjms.SjmsMessage;
 import org.apache.camel.component.sjms.SjmsTemplate;
 import org.apache.camel.component.sjms.jms.JmsMessageHelper;
 import org.apache.camel.support.AsyncProcessorConverterHelper;
@@ -54,6 +56,7 @@ public class EndpointMessageListener implements SessionMessageListener {
 
     private static final Logger LOG = LoggerFactory.getLogger(EndpointMessageListener.class);
 
+    private final SjmsConsumer consumer;
     private final SjmsEndpoint endpoint;
     private final AsyncProcessor processor;
     private Object replyToDestination;
@@ -63,7 +66,8 @@ public class EndpointMessageListener implements SessionMessageListener {
     private String eagerPoisonBody;
     private volatile SjmsTemplate template;
 
-    public EndpointMessageListener(SjmsEndpoint endpoint, Processor processor) {
+    public EndpointMessageListener(SjmsConsumer consumer, SjmsEndpoint endpoint, Processor processor) {
+        this.consumer = consumer;
         this.endpoint = endpoint;
         this.processor = AsyncProcessorConverterHelper.convert(processor);
     }
@@ -207,6 +211,9 @@ public class EndpointMessageListener implements SessionMessageListener {
             // if we failed processed the exchange from the async callback task, then grab the exception
             rce = exchange.getException(RuntimeCamelException.class);
 
+            // the exchange is now done so release it
+            consumer.releaseExchange(exchange, false);
+
         } catch (Exception e) {
             rce = wrapRuntimeCamelException(e);
         }
@@ -234,7 +241,10 @@ public class EndpointMessageListener implements SessionMessageListener {
     }
 
     public Exchange createExchange(Message message, Session session, Object replyDestination) {
-        Exchange exchange = endpoint.createExchange(message, session);
+        Exchange exchange = consumer.createExchange(false);
+        // create a mew SjmsMessage as it cannot be reset for reuse
+        // TODO: optimize to allow reset
+        exchange.setIn(new SjmsMessage(exchange, message, session, endpoint.getBinding()));
 
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
@@ -455,6 +465,12 @@ public class EndpointMessageListener implements SessionMessageListener {
                     }
                 }
             }
+
+            // if we completed from async processing then we should release the exchange
+            // the sync processing will release the exchange outside this callback
+            if (!doneSync) {
+                consumer.releaseExchange(exchange, false);
+            }
         }
     }
 

Reply | Threaded
Open this post in threaded view
|

[camel] 02/02: CAMEL-16222: PooledExchangeFactory experiment

davsclaus-2
In reply to this post by davsclaus-2
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 0245ed7f39b95fe5bbc5723b580688c5e2589501
Author: Claus Ibsen <[hidden email]>
AuthorDate: Tue Feb 23 10:46:16 2021 +0100

    CAMEL-16222: PooledExchangeFactory experiment
---
 .../apache/camel/component/sjms/SjmsMessage.java   |  8 ++++++++
 .../sjms/consumer/EndpointMessageListener.java     | 15 +++++++++++---
 .../sjms/consumer/InOnlyConsumerQueueTest.java     | 11 +++++++++++
 ...ueueTest.java => InOnlyPooledExchangeTest.java} | 23 +++++++++++++++++++++-
 .../java/org/apache/camel/ExtendedExchange.java    |  8 ++++++++
 .../org/apache/camel/support/AbstractExchange.java | 12 +++++++++++
 6 files changed, 73 insertions(+), 4 deletions(-)

diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
index 35f066e..2f76d12 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/SjmsMessage.java
@@ -53,6 +53,14 @@ public class SjmsMessage extends DefaultMessage {
     }
 
     @Override
+    public void reset() {
+        super.reset();
+        jmsMessage = null;
+        jmsSession = null;
+        binding = null;
+    }
+
+    @Override
     public String toString() {
         // do not print jmsMessage as there could be sensitive details
         if (jmsMessage != null) {
diff --git a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
index b3d9f11..b2c9515 100644
--- a/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
+++ b/components/camel-sjms/src/main/java/org/apache/camel/component/sjms/consumer/EndpointMessageListener.java
@@ -29,6 +29,7 @@ import org.apache.camel.AsyncProcessor;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
 import org.apache.camel.Processor;
 import org.apache.camel.RollbackExchangeException;
 import org.apache.camel.RuntimeCamelException;
@@ -242,9 +243,17 @@ public class EndpointMessageListener implements SessionMessageListener {
 
     public Exchange createExchange(Message message, Session session, Object replyDestination) {
         Exchange exchange = consumer.createExchange(false);
-        // create a mew SjmsMessage as it cannot be reset for reuse
-        // TODO: optimize to allow reset
-        exchange.setIn(new SjmsMessage(exchange, message, session, endpoint.getBinding()));
+
+        // optimize: either create a new SjmsMessage or reuse existing if exists
+        SjmsMessage msg = exchange.adapt(ExtendedExchange.class).getInOrNull(SjmsMessage.class);
+        if (msg == null) {
+            msg = new SjmsMessage(exchange, message, session, endpoint.getBinding());
+            exchange.setIn(msg);
+        } else {
+            msg.setJmsMessage(message);
+            msg.setJmsSession(session);
+            msg.setBinding(endpoint.getBinding());
+        }
 
         // lets set to an InOut if we have some kind of reply-to destination
         if (replyDestination != null && !disableReplyTo) {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java
index b55fec2..af55fc2 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java
@@ -38,6 +38,17 @@ public class InOnlyConsumerQueueTest extends JmsTestSupport {
         mock.assertIsSatisfied();
     }
 
+    @Test
+    public void testTwoSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint(MOCK_RESULT);
+        mock.expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody(SJMS_QUEUE_NAME, "Hello World");
+        template.sendBody(SJMS_QUEUE_NAME, "Bye World");
+
+        mock.assertIsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
diff --git a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
similarity index 69%
copy from components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java
copy to components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
index b55fec2..9ddbf05 100644
--- a/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyConsumerQueueTest.java
+++ b/components/camel-sjms/src/test/java/org/apache/camel/component/sjms/consumer/InOnlyPooledExchangeTest.java
@@ -16,16 +16,26 @@
  */
 package org.apache.camel.component.sjms.consumer;
 
+import org.apache.camel.CamelContext;
+import org.apache.camel.ExtendedCamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.component.sjms.support.JmsTestSupport;
+import org.apache.camel.impl.engine.PooledExchangeFactory;
 import org.junit.jupiter.api.Test;
 
-public class InOnlyConsumerQueueTest extends JmsTestSupport {
+public class InOnlyPooledExchangeTest extends JmsTestSupport {
 
     private static final String SJMS_QUEUE_NAME = "sjms:queue:in.only.consumer.queue";
     private static final String MOCK_RESULT = "mock:result";
 
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        CamelContext context = super.createCamelContext();
+        context.adapt(ExtendedCamelContext.class).setExchangeFactory(new PooledExchangeFactory());
+        return context;
+    }
+
     @Test
     public void testSynchronous() throws Exception {
         final String expectedBody = "Hello World";
@@ -38,6 +48,17 @@ public class InOnlyConsumerQueueTest extends JmsTestSupport {
         mock.assertIsSatisfied();
     }
 
+    @Test
+    public void testTwoSynchronous() throws Exception {
+        MockEndpoint mock = getMockEndpoint(MOCK_RESULT);
+        mock.expectedBodiesReceived("Hello World", "Bye World");
+
+        template.sendBody(SJMS_QUEUE_NAME, "Hello World");
+        template.sendBody(SJMS_QUEUE_NAME, "Bye World");
+
+        mock.assertIsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
diff --git a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
index bb0b523..7974ca1 100644
--- a/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
+++ b/core/camel-api/src/main/java/org/apache/camel/ExtendedExchange.java
@@ -29,6 +29,14 @@ import org.apache.camel.spi.UnitOfWork;
 public interface ExtendedExchange extends Exchange {
 
     /**
+     * If there is an existing inbound message of the given type then return it as-is, otherwise return null.
+     *
+     * @param  type the given type
+     * @return      the message if exists with the given type, otherwise null.
+     */
+    <T> T getInOrNull(Class<T> type);
+
+    /**
      * Sets the endpoint which originated this message exchange. This method should typically only be called by
      * {@link Endpoint} implementations
      */
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
index 6e5daa6..e4f1cdb 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/AbstractExchange.java
@@ -347,6 +347,18 @@ class AbstractExchange implements ExtendedExchange {
     }
 
     @Override
+    public <T> T getInOrNull(Class<T> type) {
+        if (in == null) {
+            return null;
+        }
+        if (type.isInstance(in)) {
+            return type.cast(in);
+        }
+
+        return null;
+    }
+
+    @Override
     public void setIn(Message in) {
         this.in = in;
         configureMessage(in);