This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository
https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/exchange-factory by this push:
new a69e747 CAMEL-16222: PooledExchangeFactory experiment
a69e747 is described below
commit a69e747a2c42d720c9c447b778720ad58b36aba9
Author: Claus Ibsen <[hidden email]>
AuthorDate: Tue Feb 23 08:37:34 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../org/apache/camel/component/rabbitmq/RabbitConsumer.java | 11 ++++++-----
.../org/apache/camel/component/rabbitmq/RabbitMQConsumer.java | 9 +++++++++
2 files changed, 15 insertions(+), 5 deletions(-)
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
index f1bd416..4ffb480 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitConsumer.java
@@ -69,7 +69,7 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
lock.acquire();
}
// Channel might be open because while we were waiting for the lock,
- // stop() has been succesfully called.
+ // stop() has been successfully called.
if (!channel.isOpen()) {
// we could not open the channel so release the lock
if (!consumer.getEndpoint().isAutoAck()) {
@@ -78,12 +78,15 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
return;
}
+ Exchange exchange = consumer.createExchange(envelope, properties, body);
try {
- doHandleDelivery(consumerTag, envelope, properties, body);
+ consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
+ doHandleDelivery(exchange, envelope, properties);
} finally {
if (!consumer.getEndpoint().isAutoAck()) {
lock.release();
}
+ consumer.releaseExchange(exchange, false);
}
} catch (InterruptedException e) {
@@ -91,10 +94,8 @@ class RabbitConsumer extends ServiceSupport implements com.rabbitmq.client.Consu
}
}
- public void doHandleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
+ public void doHandleDelivery(Exchange exchange, Envelope envelope, AMQP.BasicProperties properties)
throws IOException {
- Exchange exchange = consumer.getEndpoint().createRabbitExchange(envelope, properties, body);
- consumer.getEndpoint().getMessageConverter().mergeAmqpProperties(exchange, properties);
boolean sendReply = properties.getReplyTo() != null;
if (sendReply && !exchange.getPattern().isOutCapable()) {
diff --git a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
index bda9900..1e85c18 100644
--- a/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
+++ b/components/camel-rabbitmq/src/main/java/org/apache/camel/component/rabbitmq/RabbitMQConsumer.java
@@ -24,7 +24,10 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Connection;
+import com.rabbitmq.client.Envelope;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.Suspendable;
import org.apache.camel.support.DefaultConsumer;
@@ -126,6 +129,12 @@ public class RabbitMQConsumer extends DefaultConsumer implements Suspendable {
this.consumers.add(consumer);
}
+ public Exchange createExchange(Envelope envelope, AMQP.BasicProperties properties, byte[] body) {
+ Exchange exchange = createExchange(false);
+ endpoint.getMessageConverter().populateRabbitExchange(exchange, envelope, properties, body, false, endpoint.isAllowMessageBodySerialization());
+ return exchange;
+ }
+
private synchronized void reconnect() {
if (startConsumerCallable != null) {
return;