Kafka component error handling - consumer keeps leaving and rejoining the group


Joseph M'BIMBI-BENE
Hello everyone,

I'm having a problem with the Kafka component:
when the Kafka consumer can't read a message (caused by some Avro errors,
as I found after investigation), it continuously leaves the group and joins again.

I would like it to just throw an exception and let me decide how to handle
it: DLQ, ignore, etc.

I configured the `bridgeErrorHandler` parameter, but to no avail. The
behaviour is still the same.

Am I doing something wrong? Please help. Thank you.

----------

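Here is the route definition:

import io.confluent.kafka.serializers.KafkaAvroDeserializer;
import org.apache.camel.builder.RouteBuilder;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.stereotype.Component;

@Component
public class CamelConfiguration extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Kafka consumer endpoint: String keys, Avro values, and
        // bridgeErrorHandler enabled in the hope of routing consumer errors
        // to the Camel error handler.
        String kafkaCamelUri = String.format("kafka:cont_hist" +
                        "?brokers={{bootstrap-servers}}" +
                        "&schemaRegistryURL=http://localhost:8081" +
                        "&specificAvroReader=true" +
                        "&bridgeErrorHandler=true" +
                        "&keyDeserializer=%s" +
                        "&valueDeserializer=%s",
                StringDeserializer.class.getName(),
                KafkaAvroDeserializer.class.getName());
        from(kafkaCamelUri)
                // no redelivery: a failed exchange should fail once
                .errorHandler(defaultErrorHandler().disableRedelivery())
                .to("log:coucou")
                // call the stored procedure and keep its result in a header
                .to("sql-stored:classpath:procstoc.sql" +
                        "?outputHeader=outError"
                )
                .to("log:output")
                .log("coucou ${headers.outError}");
    }

}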

-----------

And here are some log excerpts:

2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925013193
2020-04-26 20:16:53.194  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
to topic cont_hist after 5000 ms
2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
to topic cont_hist
2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
cont_hist
2020-04-26 20:16:58.208  INFO 28096 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
yyC1KuR2Sv2BVVRNLdTnsg
2020-04-26 20:16:58.209  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:16:58.210  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
partitions []
2020-04-26 20:16:58.211  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:16:58.221  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:16:58.229  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
with generation 19
2020-04-26 20:16:58.232  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
partitions: cont_hist-0
2020-04-26 20:16:58.236  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
cont_hist-0 to the committed offset FetchPosition{offset=4,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}
2020-04-26 20:16:58.251  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request
to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:16:58.274  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
allow.auto.create.topics = true
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [http://localhost:9092]

[...]

2020-04-26 20:16:58.293  WARN 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig    : The configuration
'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
config.
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925018294
2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
to topic cont_hist after 5000 ms
2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
to topic cont_hist
2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
cont_hist
2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
yyC1KuR2Sv2BVVRNLdTnsg
2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
partitions []
2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:17:03.312  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
2020-04-26 20:17:03.319  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
with generation 21
2020-04-26 20:17:03.320  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
partitions: cont_hist-0
2020-04-26 20:17:03.324  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
cont_hist-0 to the committed offset FetchPosition{offset=4,
offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
epoch=0}}
2020-04-26 20:17:03.347  INFO 28096 --- [umer[cont_hist]]
o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request
to coordinator localhost:9092 (id: 2147483646 rack: null)
2020-04-26 20:17:03.400  INFO 28096 --- [umer[cont_hist]]
o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
allow.auto.create.topics = true

Re: Kafka component error handling - consumer keeps leaving and rejoining the group

Joseph M'BIMBI-BENE
I forgot to mention that I am using version 3.2.0.


Re: Kafka component error handling - consumer keeps leaving and rejoining the group

Joseph M'BIMBI-BENE
Digging into the code (version 3.2.0, as mentioned),

I can see in
`org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
line 406:

```
catch (KafkaException e) {
  // some kind of error in kafka, it may happen during
  // unsubscribing or during normal processing
  if (unsubscribing) {
    getExceptionHandler().handleException(
        "Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
  } else {
    LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will attempt to re-connect on next run",
        threadId, topicName, e.getMessage());
    reConnect = true;
  }
}
```

A `SerializationException` occurs, which extends `KafkaException`.
It is definitely not normal processing, and logging at debug level hides
the true cause.
I guess one would have to narrow down the exception classes caught
in that catch clause or, as a quick fix, explicitly catch
`SerializationException`.
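To illustrate the quick fix, here is a minimal sketch of what a narrowed catch could look like. It is not the Camel source: `KafkaException` and `SerializationException` are local stand-ins for the real Kafka classes so that the example compiles on its own, and `FetchOutcome` and `runOnce` are hypothetical names made up for the illustration.

```java
final class NarrowedCatchSketch {

    // Local stand-ins (assumption): they only mirror the fact that
    // SerializationException is a subclass of KafkaException.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    static class SerializationException extends KafkaException {
        SerializationException(String message) { super(message); }
    }

    // Hypothetical result type describing what the fetch loop should do next.
    enum FetchOutcome { OK, HAND_TO_ERROR_HANDLER, RECONNECT }

    // Runs one poll attempt and decides how to react to a failure.
    static FetchOutcome runOnce(Runnable poll) {
        try {
            poll.run();
            return FetchOutcome.OK;
        } catch (SerializationException e) {
            // The more specific catch comes first: reconnecting cannot repair
            // a payload that fails to deserialize, so surface the error.
            return FetchOutcome.HAND_TO_ERROR_HANDLER;
        } catch (KafkaException e) {
            // Every other Kafka error keeps the existing behaviour.
            return FetchOutcome.RECONNECT;
        }
    }

    public static void main(String[] args) {
        System.out.println(runOnce(() -> { }));
        System.out.println(runOnce(() -> { throw new SerializationException("bad avro payload"); }));
        System.out.println(runOnce(() -> { throw new KafkaException("broker unreachable"); }));
    }
}
```

In the real consumer the `HAND_TO_ERROR_HANDLER` branch would presumably call `getExceptionHandler().handleException(...)`, as the unsubscribing branch already does, but that wiring is an assumption on my side.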

How should I proceed?
I am not very familiar with Camel, or with open source contributions
in general ^^.

Do I just open a ticket in some bug tracker, GitHub maybe?
Do you want me to open a pull request?

I have been toying with Camel for a couple of weeks now, and I would like to
introduce it in the projects I work on. But I am by no means a Camel guru.

This bug might be a showstopper, so I would like to help fix it.

Thank you


Re: Kafka component error handling - consumer keeps leaving and rejoining the group

Joseph M'BIMBI-BENE
I also realize that the `bridgeErrorHandler` property never seems to be
used, unlike others such as `breakOnFirstError`.

Also, going back to the exception handling, at least a couple of other
subclasses of `KafkaException` deserve not to be retried. Just a few
examples:


   - `ConfigException`: "Thrown if the user supplies an invalid
   configuration" -> a retry will not solve that
   - `OAuthBearerConfigException`: "Exception thrown when there is a problem
   with the configuration (an invalid option in a JAAS config, for example)":
   this one seems to fall under the same category
   - and obviously `SerializationException`
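
The list above could be turned into a small retry policy, roughly like the sketch below. Again, this is only an illustration under assumptions: the exception classes are local stand-ins named after the Kafka ones, `SomeTransientException` is a made-up placeholder for any error worth retrying, and `NON_RETRIABLE` and `shouldReconnect` are hypothetical names, not part of Camel or Kafka.

```java
import java.util.List;

final class RetryPolicySketch {

    // Local stand-ins (assumption) so the sketch compiles without kafka-clients.
    static class KafkaException extends RuntimeException {
        KafkaException(String message) { super(message); }
    }

    static class ConfigException extends KafkaException {
        ConfigException(String message) { super(message); }
    }

    static class OAuthBearerConfigException extends KafkaException {
        OAuthBearerConfigException(String message) { super(message); }
    }

    static class SerializationException extends KafkaException {
        SerializationException(String message) { super(message); }
    }

    // Made-up placeholder for an error that a reconnect might actually fix.
    static class SomeTransientException extends KafkaException {
        SomeTransientException(String message) { super(message); }
    }

    // Exception types for which a reconnect cannot help.
    private static final List<Class<? extends KafkaException>> NON_RETRIABLE = List.of(
            ConfigException.class,
            OAuthBearerConfigException.class,
            SerializationException.class);

    // True when reconnecting is worth trying, false when the error should be
    // reported to the error handler instead.
    static boolean shouldReconnect(KafkaException e) {
        return NON_RETRIABLE.stream().noneMatch(type -> type.isInstance(e));
    }

    public static void main(String[] args) {
        System.out.println(shouldReconnect(new SerializationException("bad avro payload")));
        System.out.println(shouldReconnect(new SomeTransientException("broker went away")));
    }
}
```

A deny-list like this keeps the current reconnect behaviour for anything not explicitly listed, which seems the least intrusive change; an allow-list of retriable exceptions would be stricter but risks breaking existing setups.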



On Sun, 26 Apr 2020 at 21:55, Joseph M'BIMBI-BENE <[hidden email]>
wrote:

> digging into the code (version 3.2.0 i repeat),
>
> i can see in the class
> `org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
> lign 406:
>
> ```
> catch (KafkaException e) {
>   // some kind of error in kafka, it may happen during
>   // unsubscribing or during normal processing
>   if (unsubscribing) {
>     getExceptionHandler().handleException("Error unsubscribing " +
> threadId + " from kafka topic " + topicName, e);
>   } else {
>     LOG.debug("KafkaException consuming {} from topic {} causedby {}. Will
> attempt to re-connect on next run", threadId, topicName, e.getMessage());
>     reConnect = true;
>   }
> }
> ```
>
> A `SerializationException` occurs, which extends KafkaException.
> It definitely is not normal processing. And logging with debug level hides
> the true cause.
> I guess one would have to narrow down the classes of exception to be
> caught in that catch clause, or as a quickfix, explicitly catch the
> serializationException.
>
> How to proceed ?
> I am not super familiar with Camel, and overall with open source
> contributions ^^.
>
> Do i just open a ticket in some bug tracker, github maybe ?
> Do you want me to open a pull request ?
>
> I am toying with camel for a couple of weeks now, i would like to
> introduce it in the projects i work with. But i am by no mean a camel guru,
>
> And this bug might be a showstopper, so i would like to help fix it
>
> Thank you

Re: Kafka component error handling - consumer keeps leaving and rejoining the group

Claus Ibsen-2
Hi

Thanks for all your findings, this is great insight. You are most
welcome to create a JIRA ticket about this bug.
We can then work on a fix together, and you can help test it.

Yeah, it seems catching KafkaException may be too wide. And let's see
if we can also incorporate the bridge error handler.
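
As a rough illustration of the narrowing discussed here, the sketch below
separates exceptions where a reconnect may help from those where it cannot.
It is not the actual Camel fix. The exception classes are local stand-ins
that only mirror the names of Kafka's KafkaException, SerializationException
and ConfigException, so that the example runs without the Kafka client on the
classpath. ReconnectPolicySketch, Action and decide are hypothetical names.

```java
import java.util.List;

// Local stand-ins mirroring the names of Kafka's exception types (assumption:
// real code would use the Kafka client library's classes of the same names).
class KafkaException extends RuntimeException {
    KafkaException(String message) { super(message); }
}

class SerializationException extends KafkaException {
    SerializationException(String message) { super(message); }
}

class ConfigException extends KafkaException {
    ConfigException(String message) { super(message); }
}

// Hypothetical helper, not part of Camel.
class ReconnectPolicySketch {

    enum Action { RECONNECT, HAND_TO_ERROR_HANDLER }

    // Exception types for which re-subscribing cannot fix the problem.
    private static final List<Class<? extends KafkaException>> NON_RETRIABLE =
            List.of(SerializationException.class, ConfigException.class);

    // Decide what the consumer loop should do with a caught exception.
    static Action decide(KafkaException e) {
        for (Class<? extends KafkaException> type : NON_RETRIABLE) {
            if (type.isInstance(e)) {
                return Action.HAND_TO_ERROR_HANDLER;
            }
        }
        // Everything else keeps the existing "reconnect on next run" behaviour.
        return Action.RECONNECT;
    }

    public static void main(String[] args) {
        // prints HAND_TO_ERROR_HANDLER
        System.out.println(decide(new SerializationException("bad avro payload")));
        // prints RECONNECT
        System.out.println(decide(new KafkaException("broker unavailable")));
    }
}
```

In the consumer loop, the HAND_TO_ERROR_HANDLER branch is where the exception
could be passed to the exception handler (and so, with bridgeErrorHandler
enabled, on to the route's error handler) instead of setting reConnect = true.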

On Mon, Apr 27, 2020 at 10:06 AM Joseph M'BIMBI-BENE
<[hidden email]> wrote:

>
> I also realize that the property "bridgeErrorHandler" seems never to be
> used, while another one, like "breakOnFirstError", is.
>
> Also, going back to the exception handling, at least a couple of other
> subclasses of KafkaException should not be retried either. Just a few
> examples:
>
>
>    - ConfigException: "Thrown if the user supplies an invalid
>    configuration" -> a retry will not solve that
>    - OAuthBearerConfigException: "Exception thrown when there is a problem
>    with the configuration (an invalid option in a JAAS config, for example)":
>    this one seems to fall under the same category
>    - and obviously SerializationException
>
>
>
> On Sun, 26 Apr 2020 at 21:55, Joseph M'BIMBI-BENE <[hidden email]>
> wrote:
>
> > Digging into the code (version 3.2.0, I repeat),
> >
> > I can see in the class
> > `org.apache.camel.component.kafka.KafkaConsumer.KafkaFetchRecords.doRun`,
> > line 406:
> >
> > ```
> > catch (KafkaException e) {
> >   // some kind of error in kafka, it may happen during
> >   // unsubscribing or during normal processing
> >   if (unsubscribing) {
> >     getExceptionHandler().handleException(
> >         "Error unsubscribing " + threadId + " from kafka topic " + topicName, e);
> >   } else {
> >     LOG.debug("KafkaException consuming {} from topic {} causedby {}. "
> >         + "Will attempt to re-connect on next run",
> >         threadId, topicName, e.getMessage());
> >     reConnect = true;
> >   }
> > }
> > ```
> >
> > A `SerializationException` occurs, which extends `KafkaException`.
> > This is definitely not normal processing, and logging at debug level hides
> > the true cause.
> > I guess one would have to narrow down the exception classes caught in that
> > catch clause or, as a quick fix, explicitly catch `SerializationException`.
> >
> > How should I proceed?
> > I am not very familiar with Camel, or with open source contributions in
> > general ^^.
> >
> > Do I just open a ticket in some bug tracker, GitHub maybe?
> > Do you want me to open a pull request?
> >
> > I have been toying with Camel for a couple of weeks now, and I would like to
> > introduce it in the projects I work on. But I am by no means a Camel guru.
> >
> > This bug might be a showstopper, so I would like to help fix it.
> >
> > Thank you
> >
> > On Sun, 26 Apr 2020 at 20:56, Joseph M'BIMBI-BENE <[hidden email]>
> > wrote:
> >
> >> I  forgot to tell i am using version 3.2.0
> >>
> >> On Sun, 26 Apr 2020 at 20:45, Joseph M'BIMBI-BENE <
> >> [hidden email]> wrote:
> >>
> >>> Hello everyone,
> >>>
> >>> I'm having a problem with the Kafka component:
> >>> When the kafka consumer can't read a message (caused by some avro errors
> >>> after investigation), it continuously leaves the group and joins again.
> >>>
> >>> I would like it to just throw an exception and let me decide how to
> >>> handle it: dlq, ignore, etc.
> >>>
> >>> I configured the parameter  `bridgeErrorHandler` but ot no avail. The
> >>> behaviour is still the same
> >>>
> >>> Am i doing something wrong? Please help. Thank you
> >>>
> >>> ----------
> >>>
> >>> Here is the route definition :
> >>>
> >>> @Component
> >>> public class CamelConfiguration extends RouteBuilder {
> >>>
> >>>     @Override
> >>>     public void configure() throws Exception {
> >>>         LocalDateTime now = LocalDateTime.now();
> >>>         String kafkaCamelUri = String.format("kafka:cont_hist" +
> >>>                         "?brokers={{bootstrap-servers}}" +
> >>>                         "&schemaRegistryURL=http://localhost:8081" +
> >>>                         "&specificAvroReader=true" +
> >>>                         "&bridgeErrorHandler=true" +
> >>>                         "&keyDeserializer=%s" +
> >>>                         "&valueDeserializer=%s",
> >>>                 StringDeserializer.class.getName(),
> >>>                 KafkaAvroDeserializer.class.getName());
> >>>         from(kafkaCamelUri)
> >>>                 .errorHandler(defaultErrorHandler().disableRedelivery())
> >>>                 .to("log:coucou")
> >>>                 .to("sql-stored:classpath:procstoc.sql" +
> >>>                         "?outputHeader=outError"
> >>>                 )
> >>>                 .to("log:output")
> >>>                 .log("coucou ${headers.outError}");
> >>>     }
> >>>
> >>> }
> >>>
> >>> -----------
> >>>
> >>> And here are some log excerpts :
> >>>
> >>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
> >>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
> >>> 2020-04-26 20:16:53.193  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925013193
> >>> 2020-04-26 20:16:53.194  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
> >>> to topic cont_hist after 5000 ms
> >>> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
> >>> to topic cont_hist
> >>> 2020-04-26 20:16:58.194  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
> >>> cont_hist
> >>> 2020-04-26 20:16:58.208  INFO 28096 --- [umer[cont_hist]]
> >>> org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
> >>> yyC1KuR2Sv2BVVRNLdTnsg
> >>> 2020-04-26 20:16:58.209  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
> >>> localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:16:58.210  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
> >>> partitions []
> >>> 2020-04-26 20:16:58.211  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:16:58.221  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:16:58.229  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
> >>> with generation 19
> >>> 2020-04-26 20:16:58.232  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
> >>> partitions: cont_hist-0
> >>> 2020-04-26 20:16:58.236  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
> >>> cont_hist-0 to the committed offset FetchPosition{offset=4,
> >>> offsetEpoch=Optional.empty,
> >>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
> >>> epoch=0}}
> >>> 2020-04-26 20:16:58.251  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-10,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
> >>> consumer-10-187aed26-5575-4285-a567-3deca28e099c sending LeaveGroup request
> >>> to coordinator localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:16:58.274  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
> >>> allow.auto.create.topics = true
> >>> auto.commit.interval.ms = 5000
> >>> auto.offset.reset = latest
> >>> bootstrap.servers = [http://localhost:9092]
> >>>
> >>> [...]
> >>>
> >>> 2020-04-26 20:16:58.293  WARN 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.ConsumerConfig    : The configuration
> >>> 'sasl.kerberos.principal.to.local.rules' was supplied but isn't a known
> >>> config.
> >>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.3.1
> >>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 18a913733fb71c01
> >>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1587925018294
> >>> 2020-04-26 20:16:58.294  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer  : Reconnecting cont_hist-Thread 0
> >>> to topic cont_hist after 5000 ms
> >>> 2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.camel.component.kafka.KafkaConsumer  : Subscribing cont_hist-Thread 0
> >>> to topic cont_hist
> >>> 2020-04-26 20:17:03.295  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Subscribed to topic(s):
> >>> cont_hist
> >>> 2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
> >>> org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Cluster ID:
> >>> yyC1KuR2Sv2BVVRNLdTnsg
> >>> 2020-04-26 20:17:03.305  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Discovered group coordinator
> >>> localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Revoking previously assigned
> >>> partitions []
> >>> 2020-04-26 20:17:03.306  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:17:03.312  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] (Re-)joining group
> >>> 2020-04-26 20:17:03.319  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Successfully joined group
> >>> with generation 21
> >>> 2020-04-26 20:17:03.320  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting newly assigned
> >>> partitions: cont_hist-0
> >>> 2020-04-26 20:17:03.324  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Setting offset for partition
> >>> cont_hist-0 to the committed offset FetchPosition{offset=4,
> >>> offsetEpoch=Optional.empty,
> >>> currentLeader=LeaderAndEpoch{leader=localhost:9092 (id: 1 rack: null),
> >>> epoch=0}}
> >>> 2020-04-26 20:17:03.347  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-11,
> >>> groupId=e906486e-7c73-408a-916e-ac6eb3c6da4a] Member
> >>> consumer-11-936dbb66-cbce-4d6f-a740-b3764822db42 sending LeaveGroup request
> >>> to coordinator localhost:9092 (id: 2147483646 rack: null)
> >>> 2020-04-26 20:17:03.400  INFO 28096 --- [umer[cont_hist]]
> >>> o.a.k.clients.consumer.ConsumerConfig    : ConsumerConfig values:
> >>> allow.auto.create.topics = true
> >>>
> >>



--
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2