[camel-kafka-connector] branch master updated: Defer topic deletion to the extensions to prevent an Exception on shutdown

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[camel-kafka-connector] branch master updated: Defer topic deletion to the extensions to prevent an Exception on shutdown

acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 7cee66a  Defer topic deletion to the extensions to prevent an Exception on shutdown
     new 188c727  Merge pull request #374 from orpiske/topic-cleanup
7cee66a is described below

commit 7cee66ad2a492edb6fd14ad3ae13b5ae8df70b36
Author: Otavio Rodolfo Piske <[hidden email]>
AuthorDate: Wed Aug 12 17:30:18 2020 +0200

    Defer topic deletion to the extensions to prevent an Exception on shutdown
---
 .../aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java |  2 --
 .../aws/v1/s3/source/CamelSourceAWSS3ITCase.java           |  2 --
 .../aws/v1/sns/sink/CamelSinkAWSSNSITCase.java             |  6 ------
 .../aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java             |  1 -
 .../aws/v1/sqs/source/CamelSourceAWSSQSITCase.java         |  2 --
 .../aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java |  2 --
 .../cassandra/sink/CamelSinkCassandraITCase.java           |  2 --
 .../cassandra/source/CamelSourceCassandraITCase.java       |  2 --
 .../camel/kafkaconnector/common/AbstractKafkaTest.java     | 14 --------------
 .../common/services/kafkaconnect/KafkaConnectEmbedded.java |  6 ++++++
 .../common/services/kafkaconnect/KafkaConnectRunner.java   |  8 ++++++++
 .../elasticsearch/sink/CamelSinkElasticSearchITCase.java   |  6 ------
 .../kafkaconnector/file/sink/CamelSinkFileITCase.java      |  1 -
 .../kafkaconnector/http/sink/CamelSinkHTTPITCase.java      |  2 --
 .../salesforce/sink/CamelSinkSalesforceITCase.java         |  2 --
 .../salesforce/source/CamelSourceSalesforceITCase.java     |  2 --
 .../kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java      |  6 ------
 .../kafkaconnector/sjms2/source/CamelSourceJMSITCase.java  |  6 ------
 .../kafkaconnector/slack/sink/CamelSinkSlackITCase.java    |  6 ------
 .../kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java  |  6 ------
 .../syslog/source/CamelSourceSyslogITCase.java             |  8 --------
 .../timer/source/CamelSourceTimerITCase.java               |  7 -------
 22 files changed, 14 insertions(+), 85 deletions(-)

diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java
index 3df34f1..4413b2d 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -137,8 +137,6 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
         deleteStream();
 
         awsKinesisClient.shutdown();
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
 
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java
index 4930870..965803f 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/s3/source/CamelSourceAWSS3ITCase.java
@@ -82,8 +82,6 @@ public class CamelSourceAWSS3ITCase extends AbstractKafkaTest {
         } catch (Exception e) {
             LOG.warn("Unable to delete bucked: {}", e.getMessage(), e);
         }
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
     private boolean checkRecord(ConsumerRecord<String, String> record) {
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
index 93b07a5..1199ddb 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sns/sink/CamelSinkAWSSNSITCase.java
@@ -36,7 +36,6 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
@@ -80,11 +79,6 @@ public class CamelSinkAWSSNSITCase extends AbstractKafkaTest  {
         received = 0;
     }
 
-    @AfterEach
-    public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private boolean checkMessages(List<Message> messages) {
         for (Message message : messages) {
             LOG.info("Received: {}", message.getBody());
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
index 0125bd3..db2dd6d 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/sink/CamelSinkAWSSQSITCase.java
@@ -82,7 +82,6 @@ public class CamelSinkAWSSQSITCase extends AbstractKafkaTest {
 
     @AfterEach
     public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
         if (!awssqsClient.deleteQueue(queueName)) {
             fail("Failed to delete queue");
         }
diff --git a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java
index cbef4be..2c26d09 100644
--- a/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java
+++ b/tests/itests-aws-v1/src/test/java/org/apache/camel/kafkaconnector/aws/v1/sqs/source/CamelSourceAWSSQSITCase.java
@@ -74,8 +74,6 @@ public class CamelSourceAWSSQSITCase extends AbstractKafkaTest {
 
     @AfterEach
     public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-
         if (!awssqsClient.deleteQueue(queueName)) {
             fail("Failed to delete queue");
         }
diff --git a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
index ac8dcd3..33b8792 100644
--- a/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
+++ b/tests/itests-aws-v2/src/test/java/org/apache/camel/kafkaconnector/aws/v2/kinesis/source/CamelSourceAWSKinesisITCase.java
@@ -164,8 +164,6 @@ public class CamelSourceAWSKinesisITCase extends AbstractKafkaTest {
     @AfterEach
     public void tearDown() {
         deleteStream();
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
     private boolean checkRecord(ConsumerRecord<String, String> record) {
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
index e0374be..0cdf1b6 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/sink/CamelSinkCassandraITCase.java
@@ -79,8 +79,6 @@ public class CamelSinkCassandraITCase extends AbstractKafkaTest {
         if (testDataDao != null) {
             testDataDao.dropTable();
         }
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
     private void putRecords(CountDownLatch latch) {
diff --git a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
index 3e8f887..707f923 100644
--- a/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
+++ b/tests/itests-cassandra/src/test/java/org/apache/camel/kafkaconnector/cassandra/source/CamelSourceCassandraITCase.java
@@ -80,8 +80,6 @@ public class CamelSourceCassandraITCase extends AbstractKafkaTest {
         if (testDataDao != null) {
             testDataDao.dropTable();
         }
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
     private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
index d1aa6c6..db9e2d2 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/AbstractKafkaTest.java
@@ -17,21 +17,16 @@
 
 package org.apache.camel.kafkaconnector.common;
 
-import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
 import org.apache.camel.kafkaconnector.common.services.kafka.KafkaServiceFactory;
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectRunnerFactory;
 import org.apache.camel.kafkaconnector.common.services.kafkaconnect.KafkaConnectService;
 import org.apache.camel.kafkaconnector.common.utils.PropertyUtils;
 import org.junit.jupiter.api.extension.RegisterExtension;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.testcontainers.junit.jupiter.Testcontainers;
 
 @Testcontainers
 public abstract class AbstractKafkaTest {
-    private static final Logger LOG = LoggerFactory.getLogger(AbstractKafkaTest.class);
-
     @RegisterExtension
     public final KafkaService kafkaService;
 
@@ -61,13 +56,4 @@ public abstract class AbstractKafkaTest {
     public KafkaConnectService getKafkaConnectService() {
         return kafkaConnectService;
     }
-
-    protected void deleteKafkaTopic(String topic) {
-        try {
-            KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
-            kafkaClient.deleteTopic(topic);
-        } catch (Throwable t) {
-            LOG.warn("Topic not deleted (probably the Kafka test cluster was already shutting down?).", t);
-        }
-    }
 }
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
index fe5d9b5..dadab5a 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectEmbedded.java
@@ -23,6 +23,7 @@ import java.util.Map;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.services.kafka.EmbeddedKafkaService;
 import org.apache.camel.kafkaconnector.common.services.kafka.KafkaService;
+import org.apache.kafka.clients.admin.Admin;
 import org.apache.kafka.connect.runtime.AbstractStatus;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
@@ -81,6 +82,11 @@ public class KafkaConnectEmbedded implements KafkaConnectService {
     public void stop() {
         if (connectorName != null) {
             try {
+                LOG.info("Removing topics used during the test");
+                Admin client = cluster.kafka().createAdminClient();
+
+                client.deleteTopics(cluster.connectorTopics(connectorName).topics());
+
                 LOG.info("Removing connector {}", connectorName);
                 cluster.deleteConnector(connectorName);
             } finally {
diff --git a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
index 754dbd7..121be12 100644
--- a/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
+++ b/tests/itests-common/src/test/java/org/apache/camel/kafkaconnector/common/services/kafkaconnect/KafkaConnectRunner.java
@@ -27,6 +27,7 @@ import java.util.function.BiConsumer;
 import java.util.function.Consumer;
 
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
+import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.connector.policy.AllConnectorClientConfigOverridePolicy;
@@ -214,6 +215,13 @@ class KafkaConnectRunner {
      */
     public void stop() {
         if (connect != null) {
+            LOG.info("Removing topics used during the test");
+            KafkaClient kafkaClient = new KafkaClient(bootstrapServer);
+
+            for (String connector : herder.connectors()) {
+                herder.connectorActiveTopics(connector).topics().forEach(t -> kafkaClient.deleteTopic(t));
+            }
+
             connect.stop();
         } else {
             LOG.warn("Trying to stop an uninitialized Kafka Connect Runner");
diff --git a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
index aca155e..80834dd 100644
--- a/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
+++ b/tests/itests-elasticsearch/src/test/java/org/apache/camel/kafkaconnector/elasticsearch/sink/CamelSinkElasticSearchITCase.java
@@ -33,7 +33,6 @@ import org.apache.camel.kafkaconnector.elasticsearch.services.ElasticSearchServi
 import org.apache.camel.kafkaconnector.elasticsearch.services.ElasticSearchServiceFactory;
 import org.elasticsearch.search.SearchHit;
 import org.elasticsearch.search.SearchHits;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -72,11 +71,6 @@ public class CamelSinkElasticSearchITCase extends AbstractKafkaTest {
         received = 0;
     }
 
-    @AfterEach
-    public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private void putRecords(CountDownLatch latch) {
         KafkaClient<String, String> kafkaClient = new KafkaClient<>(getKafkaService().getBootstrapServers());
 
diff --git a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
index 3300571..adbc9fd 100644
--- a/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
+++ b/tests/itests-file/src/test/java/org/apache/camel/kafkaconnector/file/sink/CamelSinkFileITCase.java
@@ -69,7 +69,6 @@ public class CamelSinkFileITCase extends AbstractKafkaTest {
     @AfterEach
     public void tearDown() {
         cleanup();
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
     private void cleanup() {
diff --git a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
index 7151484..f23ac4d 100644
--- a/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
+++ b/tests/itests-http/src/test/java/org/apache/camel/kafkaconnector/http/sink/CamelSinkHTTPITCase.java
@@ -81,8 +81,6 @@ public class CamelSinkHTTPITCase extends AbstractKafkaTest {
         } finally {
             localServer.shutdown(2, TimeUnit.SECONDS);
         }
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
 
diff --git a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
index f49635d..6f05bde 100644
--- a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
+++ b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/sink/CamelSinkSalesforceITCase.java
@@ -119,8 +119,6 @@ public class CamelSinkSalesforceITCase extends AbstractKafkaTest {
         }
 
         accountName = null;
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
 
diff --git a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
index 40cb51e..d0e542a 100644
--- a/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
+++ b/tests/itests-salesforce/src/test/java/org/apache/camel/kafkaconnector/salesforce/source/CamelSourceSalesforceITCase.java
@@ -133,8 +133,6 @@ public class CamelSourceSalesforceITCase extends AbstractKafkaTest  {
             fail("Unable to delete the test account on Salesforce");
         }
         account = null;
-
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
     }
 
     private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
index 12d2198..0f3db92 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/sink/CamelSinkJMSITCase.java
@@ -35,7 +35,6 @@ import org.apache.camel.kafkaconnector.sjms2.clients.JMSClient;
 import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.kafkaconnector.sjms2.services.JMSService;
 import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -71,11 +70,6 @@ public class CamelSinkJMSITCase extends AbstractKafkaTest {
         received = 0;
     }
 
-    @AfterEach
-    public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private boolean checkRecord(Message jmsMessage) {
         if (jmsMessage instanceof TextMessage) {
             try {
diff --git a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
index 40896c4..1dd8cae 100644
--- a/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
+++ b/tests/itests-sjms2/src/test/java/org/apache/camel/kafkaconnector/sjms2/source/CamelSourceJMSITCase.java
@@ -30,7 +30,6 @@ import org.apache.camel.kafkaconnector.sjms2.common.SJMS2Common;
 import org.apache.camel.kafkaconnector.sjms2.services.JMSService;
 import org.apache.camel.kafkaconnector.sjms2.services.JMSServiceFactory;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -67,11 +66,6 @@ public class CamelSourceJMSITCase extends AbstractKafkaTest {
         received = 0;
     }
 
-    @AfterEach
-    public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private <T> boolean checkRecord(ConsumerRecord<String, T> record) {
         LOG.debug("Received: {}", record.value());
         received++;
diff --git a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
index 171f3ba..4c9d7de 100644
--- a/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
+++ b/tests/itests-slack/src/test/java/org/apache/camel/kafkaconnector/slack/sink/CamelSinkSlackITCase.java
@@ -23,7 +23,6 @@ import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
 import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
 import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
@@ -55,11 +54,6 @@ public class CamelSinkSlackITCase extends AbstractKafkaTest {
         return new String[]{"camel-slack-kafka-connector"};
     }
 
-    @AfterEach
-    public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private void runTest(ConnectorPropertyFactory connectorPropertyFactory, String message) throws ExecutionException, InterruptedException {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnector(connectorPropertyFactory);
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
index 24587b7..a6a1ff5 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/sink/CamelSinkSyslogITCase.java
@@ -22,7 +22,6 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.camel.kafkaconnector.syslog.services.SyslogService;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -61,11 +60,6 @@ public class CamelSinkSyslogITCase extends AbstractKafkaTest {
         received = 0;
     }
 
-    @AfterEach
-    public void tearDown() {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private void runBasicProduceTest(ConnectorPropertyFactory connectorPropertyFactory) throws Exception {
         connectorPropertyFactory.log();
         getKafkaConnectService().initializeConnectorBlocking(connectorPropertyFactory, 1);
diff --git a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
index a54d5cd..16df742 100644
--- a/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
+++ b/tests/itests-syslog/src/test/java/org/apache/camel/kafkaconnector/syslog/source/CamelSourceSyslogITCase.java
@@ -17,8 +17,6 @@
 
 package org.apache.camel.kafkaconnector.syslog.source;
 
-import java.io.IOException;
-
 import org.apache.camel.CamelContext;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.syslog.SyslogDataFormat;
@@ -30,7 +28,6 @@ import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.NetworkUtils;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -65,11 +62,6 @@ public class CamelSourceSyslogITCase extends AbstractKafkaTest {
         received = 0;
     }
 
-    @AfterEach
-    public void tearDown() throws IOException, InterruptedException {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private void produceLogMessages(String protocol, String host, String port, String message) throws Exception {
         CamelContext camelContext = new DefaultCamelContext();
 
diff --git a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
index b9f6f2a..9f89ff1 100644
--- a/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
+++ b/tests/itests-timer/src/test/java/org/apache/camel/kafkaconnector/timer/source/CamelSourceTimerITCase.java
@@ -17,7 +17,6 @@
 
 package org.apache.camel.kafkaconnector.timer.source;
 
-import java.io.IOException;
 import java.util.concurrent.ExecutionException;
 
 import org.apache.camel.kafkaconnector.common.AbstractKafkaTest;
@@ -25,7 +24,6 @@ import org.apache.camel.kafkaconnector.common.ConnectorPropertyFactory;
 import org.apache.camel.kafkaconnector.common.clients.kafka.KafkaClient;
 import org.apache.camel.kafkaconnector.common.utils.TestUtils;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
@@ -56,11 +54,6 @@ public class CamelSourceTimerITCase extends AbstractKafkaTest {
         received = 0;
     }
 
-    @AfterEach
-    public void tearDown() throws IOException, InterruptedException {
-        deleteKafkaTopic(TestUtils.getDefaultTestTopic(this.getClass()));
-    }
-
     private boolean checkRecord(ConsumerRecord<String, String> record) {
         received++;