This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch exchange-factory
in repository https://gitbox.apache.org/repos/asf/camel.git

The following commit(s) were added to refs/heads/exchange-factory by this push:
new 95a2dea CAMEL-16222: PooledExchangeFactory experiment
95a2dea is described below
commit 95a2deacb6965a7ae3a5ea873ffd8d75e2063d84
Author: Claus Ibsen <[hidden email]>
AuthorDate: Tue Feb 23 09:40:19 2021 +0100
CAMEL-16222: PooledExchangeFactory experiment
---
.../component/pulsar/PulsarMessageListener.java | 42 ++++++++--------------
1 file changed, 14 insertions(+), 28 deletions(-)
diff --git a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
index 0607bda..9038c35 100644
--- a/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
+++ b/components/camel-pulsar/src/main/java/org/apache/camel/component/pulsar/PulsarMessageListener.java
@@ -16,7 +16,6 @@
*/
package org.apache.camel.component.pulsar;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageHeaders;
import org.apache.camel.component.pulsar.utils.message.PulsarMessageUtils;
@@ -24,13 +23,9 @@ import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.PulsarClientException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public class PulsarMessageListener implements MessageListener<byte[]> {
- private static final Logger LOGGER = LoggerFactory.getLogger(PulsarMessageListener.class);
-
private final PulsarEndpoint endpoint;
private final PulsarConsumer pulsarConsumer;
@@ -41,33 +36,32 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
@Override
public void received(final Consumer<byte[]> consumer, final Message<byte[]> message) {
- final Exchange exchange = PulsarMessageUtils.updateExchange(message, endpoint.createExchange());
+ final Exchange exchange = PulsarMessageUtils.updateExchange(message, pulsarConsumer.createExchange(false));
- try {
- if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
- exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
- endpoint.getComponent().getPulsarMessageReceiptFactory()
- .newInstance(exchange, message, consumer));
- }
- processAsync(exchange, consumer, message);
- } catch (Exception exception) {
- handleProcessorException(exchange, exception);
+ if (endpoint.getPulsarConfiguration().isAllowManualAcknowledgement()) {
+ exchange.getIn().setHeader(PulsarMessageHeaders.MESSAGE_RECEIPT,
+ endpoint.getComponent().getPulsarMessageReceiptFactory()
+ .newInstance(exchange, message, consumer));
}
+ processAsync(exchange, consumer, message);
}
private void processAsync(final Exchange exchange, final Consumer<byte[]> consumer, final Message<byte[]> message) {
- pulsarConsumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
- @Override
- public void done(boolean doneSync) {
+ pulsarConsumer.getAsyncProcessor().process(exchange, doneSync -> {
+ try {
if (exchange.getException() != null) {
- handleProcessorException(exchange, exchange.getException());
+ pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange,
+ exchange.getException());
} else {
try {
acknowledge(consumer, message);
} catch (Exception e) {
- handleProcessorException(exchange, e);
+ pulsarConsumer.getExceptionHandler().handleException("Error processing exchange", exchange,
+ exchange.getException());
}
}
+ } finally {
+ pulsarConsumer.releaseExchange(exchange, false);
}
});
}
@@ -79,12 +73,4 @@ public class PulsarMessageListener implements MessageListener<byte[]> {
}
}
- private void handleProcessorException(final Exchange exchange, final Exception exception) {
- final Exchange exchangeWithException = PulsarMessageUtils
- .updateExchangeWithException(exception, exchange);
-
- pulsarConsumer.getExceptionHandler()
- .handleException("An error occurred", exchangeWithException, exception);
- }
-
}