Messages remain unacknowledged when exception thrown during RabbitMQ InOut

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

Messages remain unacknowledged when exception thrown during RabbitMQ InOut

E Wong

We’re using the new rabbitMQ InOut feature in camel version 2.16.0
(with rabbitMQ v3.5.5) and have set up a flow like the following:

Producer -> route1 -> rabbitMQ -> route2 -> rabbitMQ -> route3 -> service bean

The service bean will return a response which the caller can choose
whether or not to be interested in, so essentially we want the flow to
be able to work asynchronously and synchronously.  Ideally we’d like
to reuse the same routes, where the caller can just indicate whether
it cares about a response by setting the exchange pattern to InOnly or

The rabbitMQ to and from endpoints are configured like the following:

/* route1 */
from( "direct:route1" )
    .to( "rabbitmq://localhost:5672/testExchange?exchangeType=topic&autoDelete=false&declare=false"

/* route2 */
from( "rabbitmq://localhost:5672/testExchange?"
        "&" +
        "&autoAck=false&transferException=true" )
    .throwException( new RuntimeException( "test exception" ) );

We have set autoAck=false so that, for an InOnly exchange, if there
are any exceptions, the message will be rejected and pushed to a
deadletter queue.

However for an InOut exchange, if an exception is thrown, given we
have autoAck=false and transferException=true, the original message
will remain unacknowledged forever on rabbitMQ, even though the caller
will receive and handle the exception. If the server is restarted, the
application will attempt to reprocess the message.

We would like to propose a patch to include a basicAck in the rabbitMQ
consumer in this scenario.  A basicAck seems more appropriate (rather
than a rejection) since the caller would be able to handle the
exception and therefore there should be no need to send the message to
a deadletter queue.

//In the RabbitMQConsumer
else if (endpoint.isTransferException() &&
exchange.getPattern().isOutCapable()) {
    // the inOut exchange failed so put the exception in the body and send back
    endpoint.publishExchangeToChannel(exchange, channel,

    //Add something like this
    if (!consumer.endpoint.isAutoAck()) {
        channel.basicAck(deliveryTag, false);

Would a solution like this be acceptable or was there a specific
reason why no ack or reject was included in this scenario?