Camel-Kafka Producer Problem

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

Camel-Kafka Producer Problem

Burkard Stephan
Hi

I have built a tiny example project to demonstrate a problem of camel-kafka component I stumbled upon.

When a Kafka producer is used in a global exception handler, typically to route error messages to a dead letter topic, the producer is registered at the broker for every route.

Are the individual routes "merged" at runtime with the global exception handlers? It is quite surprising to have 5 Kafka producers because there are 5 Routes (4 of them "private" routes to divide the logic).

However, during startup this multi-registration takes place, but always with the same Kafka client-id. Therefore, starting from the second producer, an InstanceAlreadyExistsException is thrown because the JMX beans the producer creates have all the same name.

Camel 2.24.3
Kafka Client 2.0.1

The most simple workaround is to move the producer to a direct route and call that route from the global error handler. In this case, the problem does not occur and there seems to be only one producer.

Regards
Stephan



javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producerExample
        at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437) ~[na:1.8.0_144]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898) ~[na:1.8.0_144]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966) ~[na:1.8.0_144]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900) ~[na:1.8.0_144]
        at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324) ~[na:1.8.0_144]
        at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522) ~[na:1.8.0_144]
        at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62) ~[kafka-clients-2.0.1.jar:na]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:451) [kafka-clients-2.0.1.jar:na]
        at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:304) [kafka-clients-2.0.1.jar:na]
        at org.apache.camel.component.kafka.KafkaProducer.doStart(KafkaProducer.java:109) [camel-kafka-2.24.3.jar:2.24.3]
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:72) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.util.ServiceHelper.startService(ServiceHelper.java:75) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DeferServiceStartupListener.onCamelContextStarted(DeferServiceStartupListener.java:49) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext.safelyStartRouteServices(DefaultCamelContext.java:3868) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext.doStartOrResumeRoutes(DefaultCamelContext.java:3647) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext.doStartCamel(DefaultCamelContext.java:3488) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext$4.call(DefaultCamelContext.java:3247) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext$4.call(DefaultCamelContext.java:3243) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext.doWithDefinedClassLoader(DefaultCamelContext.java:3266) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext.doStart(DefaultCamelContext.java:3243) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.support.ServiceSupport.start(ServiceSupport.java:72) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:3159) [camel-core-2.24.3.jar:2.24.3]
        at org.apache.camel.spring.SpringCamelContext.start(SpringCamelContext.java:133) [camel-spring-2.24.3.jar:2.24.3]
        at org.apache.camel.spring.SpringCamelContext.onApplicationEvent(SpringCamelContext.java:174) [camel-spring-2.24.3.jar:2.24.3]
        at org.springframework.context.event.SimpleApplicationEventMulticaster.doInvokeListener(SimpleApplicationEventMulticaster.java:172) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
        at org.springframework.context.event.SimpleApplicationEventMulticaster.invokeListener(SimpleApplicationEventMulticaster.java:165) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
        at org.springframework.context.event.SimpleApplicationEventMulticaster.multicastEvent(SimpleApplicationEventMulticaster.java:139) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:402) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.publishEvent(AbstractApplicationContext.java:359) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:896) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:162) [spring-boot-2.1.7.RELEASE.jar:2.1.7.RELEASE]
        at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:552) [spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
        at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) [spring-boot-2.1.7.RELEASE.jar:2.1.7.RELEASE]
        at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:743) [spring-boot-2.1.7.RELEASE.jar:2.1.7.RELEASE]
        at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:390) [spring-boot-2.1.7.RELEASE.jar:2.1.7.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) [spring-boot-2.1.7.RELEASE.jar:2.1.7.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1214) [spring-boot-2.1.7.RELEASE.jar:2.1.7.RELEASE]
        at org.springframework.boot.SpringApplication.run(SpringApplication.java:1203) [spring-boot-2.1.7.RELEASE.jar:2.1.7.RELEASE]
        at org.apache.camel.example.Application.main(Application.java:9) [classes/:na]



KafkaProducerProblem.zip (5K) Download Attachment