[camel] branch master updated (689ba28 -> c3fe401)

[camel] branch master updated (689ba28 -> c3fe401)

acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 689ba28  Regen docs
     new c347a90  CAMEL-12543: Add camel-debezium component to Camel
     new 477c50a  Enhance the documentation with more information
     new 125d386  Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 129ba31  Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 75e4798  Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 075d9398 Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 858676c  Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 4c2a80b  Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 3f73837  Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 03c775e  Update components/camel-debezium/src/main/docs/debezium-component.adoc
     new 811f03e  Improve the documentation and fix debezium links
     new 24961a9  CAMEL-12543 - Fixed log configuration
     new c3fe401  CAMEL-12543 - Regen docs

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 apache-camel/pom.xml                               |  10 +
 apache-camel/src/main/descriptors/common-bin.xml   |   2 +
 bom/camel-bom/pom.xml                              |  10 +
 components/camel-debezium/pom.xml                  | 107 ++++
 .../src/main/docs/debezium-component.adoc          | 219 +++++++
 .../component/debezium/DebeziumComponent.java      |  93 +++
 .../component/debezium/DebeziumConnectorTypes.java |  42 ++
 .../component/debezium/DebeziumConstants.java      |  41 ++
 .../camel/component/debezium/DebeziumConsumer.java |  89 +++
 .../camel/component/debezium/DebeziumEndpoint.java | 157 +++++
 .../component/debezium/DebeziumTypeConverter.java  |  52 ++
 .../configuration/ConfigurationValidation.java     |  49 ++
 .../EmbeddedDebeziumConfiguration.java             | 345 +++++++++++
 ...ySqlConnectorEmbeddedDebeziumConfiguration.java | 663 +++++++++++++++++++++
 .../component/debezium/DebeziumComponentTest.java  | 147 +++++
 .../debezium/DebeziumConnectorTypesTest.java       |  39 ++
 .../component/debezium/DebeziumConsumerTest.java   | 200 +++++++
 .../component/debezium/DebeziumEndpointTest.java   | 292 +++++++++
 .../debezium/DebeziumTypeConverterTest.java        |  63 ++
 .../EmbeddedDebeziumConfigurationTest.java         |  63 ++
 ...FileConnectorEmbeddedDebeziumConfiguration.java |  69 +++
 ...ConnectorEmbeddedDebeziumConfigurationTest.java |  98 +++
 .../TestEmbeddedDebeziumConfiguration.java         |  50 ++
 .../src/test/resources/log4j2.properties           |  31 +
 components/pom.xml                                 |   1 +
 components/readme.adoc                             |   5 +-
 .../dsl/DebeziumEndpointBuilderFactory.java        | 126 ++++
 docs/components/modules/ROOT/nav.adoc              |   1 +
 .../modules/ROOT/pages/debezium-component.adoc     | 220 +++++++
 parent/pom.xml                                     |  11 +
 .../karaf/features/src/main/resources/features.xml |   9 +
 .../camel-debezium-starter/pom.xml                 |  53 ++
 .../DebeziumComponentAutoConfiguration.java        | 128 ++++
 .../springboot/DebeziumComponentConfiguration.java |  68 +++
 .../src/main/resources/META-INF/LICENSE.txt        |   0
 .../src/main/resources/META-INF/NOTICE.txt         |   0
 .../src/main/resources/META-INF/spring.factories   |  19 +
 .../src/main/resources/META-INF/spring.provides    |  17 +
 platforms/spring-boot/components-starter/pom.xml   |   1 +
 .../camel-spring-boot-dependencies/pom.xml         |  10 +
 .../camel/itest/karaf/CamelDebeziumTest.java       |  33 +
 .../camel/itest/springboot/CamelDebeziumTest.java  |  46 ++
 42 files changed, 3678 insertions(+), 1 deletion(-)
 create mode 100644 components/camel-debezium/pom.xml
 create mode 100644 components/camel-debezium/src/main/docs/debezium-component.adoc
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumComponent.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConnectorTypes.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumTypeConverter.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/ConfigurationValidation.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfiguration.java
 create mode 100644 components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumComponentTest.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConnectorTypesTest.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConsumerTest.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumTypeConverterTest.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfigurationTest.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/FileConnectorEmbeddedDebeziumConfiguration.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfigurationTest.java
 create mode 100644 components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/TestEmbeddedDebeziumConfiguration.java
 create mode 100644 components/camel-debezium/src/test/resources/log4j2.properties
 create mode 100644 core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumEndpointBuilderFactory.java
 create mode 100644 docs/components/modules/ROOT/pages/debezium-component.adoc
 create mode 100644 platforms/spring-boot/components-starter/camel-debezium-starter/pom.xml
 create mode 100644 platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentAutoConfiguration.java
 create mode 100644 platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentConfiguration.java
 copy {tooling/maven/camel-package-maven-plugin => platforms/spring-boot/components-starter/camel-debezium-starter}/src/main/resources/META-INF/LICENSE.txt (100%)
 copy {tooling/maven/camel-package-maven-plugin => platforms/spring-boot/components-starter/camel-debezium-starter}/src/main/resources/META-INF/NOTICE.txt (100%)
 create mode 100644 platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.factories
 create mode 100644 platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.provides
 create mode 100644 tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelDebeziumTest.java
 create mode 100644 tests/camel-itest-spring-boot/src/test/java/org/apache/camel/itest/springboot/CamelDebeziumTest.java


[camel] 01/13: CAMEL-12543: Add camel-debezium component to Camel

acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c347a90c01fdc515609c657c09f79f56564bf8fa
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Wed Aug 21 15:49:16 2019 +0200

    CAMEL-12543: Add camel-debezium component to Camel
---
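Reader's note: the debezium-component.adoc added by this patch documents the endpoint URI as `debezium:connector-type[?options]`. Below is a minimal Java sketch of how such a URI might be assembled from the option names listed in that file. The class `DebeziumUriSketch`, its method, and every option value (connector name, host, credentials, file paths) are hypothetical and not part of the commit. Values are inserted verbatim, on the assumption that they contain no characters that need escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper, not part of the commit.
final class DebeziumUriSketch {

    // Builds "debezium:<connectorType>?key=value&..." in the map's iteration order.
    // Assumption: values need no escaping, so they are appended as-is.
    static String endpointUri(String connectorType, Map<String, String> options) {
        String base = "debezium:" + connectorType;
        if (options.isEmpty()) {
            return base;
        }
        StringJoiner query = new StringJoiner("&");
        for (Map.Entry<String, String> option : options.entrySet()) {
            query.add(option.getKey() + "=" + option.getValue());
        }
        return base + "?" + query;
    }

    public static void main(String[] args) {
        // Option names come from the component documentation; values are made up.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("name", "inventory-connector");
        options.put("databaseServerId", "1");
        options.put("databaseHostName", "localhost");
        options.put("databaseUser", "debezium");
        options.put("databasePassword", "secret");
        options.put("databaseServerName", "inventory-server");
        options.put("offsetStorageFileName", "/tmp/offsets.dat");
        options.put("databaseHistoryFileName", "/tmp/dbhistory.dat");
        System.out.println(endpointUri("mysql", options));
    }
}
```

A route would presumably pass the resulting string to its consumer endpoint. The options marked *Required* in the documentation (such as `name` and `databaseHostName`) would need to be present in the map.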
 apache-camel/pom.xml                               |  10 +
 apache-camel/src/main/descriptors/common-bin.xml   |   2 +
 bom/camel-bom/pom.xml                              |  10 +
 components/camel-debezium/pom.xml                  | 107 ++++
 .../src/main/docs/debezium-component.adoc          | 210 +++++++
 .../component/debezium/DebeziumComponent.java      |  93 +++
 .../component/debezium/DebeziumConnectorTypes.java |  42 ++
 .../component/debezium/DebeziumConstants.java      |  41 ++
 .../camel/component/debezium/DebeziumConsumer.java |  89 +++
 .../camel/component/debezium/DebeziumEndpoint.java | 157 +++++
 .../component/debezium/DebeziumTypeConverter.java  |  52 ++
 .../configuration/ConfigurationValidation.java     |  49 ++
 .../EmbeddedDebeziumConfiguration.java             | 345 +++++++++++
 ...ySqlConnectorEmbeddedDebeziumConfiguration.java | 663 +++++++++++++++++++++
 .../component/debezium/DebeziumComponentTest.java  | 147 +++++
 .../debezium/DebeziumConnectorTypesTest.java       |  39 ++
 .../component/debezium/DebeziumConsumerTest.java   | 200 +++++++
 .../component/debezium/DebeziumEndpointTest.java   | 292 +++++++++
 .../debezium/DebeziumTypeConverterTest.java        |  63 ++
 .../EmbeddedDebeziumConfigurationTest.java         |  63 ++
 ...FileConnectorEmbeddedDebeziumConfiguration.java |  69 +++
 ...ConnectorEmbeddedDebeziumConfigurationTest.java |  98 +++
 .../TestEmbeddedDebeziumConfiguration.java         |  50 ++
 .../src/test/resources/log4j2.properties           |  40 ++
 components/pom.xml                                 |   1 +
 .../dsl/DebeziumEndpointBuilderFactory.java        | 126 ++++
 parent/pom.xml                                     |  11 +
 .../karaf/features/src/main/resources/features.xml |   9 +
 .../camel-debezium-starter/pom.xml                 |  53 ++
 .../DebeziumComponentAutoConfiguration.java        | 128 ++++
 .../springboot/DebeziumComponentConfiguration.java |  68 +++
 .../src/main/resources/META-INF/LICENSE.txt        | 203 +++++++
 .../src/main/resources/META-INF/NOTICE.txt         |  11 +
 .../src/main/resources/META-INF/spring.factories   |  19 +
 .../src/main/resources/META-INF/spring.provides    |  17 +
 platforms/spring-boot/components-starter/pom.xml   |   1 +
 .../camel-spring-boot-dependencies/pom.xml         |  10 +
 .../camel/itest/karaf/CamelDebeziumTest.java       |  33 +
 .../camel/itest/springboot/CamelDebeziumTest.java  |  46 ++
 39 files changed, 3667 insertions(+)
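Reader's note: the failure-handling paragraph in debezium-component.adoc says duplicate events can arrive right after an unclean restart, so downstream routes should deduplicate. One possible approach, sketched under stated assumptions, is to persist the position of the last applied event and skip anything at or below it. The class `AppliedPositionStore`, the single increasing `long` position, and the file-based storage are assumptions for illustration only. The commit does not prescribe a deduplication method, and real MySQL binlog offsets combine a file name with a position.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper, not part of the commit.
final class AppliedPositionStore {
    private final Path file;

    AppliedPositionStore(Path file) {
        this.file = file;
    }

    // Returns the last position recorded as applied, or -1 if none was recorded yet.
    long lastApplied() {
        try {
            if (!Files.exists(file)) {
                return -1L;
            }
            String text = new String(Files.readAllBytes(file), StandardCharsets.UTF_8).trim();
            return text.isEmpty() ? -1L : Long.parseLong(text);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // True when the event is newer than anything already applied.
    boolean isNew(long position) {
        return position > lastApplied();
    }

    // Call only after the downstream work for this event has succeeded.
    void markApplied(long position) {
        try {
            Files.write(file, Long.toString(position).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the position lives in a file rather than in memory, a fresh `AppliedPositionStore` created after a restart still rejects events that were already applied. A production variant would likely store the position in the same transaction as the downstream write.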

diff --git a/apache-camel/pom.xml b/apache-camel/pom.xml
index 2593dcd..b35aba0 100644
--- a/apache-camel/pom.xml
+++ b/apache-camel/pom.xml
@@ -463,6 +463,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-debezium</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-digitalocean</artifactId>
       <version>${project.version}</version>
     </dependency>
@@ -2002,6 +2007,11 @@
     </dependency>
     <dependency>
       <groupId>org.apache.camel</groupId>
+      <artifactId>camel-debezium-starter</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
       <artifactId>camel-digitalocean-starter</artifactId>
       <version>${project.version}</version>
     </dependency>
diff --git a/apache-camel/src/main/descriptors/common-bin.xml b/apache-camel/src/main/descriptors/common-bin.xml
index edb9712..09b0844 100644
--- a/apache-camel/src/main/descriptors/common-bin.xml
+++ b/apache-camel/src/main/descriptors/common-bin.xml
@@ -113,6 +113,7 @@
         <include>org.apache.camel:camel-cxf-transport</include>
         <include>org.apache.camel:camel-dataformat</include>
         <include>org.apache.camel:camel-dataset</include>
+        <include>org.apache.camel:camel-debezium</include>
         <include>org.apache.camel:camel-digitalocean</include>
         <include>org.apache.camel:camel-direct</include>
         <include>org.apache.camel:camel-directvm</include>
@@ -460,6 +461,7 @@
         <include>org.apache.camel:camel-cxf-transport-starter</include>
         <include>org.apache.camel:camel-dataformat-starter</include>
         <include>org.apache.camel:camel-dataset-starter</include>
+        <include>org.apache.camel:camel-debezium-starter</include>
         <include>org.apache.camel:camel-digitalocean-starter</include>
         <include>org.apache.camel:camel-direct-starter</include>
         <include>org.apache.camel:camel-directvm-starter</include>
diff --git a/bom/camel-bom/pom.xml b/bom/camel-bom/pom.xml
index 2e8693e..518d5d6 100644
--- a/bom/camel-bom/pom.xml
+++ b/bom/camel-bom/pom.xml
@@ -804,6 +804,16 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-debezium</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>camel-debezium-starter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-digitalocean</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git a/components/camel-debezium/pom.xml b/components/camel-debezium/pom.xml
new file mode 100644
index 0000000..a3fee77
--- /dev/null
+++ b/components/camel-debezium/pom.xml
@@ -0,0 +1,107 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>components</artifactId>
+        <version>3.0.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>camel-debezium</artifactId>
+    <packaging>jar</packaging>
+    <name>Camel :: Debezium</name>
+    <description>Camel Debezium support</description>
+
+    <dependencies>
+
+        <!-- camel -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-support</artifactId>
+        </dependency>
+
+        <!-- debezium embedded engine -->
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-embedded</artifactId>
+            <version>${debezium-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- debezium connectors -->
+        <dependency>
+            <groupId>io.debezium</groupId>
+            <artifactId>debezium-connector-mysql</artifactId>
+            <version>${debezium-version}</version>
+        </dependency>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.apache.camel</groupId>
+            <artifactId>camel-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-slf4j-impl</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.logging.log4j</groupId>
+            <artifactId>log4j-1.2-api</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+            <version>${commons-lang-version}</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
new file mode 100644
index 0000000..ba137cc
--- /dev/null
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -0,0 +1,210 @@
+[[debezium-component]]
+= Debezium Component
+
+*Available as of Camel version 3.0*
+
+The Debezium component is a wrapper around https://debezium.io/[Debezium] using https://debezium.io/docs/embedded/[Debezium Embedded], which enables Change Data Capture from various databases without the need for Kafka or Kafka Connect.
+
+*Note on handling failures:* Per the https://debezium.io/docs/embedded/#handling_failures[Debezium Embedded Engine] documentation, the engine actively records source offsets and periodically flushes them to persistent storage, so when the application restarts after a shutdown or crash, the engine resumes from the last recorded offset.
+Thus, in normal operation, your downstream routes receive each event exactly once. However, after an application crash (that is, without a graceful shutdown), the application resumes from the last recorded offset,
+which may result in duplicate events immediately after the restart. Therefore, your downstream routes should tolerate such cases and deduplicate events if needed.
+
+Maven users will need to add the following dependency to their `pom.xml`
+for this component.
+
+[source,xml]
+------------------------------------------------------------
+<dependency>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>camel-debezium</artifactId>
+    <version>x.x.x</version>
+    <!-- use the same version as your Camel core version -->
+</dependency>
+------------------------------------------------------------
+
+== URI format
+
+[source,java]
+---------------------------
+debezium:connector-type[?options]
+
+---------------------------
+
+== Supported Debezium Connectors
+- https://debezium.io/docs/connectors/mysql/[MySQL].
+
+
+== Options
+
+
+// component options: START
+The Debezium component supports 2 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *configuration* (consumer) | Allows a pre-configured configuration to be set. You need to extend EmbeddedDebeziumConfiguration to create the configuration for the component. |  | EmbeddedDebezium Configuration
+| *basicPropertyBinding* (advanced) | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
+|===
+// component options: END
+
+
+// endpoint options: START
+The Debezium endpoint is configured using URI syntax:
+
+----
+debezium:connectorType
+----
+
+with the following path and query parameters:
+
+=== Path Parameters (1 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *connectorType* | *Required* The Debezium connector type that is supported by Camel Debezium component. |  | String
+|===
+
+
+=== Query Parameters (47 parameters):
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *bridgeErrorHandler* (consumer) | Allows for bridging the consumer to the Camel routing Error Handler, which means any exceptions that occur while the consumer is trying to pick up incoming messages, or the like, will now be processed as a message and handled by the routing Error Handler. By default the consumer will use the org.apache.camel.spi.ExceptionHandler to deal with exceptions, which will be logged at WARN or ERROR level and ignored. | false | boolean
+| *internalKeyConverter* (consumer) | The Converter class that should be used to serialize and deserialize key data for offsets. The default is JSON converter. | org.apache.kafka.connect.json.JsonConverter | String
+| *internalValueConverter* (consumer) | The Converter class that should be used to serialize and deserialize value data for offsets. The default is JSON converter. | org.apache.kafka.connect.json.JsonConverter | String
+| *name* (consumer) | *Required* Unique name for the connector. Attempting to register again with the same name will fail. |  | String
+| *offsetCommitPolicy* (consumer) | The name of the Java class of the commit policy. It defines when an offset commit has to be triggered, based on the number of events processed and the time elapsed since the last commit. This class must implement the interface OffsetCommitPolicy. The default is a periodic commit policy based upon time intervals. | io.debezium.embedded.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy | String
+| *offsetCommitTimeoutMs* (consumer) | Maximum number of milliseconds to wait for records to flush and partition offset data to be committed to offset storage before cancelling the process and restoring the offset data to be committed in a future attempt. The default is 5 seconds. | 5000 | long
+| *offsetFlushIntervalMs* (consumer) | Interval at which to try committing offsets. The default is 1 minute. | 60000 | long
+| *offsetStorage* (consumer) | The name of the Java class that is responsible for persistence of connector offsets. | org.apache.kafka.connect.storage.FileOffsetBackingStore | String
+| *offsetStorageFileName* (consumer) | Path to file where offsets are to be stored. Required when offset.storage is set to the FileOffsetBackingStore |  | String
+| *offsetStoragePartitions* (consumer) | The number of partitions used when creating the offset storage topic. Required when offset.storage is set to the KafkaOffsetBackingStore. |  | int
+| *offsetStorageReplication Factor* (consumer) | Replication factor used when creating the offset storage topic. Required when offset.storage is set to the KafkaOffsetBackingStore |  | int
+| *offsetStorageTopic* (consumer) | The name of the Kafka topic where offsets are to be stored. Required when offset.storage is set to the KafkaOffsetBackingStore. |  | String
+| *exceptionHandler* (consumer) | To let the consumer use a custom ExceptionHandler. Notice if the option bridgeErrorHandler is enabled then this option is not in use. By default the consumer will deal with exceptions, that will be logged at WARN or ERROR level and ignored. |  | ExceptionHandler
+| *exchangePattern* (consumer) | Sets the exchange pattern when the consumer creates an exchange. |  | ExchangePattern
+| *basicPropertyBinding* (advanced) | Whether the endpoint should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | boolean
+| *synchronous* (advanced) | Sets whether synchronous processing should be strictly used, or Camel is allowed to use asynchronous processing (if supported). | false | boolean
+| *bigintUnsignedHandlingMode* (mysql) | Specifies how BIGINT UNSIGNED columns should be represented in change events, including: precise uses java.math.BigDecimal to represent values, which are encoded in the change events using a binary representation and Kafka Connect's org.apache.kafka.connect.data.Decimal type; long (the default) represents values using Java's long, which may not offer the precision but will be far easier to use in consumers. long is usually the preferable setting. On [...]
+| *columnBlacklist* (mysql) | An optional comma-separated list of regular expressions that match the fully-qualified names of columns that should be excluded from change event message values. Fully-qualified names for columns are of the form databaseName.tableName.columnName, or databaseName.schemaName.tableName.columnName. |  | String
+| *connectTimeoutMs* (mysql) | A positive integer value that specifies the maximum time in milliseconds this connector should wait after trying to connect to the MySQL database server before timing out. Defaults to 30 seconds. | 30000 | long
+| *databaseBlacklist* (mysql) | An optional comma-separated list of regular expressions that match database names to be excluded from monitoring; any database name not included in the blacklist will be monitored. May not be used with database.whitelist. |  | String
+| *databaseHistory* (mysql) | The name of the DatabaseHistory class that should be used to store and recover database schema changes. | io.debezium.relational.history.FileDatabaseHistory | String
+| *databaseHistoryFileName* (mysql) | The path to the file that will be used to record the database history |  | String
+| *databaseHistoryKafka BootstrapServers* (mysql) | A list of host/port pairs that the connector will use for establishing an initial connection to the Kafka cluster, which it uses to store and retrieve the database schema history. |  | String
+| *databaseHistoryKafkaTopic* (mysql) | The full name of the Kafka topic where the connector will store the database schema history. |  | String
+| *databaseHostName* (mysql) | *Required* IP address or hostname of the target database server. |  | String
+| *databasePassword* (mysql) | *Required* Password to use when connecting to the database server. |  | String
+| *databasePort* (mysql) | Integer port number of the database server. | 3306 | int
+| *databaseServerId* (mysql) | *Required* A numeric ID of this database client, which must be unique across all currently-running database processes in the database cluster. This connector joins the database cluster as another server (with this unique ID) so it can read the binlog. |  | int
+| *databaseServerName* (mysql) | *Required* Logical name that identifies and provides a namespace for the particular database server/cluster being monitored. |  | String
+| *databaseUser* (mysql) | *Required* Name of the MySQL user to use when connecting to the database server. |  | String
+| *databaseWhitelist* (mysql) | An optional comma-separated list of regular expressions that match database names to be monitored; any database name not included in the whitelist will be excluded from monitoring. By default all databases will be monitored. May not be used with database.blacklist. |  | String
+| *ddlParserMode* (mysql) | Controls which parser should be used for parsing DDL statements when building up the meta-model of the captured database structure. Can be one of legacy (for the legacy hand-written parser implementation) or antlr (for new Antlr based implementation introduced in Debezium 0.8.0). While the legacy parser remains the default for Debezium 0.8.x, please try out the new implementation and report back any issues you encounter. The new parser is the default as of 0.9 [...]
+| *decimalHandlingMode* (mysql) | Specifies how the connector should handle values for DECIMAL and NUMERIC columns: precise (the default) represents them precisely using java.math.BigDecimal values represented in change events in a binary form; double represents them using double values, which may result in a loss of precision but will be far easier to use; string encodes values as formatted strings, which are easy to consume, but semantic information about the real type is lost. [...]
+| *eventDeserializationFailure HandlingMode* (mysql) | Specifies how the connector should react to exceptions during deserialization of binlog events. fail will propagate the exception (indicating the problematic event and its binlog offset), causing the connector to stop. warn will cause the problematic event to be skipped and the problematic event and its binlog offset to be logged (make sure that the logger is set to the WARN or ERROR level). ignore will cause problematic event will b [...]
+| *gtidNewChannelPosition* (mysql) | When set to latest, when the connector sees a new GTID channel, it will start consuming from the last executed transaction in that GTID channel. If set to earliest, the connector starts reading that channel from the first available (not purged) GTID position. earliest is useful when you have an active-passive MySQL setup where Debezium is connected to master; in this case during failover the slave with new UUID (and GTID channel) starts receiving write [...]
+| *gtidSourceExcludes* (mysql) | A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog position in the MySQL server. Only the GTID ranges that have sources matching none of these exclude patterns will be used. May not be used with gtid.source.includes. |  | String
+| *gtidSourceIncludes* (mysql) | A comma-separated list of regular expressions that match source UUIDs in the GTID set used to find the binlog position in the MySQL server. Only the GTID ranges that have sources matching one of these include patterns will be used. May not be used with gtid.source.excludes. |  | String
+| *includeQuery* (mysql) | Boolean value that specifies whether the connector should include the original SQL query that generated the change event. Note: This option requires MySQL be configured with the binlog_rows_query_log_events option set to ON. Query will not be present for events generated from the snapshot process. Warning: Enabling this option may expose tables or fields explicitly blacklisted or masked by including the original SQL statement in the change event. For this reaso [...]
+| *includeSchemaChanges* (mysql) | Boolean value that specifies whether the connector should publish changes in the database schema to a Kafka topic with the same name as the database server ID. Each schema change will be recorded using a key that contains the database name and whose value includes the DDL statement(s). This is independent of how the connector internally records database history. The default is true. | true | boolean
+| *inconsistentSchemaHandling Mode* (mysql) | Specifies how the connector should react to binlog events that relate to tables that are not present in internal schema representation (i.e. internal representation is not consistent with database) fail will throw an exception (indicating the problematic event and its binlog offset), causing the connector to stop. warn will cause the problematic event to be skipped and the problematic event and its binlog offset to be logged (make sure that t [...]
+| *maxBatchSize* (mysql) | Positive integer value that specifies the maximum size of each batch of events that should be processed during each iteration of this connector. Defaults to 2048. | 2048 | int
+| *maxQueueSize* (mysql) | Positive integer value that specifies the maximum size of the blocking queue into which change events read from the database log are placed before they are written to Kafka. This queue can provide backpressure to the binlog reader when, for example, writes to Kafka are slower or if Kafka is not available. Events that appear in the queue are not included in the offsets periodically recorded by this connector. Defaults to 8192, and should always be larger than th [...]
+| *pollIntervalMs* (mysql) | Positive integer value that specifies the number of milliseconds the connector should wait during each iteration for new change events to appear. Defaults to 1000 milliseconds, or 1 second. | 1000 | long
+| *tableBlacklist* (mysql) | An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be excluded from monitoring; any table not included in the blacklist will be monitored. Each identifier is of the form databaseName.tableName. May not be used with table.whitelist. |  | String
+| *tableWhitelist* (mysql) | An optional comma-separated list of regular expressions that match fully-qualified table identifiers for tables to be monitored; any table not included in the whitelist will be excluded from monitoring. Each identifier is of the form databaseName.tableName. By default the connector will monitor every non-system table in each monitored database. May not be used with table.blacklist. |  | String
+| *timePrecisionMode* (mysql) | Time, date, and timestamps can be represented with different kinds of precision, including: adaptive_time_microseconds (the default) captures the date, datetime and timestamp values exactly as in the database using either millisecond, microsecond, or nanosecond precision values based on the database columns type, with the exception of TIME type fields, which are always captured as microseconds; adaptive (deprecated) captures the time and timestamp values e [...]
+| *tombstonesOnDelete* (mysql) | Controls whether a tombstone event should be generated after a delete event. When true the delete operations are represented by a delete event and a subsequent tombstone event. When false only a delete event is sent. Emitting the tombstone event (the default behavior) allows Kafka to completely delete all events pertaining to the given key once the source record got deleted. | false | boolean
+|===
+// endpoint options: END
+// spring-boot-auto-configure options: START
+== Spring Boot Auto-Configuration
+
+When using Spring Boot make sure to use the following Maven dependency to have support for auto configuration:
+
+[source,xml]
+----
+<dependency>
+  <groupId>org.apache.camel</groupId>
+  <artifactId>camel-debezium-starter</artifactId>
+  <version>x.x.x</version>
+  <!-- use the same version as your Camel core version -->
+</dependency>
+----
+
+
+The component supports 3 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *camel.component.debezium.basic-property-binding* | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | Boolean
+| *camel.component.debezium.configuration* | Allow pre-configured Configurations to be set, you will need to extend EmbeddedDebeziumConfiguration in order to create the configuration for the component. The option is a org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration type. |  | String
+| *camel.component.debezium.enabled* | Whether to enable auto configuration of the debezium component. This is enabled by default. |  | Boolean
+|===
+// spring-boot-auto-configure options: END
+
+For more information about configuration:
+https://debezium.io/docs/embedded/#engine-properties[https://debezium.io/docs/embedded/#engine-properties]
+https://debezium.io/docs/connectors/mysql/#connector-properties[https://debezium.io/docs/connectors/mysql/#connector-properties]
+
+== Message headers
+
+=== Consumer headers
+
+The following headers are available when consuming change events from Debezium.
+[width="100%",cols="2m,2m,1m,5",options="header"]
+|===
+| Header constant                           | Header value                                   | Type        | Description
+| DebeziumConstants.HEADER_IDENTIFIER       | "CamelDebeziumIdentifier"                      | String      | The identifier of the connector, normally in the format "{server-name}.{database-name}.{table-name}".
+| DebeziumConstants.HEADER_KEY              | "CamelDebeziumKey"                             | Object      | The key of the event, normally the table's primary key.
+| DebeziumConstants.HEADER_SOURCE_METADATA  | "CamelDebeziumSourceMetadata"                  | Map         | Metadata about the source event, for example the `table` name, database `name`, and log position. Refer to the Debezium documentation for more information.
+| DebeziumConstants.HEADER_OPERATION        | "CamelDebeziumOperation"                       | String      | If present, the type of event operation. Values for the connector are c for create (or insert), u for update, and d for delete.
+| DebeziumConstants.HEADER_TIMESTAMP        | "CamelDebeziumTimestamp"                       | Long        | If present, the time (using the system clock in the JVM) at which the connector processed the event.
+| DebeziumConstants.HEADER_BEFORE           | "CamelDebeziumBefore"                          | Map/Struct  | If present, contains the state of the row before the event occurred.
+|===
+
+== Samples
+
+=== Consuming events
+
+Here is a simple route that listens for Debezium events from the MySQL connector.
+[source,java]
+----
+from("debezium:mysql?name=dbz-test-1&offsetStorageFileName=/usr/offset-file-1.dat&databaseHostName=localhost&databaseUser=debezium&databasePassword=dbz&databaseServerName=my-app-connector&databaseHistoryFileName=/usr/history-file-1.dat")
+    .log("Event received from Debezium : ${body}")
+    .log("    with this identifier ${headers.CamelDebeziumIdentifier}")
+    .log("    with these source metadata ${headers.CamelDebeziumSourceMetadata}")
+    .log("    the event occurred upon this operation '${headers.CamelDebeziumOperation}'")
+    .log("    on this database '${headers.CamelDebeziumSourceMetadata[db]}' and this table '${headers.CamelDebeziumSourceMetadata[table]}'")
+    .log("    with the key ${headers.CamelDebeziumKey}")
+    .log("    the previous value is ${headers.CamelDebeziumBefore}")
+----
+
+You can query the body as a normal `Map`, since this component contains a https://camel.apache.org/manual/latest/type-converter.html[Type Converter] that converts
+from the default output type of https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] to `Map`. However, sometimes you may want to access the schema of the value, especially if you perform special data conversion (to protobuf, Avro, etc.). You can obtain the https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Schema.html[`Schema`] type from the `Struct` like this:
+[source,java]
+----
+from("debezium:[connectorType]?[options]")
+    .process(exchange -> {
+        final Struct bodyValue = exchange.getIn().getBody(Struct.class);
+        final Schema schemaValue = bodyValue.schema();
+
+        log.info("Body value is :" + bodyValue);
+        log.info("With Schema : " + schemaValue);
+        log.info("And fields of :" + schemaValue.fields());
+    });
+----
+
+
+*Important Note:* This component is a thin wrapper around the Debezium Engine, as mentioned. Therefore, before using this component in production, you need to understand how Debezium works and how its configuration affects the expected behavior, especially with regard to https://debezium.io/docs/embedded/#handling_failures[handling failures].
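
Note on the consumer headers documented above: a downstream processor will usually want to turn the single-letter operation code and the dotted identifier into something more readable. The following is only a minimal sketch using plain Java, not part of the component. The class name, the method names, and the sample identifier `my-app-connector.inventory.customers` are hypothetical, and only the codes listed in the header table (c, u, d) are mapped; any other code is treated as unknown.

```java
import java.util.Optional;

public class DebeziumHeaderSketch {

    /** Maps the operation code from the CamelDebeziumOperation header to a label. */
    static Optional<String> describeOperation(Object operationHeader) {
        if (operationHeader == null) {
            // the header is documented as optional ("if present")
            return Optional.empty();
        }
        switch (operationHeader.toString()) {
            case "c": return Optional.of("create");
            case "u": return Optional.of("update");
            case "d": return Optional.of("delete");
            default:  return Optional.empty(); // codes not listed in the header table
        }
    }

    /**
     * Returns the last dot-separated segment of the identifier, which is the
     * table name under the documented {server-name}.{database-name}.{table-name} format.
     */
    static String tableNameOf(String identifier) {
        int lastDot = identifier.lastIndexOf('.');
        return lastDot < 0 ? identifier : identifier.substring(lastDot + 1);
    }

    public static void main(String[] args) {
        System.out.println(describeOperation("u").orElse("unknown"));            // prints "update"
        System.out.println(tableNameOf("my-app-connector.inventory.customers")); // prints "customers"
    }
}
```

In a route, the same checks could be applied to the header values read from the exchange; that wiring is left out here to keep the sketch free of Camel dependencies.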
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumComponent.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumComponent.java
new file mode 100644
index 0000000..6ab3bea
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumComponent.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.util.Map;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.debezium.configuration.ConfigurationValidation;
+import org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration;
+import org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+
+/**
+ * Represents the component that manages {@link DebeziumEndpoint}.
+ */
+@Component("debezium")
+public class DebeziumComponent extends DefaultComponent {
+
+    private EmbeddedDebeziumConfiguration configuration;
+
+    public DebeziumComponent() {
+    }
+
+    public DebeziumComponent(CamelContext context) {
+        super(context);
+    }
+
+    @Override
+    protected DebeziumEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters)
+        throws Exception {
+        // check for type when configurations are not set explicitly
+        if (ObjectHelper.isEmpty(remaining) && configuration == null) {
+            throw new IllegalArgumentException("Connector type must be configured on endpoint using syntax debezium:type");
+        }
+
+        if (configuration == null) {
+            // we will change to factory strategy in order to create the configurations once
+            // we have more than one connector supported
+            final DebeziumConnectorTypes connectorTypes = DebeziumConnectorTypes.fromString(remaining);
+            if (connectorTypes == DebeziumConnectorTypes.MYSQL) {
+                configuration = new MySqlConnectorEmbeddedDebeziumConfiguration();
+            } else {
+                throw new IllegalArgumentException(String
+                    .format("Connector of type '%s' is not supported yet.",
+                            connectorTypes.getName().toLowerCase()));
+            }
+        }
+
+        setProperties(configuration, parameters);
+
+        // validate configurations
+        final ConfigurationValidation configurationValidation = configuration.validateConfiguration();
+
+        if (!configurationValidation.isValid()) {
+            throw new IllegalArgumentException(configurationValidation.getReason());
+        }
+
+        return new DebeziumEndpoint(uri, this, configuration);
+    }
+
+    /**
+     * Allow pre-configured Configurations to be set, you will need to extend
+     * {@link EmbeddedDebeziumConfiguration} in order to create the configuration
+     * for the component
+     *
+     * @return {@link EmbeddedDebeziumConfiguration}
+     */
+    public EmbeddedDebeziumConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(EmbeddedDebeziumConfiguration configuration) {
+        if (this.configuration == null) {
+            this.configuration = configuration;
+        }
+    }
+}
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConnectorTypes.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConnectorTypes.java
new file mode 100644
index 0000000..a6cbb63
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConnectorTypes.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+public enum DebeziumConnectorTypes {
+    // for the initial version of the component, we support MySQL
+    MYSQL("mysql");
+
+    private final String name;
+
+    DebeziumConnectorTypes(final String name) {
+        this.name = name;
+    }
+
+    public static DebeziumConnectorTypes fromString(final String typeString) {
+        for (DebeziumConnectorTypes type : DebeziumConnectorTypes.values()) {
+            if (type.getName().equalsIgnoreCase(typeString)) {
+                return type;
+            }
+        }
+        throw new IllegalArgumentException(String.format("No matching Debezium connector type of '%s'",
+                                                         typeString));
+    }
+
+    public String getName() {
+        return name;
+    }
+}
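
Note on DebeziumConnectorTypes.fromString above: the lookup is case-insensitive and throws for an unknown type rather than returning null. One consequence is that the "not supported yet" branch in DebeziumComponent.createEndpoint can only be reached once the enum has constants other than MYSQL. The sketch below reproduces the lookup with a stand-in enum so the behavior can be tried in isolation; `ConnectorTypeLookupSketch` and `ConnectorType` are hypothetical names, not classes from this commit.

```java
public class ConnectorTypeLookupSketch {

    // Hypothetical stand-in mirroring the shape of DebeziumConnectorTypes.
    enum ConnectorType {
        MYSQL("mysql");

        private final String name;

        ConnectorType(String name) {
            this.name = name;
        }

        static ConnectorType fromString(String typeString) {
            for (ConnectorType type : values()) {
                // equalsIgnoreCase returns false for a null argument, so null falls through
                if (type.name.equalsIgnoreCase(typeString)) {
                    return type;
                }
            }
            throw new IllegalArgumentException(
                String.format("No matching Debezium connector type of '%s'", typeString));
        }
    }

    public static void main(String[] args) {
        // any casing of "mysql" resolves to the same constant
        System.out.println(ConnectorType.fromString("MySQL"));
        try {
            ConnectorType.fromString("postgres");
        } catch (IllegalArgumentException e) {
            // unknown types are rejected here, before any endpoint is created
            System.out.println(e.getMessage());
        }
    }
}
```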
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
new file mode 100644
index 0000000..15df7a2
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConstants.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import io.debezium.relational.history.FileDatabaseHistory;
+
+import org.apache.kafka.connect.storage.FileOffsetBackingStore;
+
+public final class DebeziumConstants {
+    // embedded engine constant
+    public static final String DEFAULT_OFFSET_STORAGE = FileOffsetBackingStore.class.getName();
+
+    // mysql constant
+    public static final String DEFAULT_DATABASE_HISTORY = FileDatabaseHistory.class.getName();
+
+    // header names
+    private static final String HEADER_PREFIX = "CamelDebezium";
+    public static final String HEADER_SOURCE_METADATA = HEADER_PREFIX + "SourceMetadata";
+    public static final String HEADER_IDENTIFIER = HEADER_PREFIX + "Identifier";
+    public static final String HEADER_KEY = HEADER_PREFIX + "Key";
+    public static final String HEADER_OPERATION = HEADER_PREFIX + "Operation";
+    public static final String HEADER_TIMESTAMP = HEADER_PREFIX + "Timestamp";
+    public static final String HEADER_BEFORE = HEADER_PREFIX + "Before";
+
+    private DebeziumConstants() {
+    }
+}
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
new file mode 100644
index 0000000..31a2d43
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumConsumer.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.util.concurrent.ExecutorService;
+
+import io.debezium.embedded.EmbeddedEngine;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration;
+import org.apache.camel.support.DefaultConsumer;
+import org.apache.kafka.connect.source.SourceRecord;
+
+public class DebeziumConsumer extends DefaultConsumer {
+
+    private final DebeziumEndpoint endpoint;
+    private final EmbeddedDebeziumConfiguration configuration;
+
+    private ExecutorService executorService;
+    private EmbeddedEngine dbzEngine;
+
+    public DebeziumConsumer(DebeziumEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+        this.endpoint = endpoint;
+        this.configuration = endpoint.getConfiguration();
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+        super.doStart();
+
+        // start a single threaded pool to monitor events
+        executorService = endpoint.createExecutor();
+
+        // create engine
+        dbzEngine = createDbzEngine();
+
+        // submit task to the thread pool
+        executorService.submit(dbzEngine);
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        dbzEngine.stop();
+
+        // shutdown the thread pool gracefully
+        getEndpoint().getCamelContext().getExecutorServiceManager().shutdownGraceful(executorService);
+
+        // shutdown camel consumer
+        super.doStop();
+    }
+
+    private EmbeddedEngine createDbzEngine() {
+        return EmbeddedEngine.create().using(configuration.createDebeziumConfiguration())
+            .notifying(this::onEventListener).build();
+    }
+
+    private void onEventListener(final SourceRecord event) {
+        final Exchange exchange = endpoint.createDbzExchange(event);
+
+        try {
+            // send message to next processor in the route
+            getProcessor().process(exchange);
+        } catch (Exception ex) {
+            exchange.setException(ex);
+        } finally {
+            // log exception if an exception occurred and was not handled
+            if (exchange.getException() != null) {
+                getExceptionHandler().handleException("Error processing exchange", exchange,
+                                                      exchange.getException());
+            }
+        }
+    }
+}
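
Note on the DebeziumConsumer lifecycle above: doStart submits the engine to a single-thread executor, and doStop first asks the engine to stop and only then shuts the pool down, so the pool is not left waiting on a task that is still running. The sketch below shows the same ordering with plain java.util.concurrent types. `FakeEngine` is a hypothetical stand-in for the embedded engine, and the timeouts are arbitrary values chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class EngineLifecycleSketch {

    // Hypothetical stand-in for a long-running engine that is started via run().
    static final class FakeEngine implements Runnable {
        private volatile boolean running = true;
        final CountDownLatch started = new CountDownLatch(1);

        @Override
        public void run() {
            started.countDown();
            while (running) {
                try {
                    Thread.sleep(10); // pretend to poll for change events
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        void stop() {
            running = false;
        }
    }

    /** Starts the engine on a single thread, stops it, and reports whether the pool drained. */
    static boolean startAndStop() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        FakeEngine engine = new FakeEngine();
        executor.submit(engine);
        try {
            engine.started.await(1, TimeUnit.SECONDS);
            engine.stop();       // 1. ask the engine loop to exit
            executor.shutdown(); // 2. then let the pool finish gracefully
            return executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            executor.shutdownNow();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("terminated cleanly: " + startAndStop());
    }
}
```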
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
new file mode 100644
index 0000000..e08abca
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumEndpoint.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+
+import io.debezium.data.Envelope;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration;
+import org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.support.DefaultEndpoint;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.source.SourceRecord;
+
+/**
+ * Represents a Debezium endpoint which is used for interacting with Debezium
+ * embedded engine.
+ */
+@UriEndpoint(firstVersion = "3.0.0", scheme = "debezium", title = "Debezium", syntax = "debezium:connectorType", label = "database,sql,nosql", consumerOnly = true)
+public class DebeziumEndpoint extends DefaultEndpoint {
+
+    @UriParam
+    private EmbeddedDebeziumConfiguration configuration;
+
+    // is only meant to include different configurations for each connector that is
+    // generated by camel-maven plugin
+    @UriParam
+    private MySqlConnectorEmbeddedDebeziumConfiguration mySqlConnectorEmbeddedDebeziumConfiguration;
+
+    public DebeziumEndpoint(String uri, DebeziumComponent component, EmbeddedDebeziumConfiguration config) {
+        super(uri, component);
+        this.configuration = config;
+    }
+
+    public DebeziumEndpoint() {
+    }
+
+    @Override
+    public Producer createProducer() throws Exception {
+        throw new UnsupportedOperationException("Cannot produce from a DebeziumEndpoint: "
+                                                + getEndpointUri());
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        return new DebeziumConsumer(this, processor);
+    }
+
+    public EmbeddedDebeziumConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(EmbeddedDebeziumConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public MySqlConnectorEmbeddedDebeziumConfiguration getMySqlConnectorEmbeddedDebeziumConfiguration() {
+        return mySqlConnectorEmbeddedDebeziumConfiguration;
+    }
+
+    public ExecutorService createExecutor() {
+        return getCamelContext().getExecutorServiceManager().newSingleThreadExecutor(this,
+                                                                                     "DebeziumConsumer");
+    }
+
+    public Exchange createDbzExchange(final SourceRecord record) {
+        final Exchange exchange = super.createExchange();
+
+        final Message message = exchange.getIn();
+
+        final Schema valueSchema = record.valueSchema();
+        final Object value = record.value();
+
+        // extract values from SourceRecord
+        final Map<String, Object> sourceMetadata = extractSourceMetadataValueFromValueStruct(valueSchema, value);
+        final Object operation = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.OPERATION);
+        final Object before = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.BEFORE);
+        final Object after = extractAfterValueFromValueStruct(valueSchema, value);
+        final Object timestamp = extractValueFromValueStruct(valueSchema, value, Envelope.FieldName.TIMESTAMP);
+
+        // set message headers
+        message.setHeader(DebeziumConstants.HEADER_IDENTIFIER, record.topic());
+        message.setHeader(DebeziumConstants.HEADER_KEY, record.key());
+        message.setHeader(DebeziumConstants.HEADER_SOURCE_METADATA, sourceMetadata);
+        message.setHeader(DebeziumConstants.HEADER_OPERATION, operation);
+        message.setHeader(DebeziumConstants.HEADER_BEFORE, before);
+        message.setHeader(DebeziumConstants.HEADER_TIMESTAMP, timestamp);
+
+        // set 'after' as message body
+        message.setBody(after);
+
+        return exchange;
+    }
+
+    private Map<String, Object> extractSourceMetadataValueFromValueStruct(final Schema schema, final Object value) {
+        // convert the metadata to a map since that is easier to use and the struct structure is not needed for the metadata
+        final Object valueExtracted = extractValueFromValueStruct(schema, value, Envelope.FieldName.SOURCE);
+
+        if (valueExtracted != null) {
+            return DebeziumTypeConverter.toMap((Struct) valueExtracted);
+        }
+        return null;
+    }
+
+    private Object extractAfterValueFromValueStruct(final Schema schema, final Object value) {
+        // first we try with normal extraction from value struct
+        final Object valueExtracted = extractValueFromValueStruct(schema, value, Envelope.FieldName.AFTER);
+
+        if (valueExtracted == null && !isSchemaAStructSchema(schema)) { // we could have anything other than struct, we just return that
+            return value;
+        }
+        return valueExtracted;
+    }
+
+    private Object extractValueFromValueStruct(final Schema schema, final Object value, final String fieldName) {
+        // first we check if we have a value and a schema of struct type
+        if (isSchemaAStructSchema(schema) && value != null) {
+            // now we return our desired fieldName
+            try {
+                final Struct valueStruct = (Struct) value;
+                return valueStruct.get(fieldName);
+            } catch (DataException e) {
+                // return null instead, since this exception is thrown when no value is set or the field doesn't exist
+                return null;
+            }
+        }
+        return null;
+    }
+
+    private boolean isSchemaAStructSchema(final Schema schema) {
+        return schema != null && schema.type().equals(Schema.Type.STRUCT);
+    }
+}
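
Note on how DebeziumEndpoint.createDbzExchange above picks the message body: the body is the `after` field of the change envelope, and the raw record value is used only when the value schema is not a struct. One consequence is that a delete envelope, whose `after` field is null, yields a null body, with the previous row state available in the CamelDebeziumBefore header. The sketch below imitates that selection with plain maps; it is an illustration under the assumption that a `Map` plays the role of a Struct-typed value, and `BodySelectionSketch` and `selectBody` are hypothetical names.

```java
import java.util.HashMap;
import java.util.Map;

public class BodySelectionSketch {

    /**
     * Stand-in for the body selection: a Map plays the role of a Struct-typed
     * value, anything else the role of a value with a non-struct schema.
     */
    static Object selectBody(Object value) {
        boolean isStruct = value instanceof Map;
        Object after = isStruct ? ((Map<?, ?>) value).get("after") : null;
        if (after == null && !isStruct) {
            return value; // non-struct (or null) values pass through unchanged
        }
        return after;     // may be null, for example for a delete envelope
    }

    public static void main(String[] args) {
        Map<String, Object> row = new HashMap<>();
        row.put("id", 1);

        Map<String, Object> insert = new HashMap<>();
        insert.put("op", "c");
        insert.put("after", row);

        Map<String, Object> delete = new HashMap<>();
        delete.put("op", "d");
        delete.put("before", row);
        delete.put("after", null);

        System.out.println(selectBody(insert)); // the new row state
        System.out.println(selectBody(delete)); // null: only "before" carries data
        System.out.println(selectBody("raw"));  // non-struct value is returned as-is
    }
}
```

Routes that handle deletes may therefore need to read the before header rather than the body.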
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumTypeConverter.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumTypeConverter.java
new file mode 100644
index 0000000..f7fd63e
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/DebeziumTypeConverter.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.Converter;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+
+@Converter(generateLoader = true)
+public final class DebeziumTypeConverter {
+
+    private DebeziumTypeConverter() { }
+
+    /**
+     * Convert {@link Struct} to {@link HashMap}, this only works with flat fields and it doesn't handle nested structure.
+     * Also as a result of the conversion, the schema data will be lost which is expected.
+     *
+     * @param struct
+     * @return {@link Map}
+     */
+    @Converter
+    public static Map<String, Object> toMap(final Struct struct) {
+        final HashMap<String, Object> fieldsToValues = new HashMap<>();
+
+        struct.schema().fields().forEach(field -> {
+            try {
+                fieldsToValues.put(field.name(), struct.get(field));
+            } catch (DataException e) {
+                fieldsToValues.put(field.name(), null);
+            }
+        });
+
+        return fieldsToValues;
+    }
+}
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/ConfigurationValidation.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/ConfigurationValidation.java
new file mode 100644
index 0000000..d5041fc
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/ConfigurationValidation.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.configuration;
+
+import org.apache.camel.util.ObjectHelper;
+
+public final class ConfigurationValidation {
+    private final boolean isValid;
+    private final String reason;
+
+    private ConfigurationValidation(final boolean isValid, final String reason) {
+        this.isValid = isValid;
+        this.reason = reason;
+    }
+
+    public static ConfigurationValidation valid() {
+        return new ConfigurationValidation(true, "");
+    }
+
+    public static ConfigurationValidation notValid(final String reason) {
+        if (ObjectHelper.isEmpty(reason)) {
+            throw new IllegalArgumentException("You will need to specify a reason why it is not valid");
+        }
+        return new ConfigurationValidation(false, reason);
+    }
+
+    public boolean isValid() {
+        return isValid;
+    }
+
+    public String getReason() {
+        return reason;
+    }
+
+}
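
ConfigurationValidation is a small result object: `valid()` carries no reason, while `notValid(reason)` refuses an empty reason. The following stand-alone sketch shows how a caller might use it. `ValidationResultSketch` and `checkName` are illustrative names, and a plain null/blank check replaces Camel's `ObjectHelper.isEmpty` so that the example has no dependencies.

```java
// Stand-alone copy of the validation-result pattern, written without Camel classes.
final class ValidationResultSketch {
    private final boolean valid;
    private final String reason;

    private ValidationResultSketch(final boolean valid, final String reason) {
        this.valid = valid;
        this.reason = reason;
    }

    static ValidationResultSketch valid() {
        return new ValidationResultSketch(true, "");
    }

    // A failed validation must always explain itself, so a blank reason is rejected.
    static ValidationResultSketch notValid(final String reason) {
        if (reason == null || reason.trim().isEmpty()) {
            throw new IllegalArgumentException("A reason is required for a failed validation");
        }
        return new ValidationResultSketch(false, reason);
    }

    boolean isValid() {
        return valid;
    }

    String getReason() {
        return reason;
    }

    // Hypothetical caller: checks one required field and reports the first problem found.
    static ValidationResultSketch checkName(final String name) {
        if (name == null || name.trim().isEmpty()) {
            return notValid("Required field 'name' must be set.");
        }
        return valid();
    }
}
```

Returning a result object rather than throwing lets the caller decide how to surface the problem, for example by wrapping the reason in its own exception type.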
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfiguration.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfiguration.java
new file mode 100644
index 0000000..0ac3a15
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfiguration.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.configuration;
+
+import io.debezium.config.Configuration;
+import io.debezium.config.Field;
+import io.debezium.embedded.EmbeddedEngine;
+
+import io.debezium.embedded.spi.OffsetCommitPolicy;
+import org.apache.camel.component.debezium.DebeziumConstants;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.kafka.connect.json.JsonConverter;
+
+@UriParams
+public abstract class EmbeddedDebeziumConfiguration {
+
+    private static final String LABEL_NAME = "consumer";
+
+    @UriPath(label = LABEL_NAME)
+    @Metadata(required = true)
+    private String connectorType;
+
+    private Class<?> connectorClass;
+    // name
+    @UriParam(label = LABEL_NAME)
+    @Metadata(required = true)
+    private String name;
+    // offset.storage
+    @UriParam(label = LABEL_NAME, defaultValue = "org.apache.kafka.connect.storage.FileOffsetBackingStore")
+    private String offsetStorage = DebeziumConstants.DEFAULT_OFFSET_STORAGE;
+    // offset.storage.file.filename
+    @UriParam(label = LABEL_NAME)
+    private String offsetStorageFileName;
+    // offset.storage.topic
+    @UriParam(label = LABEL_NAME)
+    private String offsetStorageTopic;
+    // offset.storage.partitions
+    @UriParam(label = LABEL_NAME)
+    private int offsetStoragePartitions;
+    // offset.storage.replication.factor
+    @UriParam(label = LABEL_NAME)
+    private int offsetStorageReplicationFactor;
+    // offset.commit.policy
+    @UriParam(label = LABEL_NAME, defaultValue = "io.debezium.embedded.spi.OffsetCommitPolicy.PeriodicCommitOffsetPolicy")
+    private String offsetCommitPolicy = OffsetCommitPolicy.PeriodicCommitOffsetPolicy.class.getName();
+    // offset.flush.interval.ms
+    @UriParam(label = LABEL_NAME, defaultValue = "60000")
+    private long offsetFlushIntervalMs = 60000;
+    // offset.commit.timeout.ms
+    @UriParam(label = LABEL_NAME, defaultValue = "5000")
+    private long offsetCommitTimeoutMs = 5000;
+    // internal.key.converter
+    @UriParam(label = LABEL_NAME, defaultValue = "org.apache.kafka.connect.json.JsonConverter")
+    private String internalKeyConverter = JsonConverter.class.getName();
+    // internal.value.converter
+    @UriParam(label = LABEL_NAME, defaultValue = "org.apache.kafka.connect.json.JsonConverter")
+    private String internalValueConverter = JsonConverter.class.getName();
+
+    public EmbeddedDebeziumConfiguration() {
+        this.connectorClass = configureConnectorClass();
+        ObjectHelper.notNull(this.connectorClass, "connectorClass");
+    }
+
+    /**
+     * Configure the Debezium connector class that is supported by Debezium
+     *
+     * @return {@link Class}
+     */
+    protected abstract Class<?> configureConnectorClass();
+
+    /**
+     * Create a specific {@link Configuration} for a concrete configuration
+     *
+     * @return {@link Configuration}
+     */
+    protected abstract Configuration createConnectorConfiguration();
+
+    /**
+     * Validate a concrete configuration
+     *
+     * @return {@link ConfigurationValidation}
+     */
+    protected abstract ConfigurationValidation validateConnectorConfiguration();
+
+    /**
+     * Creates a Debezium configuration of type {@link Configuration} in order to be
+     * used in the engine.
+     *
+     * @return {@link Configuration}
+     */
+    public Configuration createDebeziumConfiguration() {
+        final Configuration connectorConfiguration = createConnectorConfiguration();
+
+        ObjectHelper.notNull(connectorConfiguration, "createConnectorConfiguration");
+
+        return Configuration.create().with(createDebeziumEmbeddedEngineConfiguration())
+            .with(connectorConfiguration).build();
+    }
+
+    private Configuration createDebeziumEmbeddedEngineConfiguration() {
+        final Configuration.Builder configBuilder = Configuration.create();
+
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.ENGINE_NAME, name);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.CONNECTOR_CLASS, connectorClass.getName());
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE, offsetStorage);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_FILE_FILENAME,
+                             offsetStorageFileName);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_TOPIC, offsetStorageTopic);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_PARTITIONS,
+                             offsetStoragePartitions);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR,
+                             offsetStorageReplicationFactor);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_COMMIT_POLICY, offsetCommitPolicy);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_FLUSH_INTERVAL_MS, offsetFlushIntervalMs);
+        addPropertyIfNotNull(configBuilder, EmbeddedEngine.OFFSET_COMMIT_TIMEOUT_MS, offsetCommitTimeoutMs);
+
+        if (internalKeyConverter != null && internalValueConverter != null) {
+            configBuilder.with("internal.key.converter", internalKeyConverter);
+            configBuilder.with("internal.value.converter", internalValueConverter);
+        }
+
+        return configBuilder.build();
+    }
+
+    protected static <T> void addPropertyIfNotNull(final Configuration.Builder configBuilder,
+                                                   final Field field, final T value) {
+        if (value != null) {
+            configBuilder.with(field, value);
+        }
+    }
+
+    /**
+     * Validate all configurations defined and return
+     * {@link ConfigurationValidation} instance which contains the validation
+     * results
+     *
+     * @return {@link ConfigurationValidation}
+     */
+    public ConfigurationValidation validateConfiguration() {
+        final ConfigurationValidation embeddedEngineValidation = validateDebeziumEmbeddedEngineConfiguration();
+        // only if the embedded engine configuration is valid, we check the connector validation
+        if (embeddedEngineValidation.isValid()) {
+            final ConfigurationValidation connectorValidation = validateConnectorConfiguration();
+
+            ObjectHelper.notNull(connectorValidation, "validateConnectorConfiguration");
+
+            return connectorValidation;
+        }
+        return embeddedEngineValidation;
+    }
+
+    private ConfigurationValidation validateDebeziumEmbeddedEngineConfiguration() {
+        if (isFieldValueNotSet(name)) {
+            return ConfigurationValidation.notValid("Required field 'name' must be set.");
+        }
+        // check for offsetStorageFileName
+        if (offsetStorage.equals(DebeziumConstants.DEFAULT_OFFSET_STORAGE)
+            && isFieldValueNotSet(offsetStorageFileName)) {
+            return ConfigurationValidation.notValid(String
+                .format("Required field 'offsetStorageFileName' must be set since 'offsetStorage' is set to '%s'",
+                        DebeziumConstants.DEFAULT_OFFSET_STORAGE));
+        }
+        return ConfigurationValidation.valid();
+    }
+
+    protected static boolean isFieldValueNotSet(final Object field) {
+        return ObjectHelper.isEmpty(field);
+    }
+
+    /**
+     * The Debezium connector type that is supported by Camel Debezium component.
+     */
+    public String getConnectorType() {
+        return connectorType;
+    }
+
+    public void setConnectorType(String connectorType) {
+        this.connectorType = connectorType;
+    }
+
+    /**
+     * The name of the Java class for the connector
+     */
+    public Class<?> getConnectorClass() {
+        return connectorClass;
+    }
+
+    public void setConnectorClass(Class<?> connectorClass) {
+        this.connectorClass = connectorClass;
+    }
+
+    /**
+     * Unique name for the connector. Attempting to register again with the same
+     * name will fail.
+     */
+    public String getName() {
+        return name;
+    }
+
+    public void setName(String name) {
+        this.name = name;
+    }
+
+    /**
+     * The name of the Java class that is responsible for persistence of connector
+     * offsets.
+     */
+    public String getOffsetStorage() {
+        return offsetStorage;
+    }
+
+    public void setOffsetStorage(String offsetStorage) {
+        this.offsetStorage = offsetStorage;
+    }
+
+    /**
+     * Path to file where offsets are to be stored. Required when offset.storage is
+     * set to the FileOffsetBackingStore
+     */
+    public String getOffsetStorageFileName() {
+        return offsetStorageFileName;
+    }
+
+    public void setOffsetStorageFileName(String offsetStorageFileName) {
+        this.offsetStorageFileName = offsetStorageFileName;
+    }
+
+    /**
+     * The name of the Kafka topic where offsets are to be stored. Required when
+     * offset.storage is set to the KafkaOffsetBackingStore.
+     */
+    public String getOffsetStorageTopic() {
+        return offsetStorageTopic;
+    }
+
+    public void setOffsetStorageTopic(String offsetStorageTopic) {
+        this.offsetStorageTopic = offsetStorageTopic;
+    }
+
+    /**
+     * Replication factor used when creating the offset storage topic. Required when
+     * offset.storage is set to the KafkaOffsetBackingStore
+     */
+    public int getOffsetStorageReplicationFactor() {
+        return offsetStorageReplicationFactor;
+    }
+
+    public void setOffsetStorageReplicationFactor(int offsetStorageReplicationFactor) {
+        this.offsetStorageReplicationFactor = offsetStorageReplicationFactor;
+    }
+
+    /**
+     * The name of the Java class of the commit policy. It defines when offsets
+     * commit has to be triggered based on the number of events processed and the
+     * time elapsed since the last commit. This class must implement the interface
+     * <…​>.OffsetCommitPolicy. The default is a periodic commit policy based upon
+     * time intervals.
+     */
+    public String getOffsetCommitPolicy() {
+        return offsetCommitPolicy;
+    }
+
+    public void setOffsetCommitPolicy(String offsetCommitPolicy) {
+        this.offsetCommitPolicy = offsetCommitPolicy;
+    }
+
+    /**
+     * Interval at which to try committing offsets. The default is 1 minute.
+     */
+    public long getOffsetFlushIntervalMs() {
+        return offsetFlushIntervalMs;
+    }
+
+    public void setOffsetFlushIntervalMs(long offsetFlushIntervalMs) {
+        this.offsetFlushIntervalMs = offsetFlushIntervalMs;
+    }
+
+    /**
+     * Maximum number of milliseconds to wait for records to flush and partition
+     * offset data to be committed to offset storage before cancelling the process
+     * and restoring the offset data to be committed in a future attempt. The
+     * default is 5 seconds.
+     */
+    public long getOffsetCommitTimeoutMs() {
+        return offsetCommitTimeoutMs;
+    }
+
+    public void setOffsetCommitTimeoutMs(long offsetCommitTimeoutMs) {
+        this.offsetCommitTimeoutMs = offsetCommitTimeoutMs;
+    }
+
+    /**
+     * The number of partitions used when creating the offset storage topic.
+     * Required when offset.storage is set to the <…​>.KafkaOffsetBackingStore.
+     */
+    public int getOffsetStoragePartitions() {
+        return offsetStoragePartitions;
+    }
+
+    public void setOffsetStoragePartitions(int offsetStoragePartitions) {
+        this.offsetStoragePartitions = offsetStoragePartitions;
+    }
+
+    /**
+     * The Converter class that should be used to serialize and deserialize key data
+     * for offsets. The default is JSON converter.
+     */
+    public String getInternalKeyConverter() {
+        return internalKeyConverter;
+    }
+
+    public void setInternalKeyConverter(String internalKeyConverter) {
+        this.internalKeyConverter = internalKeyConverter;
+    }
+
+    /**
+     * The Converter class that should be used to serialize and deserialize value
+     * data for offsets. The default is JSON converter.
+     */
+    public String getInternalValueConverter() {
+        return internalValueConverter;
+    }
+
+    public void setInternalValueConverter(String internalValueConverter) {
+        this.internalValueConverter = internalValueConverter;
+    }
+}
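
One detail of `addPropertyIfNotNull` is worth illustrating: when it receives a primitive `int` or `long` field, the value is autoboxed and can never be null, so the property is always written (an unset `int` arrives as 0). The sketch below shows this with a plain `Map` standing in for `Configuration.Builder`. `PropertySketch`, `addIfNotNull` and `build` are illustrative names; the property keys are the ones named in the field comments above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class PropertySketch {
    private PropertySketch() {
    }

    // Same shape as addPropertyIfNotNull, with a Map standing in for Configuration.Builder.
    static <T> void addIfNotNull(final Map<String, Object> target, final String key, final T value) {
        if (value != null) {
            target.put(key, value);
        }
    }

    // 'partitions' is a primitive int, 'replicationFactor' a nullable Integer, to contrast the two.
    static Map<String, Object> build(final String topic, final int partitions, final Integer replicationFactor) {
        final Map<String, Object> props = new LinkedHashMap<>();
        addIfNotNull(props, "offset.storage.topic", topic);
        // The int is boxed to a non-null Integer here, so this entry is always added.
        addIfNotNull(props, "offset.storage.partitions", partitions);
        addIfNotNull(props, "offset.storage.replication.factor", replicationFactor);
        return props;
    }
}
```

Calling `PropertySketch.build(null, 0, null)` yields a map that contains only `offset.storage.partitions`, mapped to 0. If "unset" needs to be distinguishable from 0, a wrapper type such as `Integer` is one possible choice for the field.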
diff --git a/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java
new file mode 100644
index 0000000..72ff6f8
--- /dev/null
+++ b/components/camel-debezium/src/main/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfiguration.java
@@ -0,0 +1,663 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.configuration;
+
+import io.debezium.config.CommonConnectorConfig;
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnector;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.relational.history.FileDatabaseHistory;
+import io.debezium.relational.history.KafkaDatabaseHistory;
+
+import org.apache.camel.component.debezium.DebeziumConstants;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class MySqlConnectorEmbeddedDebeziumConfiguration extends EmbeddedDebeziumConfiguration {
+
+    private static final String LABEL_NAME = "consumer,mysql";
+
+    // database.hostname
+    @UriParam(label = LABEL_NAME)
+    @Metadata(required = true)
+    private String databaseHostName;
+    // database.port
+    @UriParam(label = LABEL_NAME, defaultValue = "3306")
+    private int databasePort = 3306;
+    // database.user
+    @UriParam(label = LABEL_NAME)
+    @Metadata(required = true)
+    private String databaseUser;
+    // database.password
+    @UriParam(label = LABEL_NAME)
+    @Metadata(required = true, secret = true)
+    private String databasePassword;
+    // database.server.name
+    @UriParam(label = LABEL_NAME)
+    @Metadata(required = true)
+    private String databaseServerName;
+    // database.server.id
+    @UriParam(label = LABEL_NAME)
+    @Metadata(required = true)
+    private int databaseServerId;
+    // database.history
+    @UriParam(label = LABEL_NAME, defaultValue = "io.debezium.relational.history.FileDatabaseHistory")
+    private String databaseHistory = DebeziumConstants.DEFAULT_DATABASE_HISTORY;
+    // database.history.filename
+    @UriParam(label = LABEL_NAME)
+    private String databaseHistoryFileName;
+    // database.history.kafka.topic
+    @UriParam(label = LABEL_NAME)
+    private String databaseHistoryKafkaTopic;
+    // database.history.kafka.bootstrap.servers
+    @UriParam(label = LABEL_NAME)
+    private String databaseHistoryKafkaBootstrapServers;
+    // database.whitelist
+    @UriParam(label = LABEL_NAME)
+    private String databaseWhitelist;
+    // database.blacklist
+    @UriParam(label = LABEL_NAME)
+    private String databaseBlacklist;
+    // table.whitelist
+    @UriParam(label = LABEL_NAME)
+    private String tableWhitelist;
+    // table.blacklist
+    @UriParam(label = LABEL_NAME)
+    private String tableBlacklist;
+    // column.blacklist
+    @UriParam(label = LABEL_NAME)
+    private String columnBlacklist;
+    // time.precision.mode
+    @UriParam(label = LABEL_NAME, defaultValue = "adaptive_time_microseconds")
+    private String timePrecisionMode = "adaptive_time_microseconds";
+    // decimal.handling.mode
+    @UriParam(label = LABEL_NAME, defaultValue = "precise")
+    private String decimalHandlingMode = "precise";
+    // bigint.unsigned.handling.mode
+    @UriParam(label = LABEL_NAME, defaultValue = "long")
+    private String bigintUnsignedHandlingMode = "long";
+    // include.schema.changes
+    @UriParam(label = LABEL_NAME, defaultValue = "true")
+    private boolean includeSchemaChanges = true;
+    // include.query
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean includeQuery;
+    // event.deserialization.failure.handling.mode
+    @UriParam(label = LABEL_NAME, defaultValue = "fail")
+    private String eventDeserializationFailureHandlingMode = "fail";
+    // inconsistent.schema.handling.mode
+    @UriParam(label = LABEL_NAME, defaultValue = "fail")
+    private String inconsistentSchemaHandlingMode = "fail";
+    // max.queue.size
+    @UriParam(label = LABEL_NAME, defaultValue = "8192")
+    private int maxQueueSize = 8192;
+    // max.batch.size
+    @UriParam(label = LABEL_NAME, defaultValue = "2048")
+    private int maxBatchSize = 2048;
+    // poll.interval.ms
+    @UriParam(label = LABEL_NAME, defaultValue = "1000")
+    private long pollIntervalMs = 1000;
+    // connect.timeout.ms
+    @UriParam(label = LABEL_NAME, defaultValue = "30000")
+    private long connectTimeoutMs = 30000;
+    // gtid.source.includes
+    @UriParam(label = LABEL_NAME)
+    private String gtidSourceIncludes;
+    // gtid.source.excludes
+    @UriParam(label = LABEL_NAME)
+    private String gtidSourceExcludes;
+    // gtid.new.channel.position
+    @UriParam(label = LABEL_NAME, defaultValue = "latest")
+    private String gtidNewChannelPosition = "latest";
+    // tombstones.on.delete
+    @UriParam(label = LABEL_NAME, defaultValue = "false")
+    private boolean tombstonesOnDelete;
+    // ddl.parser.mode
+    @UriParam(label = LABEL_NAME, defaultValue = "antlr")
+    private String ddlParserMode = "antlr";
+
+    @Override
+    protected Configuration createConnectorConfiguration() {
+        return createDebeziumMySqlConnectorConfiguration();
+    }
+
+    private Configuration createDebeziumMySqlConnectorConfiguration() {
+        final Configuration.Builder configBuilder = Configuration.create();
+
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.HOSTNAME, databaseHostName);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.PORT, databasePort);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.USER, databaseUser);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.PASSWORD, databasePassword);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.SERVER_ID, databaseServerId);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.SERVER_NAME, databaseServerName);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.DATABASE_WHITELIST, databaseWhitelist);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.DATABASE_HISTORY, databaseHistory);
+        addPropertyIfNotNull(configBuilder, FileDatabaseHistory.FILE_PATH, databaseHistoryFileName);
+        addPropertyIfNotNull(configBuilder, KafkaDatabaseHistory.TOPIC, databaseHistoryKafkaTopic);
+        addPropertyIfNotNull(configBuilder, KafkaDatabaseHistory.BOOTSTRAP_SERVERS,
+                             databaseHistoryKafkaBootstrapServers);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.DATABASE_WHITELIST, databaseWhitelist);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.DATABASE_BLACKLIST, databaseBlacklist);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.TABLE_WHITELIST, tableWhitelist);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.TABLE_BLACKLIST, tableBlacklist);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.COLUMN_BLACKLIST, columnBlacklist);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.TIME_PRECISION_MODE, timePrecisionMode);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.DECIMAL_HANDLING_MODE, decimalHandlingMode);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.BIGINT_UNSIGNED_HANDLING_MODE,
+                             bigintUnsignedHandlingMode);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.INCLUDE_SCHEMA_CHANGES,
+                             includeSchemaChanges);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.INCLUDE_SQL_QUERY, includeQuery);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.EVENT_DESERIALIZATION_FAILURE_HANDLING_MODE,
+                             eventDeserializationFailureHandlingMode);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.INCONSISTENT_SCHEMA_HANDLING_MODE,
+                             inconsistentSchemaHandlingMode);
+        addPropertyIfNotNull(configBuilder, CommonConnectorConfig.MAX_QUEUE_SIZE, maxQueueSize);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.MAX_BATCH_SIZE, maxBatchSize);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.POLL_INTERVAL_MS, pollIntervalMs);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.CONNECTION_TIMEOUT_MS, connectTimeoutMs);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.GTID_SOURCE_INCLUDES, gtidSourceIncludes);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.GTID_SOURCE_EXCLUDES, gtidSourceExcludes);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.GTID_NEW_CHANNEL_POSITION,
+                             gtidNewChannelPosition);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.TOMBSTONES_ON_DELETE, tombstonesOnDelete);
+        addPropertyIfNotNull(configBuilder, MySqlConnectorConfig.DDL_PARSER_MODE, ddlParserMode);
+
+        return configBuilder.build();
+    }
+
+    @Override
+    protected ConfigurationValidation validateConnectorConfiguration() {
+        return validateMySqlConnectorConfiguration();
+    }
+
+    private ConfigurationValidation validateMySqlConnectorConfiguration() {
+        if (isFieldValueNotSet(databasePassword)) {
+            return ConfigurationValidation.notValid("Required field 'databasePassword' must be set.");
+        }
+        if (databaseServerId == 0) {
+            return ConfigurationValidation.notValid("Required field 'databaseServerId' must be set.");
+        }
+        if (isFieldValueNotSet(databaseServerName)) {
+            return ConfigurationValidation.notValid("Required field 'databaseServerName' must be set.");
+        }
+        // check for databaseHistory
+        if (databaseHistory.equals(DebeziumConstants.DEFAULT_DATABASE_HISTORY)
+            && isFieldValueNotSet(databaseHistoryFileName)) {
+            return ConfigurationValidation.notValid(String
+                .format("Required field 'databaseHistoryFileName' must be set since 'databaseHistory' is set to '%s'",
+                        DebeziumConstants.DEFAULT_DATABASE_HISTORY));
+        }
+        return ConfigurationValidation.valid();
+    }
+
+    @Override
+    protected Class<?> configureConnectorClass() {
+        return MySqlConnector.class;
+    }
+
+    /**
+     * IP address or hostname of the target database server.
+     */
+    public String getDatabaseHostName() {
+        return databaseHostName;
+    }
+
+    public void setDatabaseHostName(String databaseHostName) {
+        this.databaseHostName = databaseHostName;
+    }
+
+    /**
+     * Integer port number of the database server.
+     */
+    public int getDatabasePort() {
+        return databasePort;
+    }
+
+    public void setDatabasePort(int databasePort) {
+        this.databasePort = databasePort;
+    }
+
+    /**
+     * Name of the MySQL user to use when connecting to the database server.
+     */
+    public String getDatabaseUser() {
+        return databaseUser;
+    }
+
+    public void setDatabaseUser(String databaseUser) {
+        this.databaseUser = databaseUser;
+    }
+
+    /**
+     * Password to use when connecting to the database server.
+     */
+    public String getDatabasePassword() {
+        return databasePassword;
+    }
+
+    public void setDatabasePassword(String databasePassword) {
+        this.databasePassword = databasePassword;
+    }
+
+    /**
+     * Logical name that identifies and provides a namespace for the particular
+     * database server/cluster being monitored.
+     */
+    public String getDatabaseServerName() {
+        return databaseServerName;
+    }
+
+    public void setDatabaseServerName(String databaseServerName) {
+        this.databaseServerName = databaseServerName;
+    }
+
+    /**
+     * A numeric ID of this database client, which must be unique across all
+     * currently-running database processes in the database cluster. This connector
+     * joins the database cluster as another server (with this unique ID) so it can
+     * read the binlog.
+     */
+    public int getDatabaseServerId() {
+        return databaseServerId;
+    }
+
+    public void setDatabaseServerId(int databaseServerId) {
+        this.databaseServerId = databaseServerId;
+    }
+
+    /**
+     * The full name of the Kafka topic where the connector will store the database
+     * schema history.
+     */
+    public String getDatabaseHistoryKafkaTopic() {
+        return databaseHistoryKafkaTopic;
+    }
+
+    public void setDatabaseHistoryKafkaTopic(String databaseHistoryKafkaTopic) {
+        this.databaseHistoryKafkaTopic = databaseHistoryKafkaTopic;
+    }
+
+    /**
+     * An optional comma-separated list of regular expressions that match database
+     * names to be monitored; any database name not included in the whitelist will
+     * be excluded from monitoring. By default all databases will be monitored. May
+     * not be used with database.blacklist.
+     */
+    public String getDatabaseWhitelist() {
+        return databaseWhitelist;
+    }
+
+    public void setDatabaseWhitelist(String databaseWhitelist) {
+        this.databaseWhitelist = databaseWhitelist;
+    }
+
+    /**
+     * The name of the DatabaseHistory class that should be used to store and
+     * recover database schema changes.
+     */
+    public String getDatabaseHistory() {
+        return databaseHistory;
+    }
+
+    public void setDatabaseHistory(String databaseHistory) {
+        this.databaseHistory = databaseHistory;
+    }
+
+    /**
+     * The path to the file that will be used to record the database history
+     */
+    public String getDatabaseHistoryFileName() {
+        return databaseHistoryFileName;
+    }
+
+    public void setDatabaseHistoryFileName(String databaseHistoryFileName) {
+        this.databaseHistoryFileName = databaseHistoryFileName;
+    }
+
+    /**
+     * A list of host/port pairs that the connector will use for establishing the
+     * initial connection to the Kafka cluster that stores the database schema history.
+     */
+    public String getDatabaseHistoryKafkaBootstrapServers() {
+        return databaseHistoryKafkaBootstrapServers;
+    }
+
+    public void setDatabaseHistoryKafkaBootstrapServers(String databaseHistoryKafkaBootstrapServers) {
+        this.databaseHistoryKafkaBootstrapServers = databaseHistoryKafkaBootstrapServers;
+    }
+
+    /**
+     * An optional comma-separated list of regular expressions that match database
+     * names to be excluded from monitoring; any database name not included in the
+     * blacklist will be monitored. May not be used with database.whitelist.
+     */
+
+    public String getDatabaseBlacklist() {
+        return databaseBlacklist;
+    }
+
+    public void setDatabaseBlacklist(String databaseBlacklist) {
+        this.databaseBlacklist = databaseBlacklist;
+    }
+
+    /**
+     * An optional comma-separated list of regular expressions that match
+     * fully-qualified table identifiers for tables to be monitored; any table not
+     * included in the whitelist will be excluded from monitoring. Each identifier
+     * is of the form databaseName.tableName. By default the connector will monitor
+     * every non-system table in each monitored database. May not be used with
+     * table.blacklist.
+     */
+    public String getTableWhitelist() {
+        return tableWhitelist;
+    }
+
+    public void setTableWhitelist(String tableWhitelist) {
+        this.tableWhitelist = tableWhitelist;
+    }
+
+    /**
+     * An optional comma-separated list of regular expressions that match
+     * fully-qualified table identifiers for tables to be excluded from monitoring;
+     * any table not included in the blacklist will be monitored. Each identifier is
+     * of the form databaseName.tableName. May not be used with table.whitelist.
+     */
+    public String getTableBlacklist() {
+        return tableBlacklist;
+    }
+
+    public void setTableBlacklist(String tableBlacklist) {
+        this.tableBlacklist = tableBlacklist;
+    }
+
+    /**
+     * An optional comma-separated list of regular expressions that match the
+     * fully-qualified names of columns that should be excluded from change event
+     * message values. Fully-qualified names for columns are of the form
+     * databaseName.tableName.columnName, or
+     * databaseName.schemaName.tableName.columnName.
+     */
+    public String getColumnBlacklist() {
+        return columnBlacklist;
+    }
+
+    public void setColumnBlacklist(String columnBlacklist) {
+        this.columnBlacklist = columnBlacklist;
+    }
+
+    /**
+     * Time, date, and timestamps can be represented with different kinds of
+     * precision, including: adaptive_time_microseconds (the default) captures the
+     * date, datetime and timestamp values exactly as in the database using either
+     * millisecond, microsecond, or nanosecond precision values based on the
+     * database column’s type, with the exception of TIME type fields, which are
+     * always captured as microseconds; adaptive (deprecated) captures the time and
+     * timestamp values exactly as in the database using either millisecond,
+     * microsecond, or nanosecond precision values based on the database column’s
+     * type; or connect always represents time and timestamp values using Kafka
+     * Connect’s built-in representations for Time, Date, and Timestamp, which uses
+     * millisecond precision regardless of the database columns' precision. See
+     * Temporal values.
+     */
+    public String getTimePrecisionMode() {
+        return timePrecisionMode;
+    }
+
+    public void setTimePrecisionMode(String timePrecisionMode) {
+        this.timePrecisionMode = timePrecisionMode;
+    }
+
+    /**
+     * Specifies how the connector should handle values for DECIMAL and NUMERIC
+     * columns: precise (the default) represents them precisely using
+     * java.math.BigDecimal values represented in change events in a binary form; or
+     * double represents them using double values, which may result in a loss of
+     * precision but will be far easier to use. The string option encodes values as
+     * formatted strings, which are easy to consume, but semantic information about
+     * the real type is lost. See Decimal values.
+     */
+    public String getDecimalHandlingMode() {
+        return decimalHandlingMode;
+    }
+
+    public void setDecimalHandlingMode(String decimalHandlingMode) {
+        this.decimalHandlingMode = decimalHandlingMode;
+    }
+
+    /**
+     * Specifies how BIGINT UNSIGNED columns should be represented in change events,
+     * including: precise uses java.math.BigDecimal to represent values, which are
+     * encoded in the change events using a binary representation and Kafka
+     * Connect’s org.apache.kafka.connect.data.Decimal type; long (the default)
+     * represents values using Java’s long, which may not offer full precision but
+     * will be far easier to use in consumers. long is usually the preferable
+     * setting. The precise setting should be used only when working with values
+     * larger than 2^63, as those values can’t be conveyed using long. See Data types.
+     */
+    public String getBigintUnsignedHandlingMode() {
+        return bigintUnsignedHandlingMode;
+    }
+
+    public void setBigintUnsignedHandlingMode(String bigintUnsignedHandlingMode) {
+        this.bigintUnsignedHandlingMode = bigintUnsignedHandlingMode;
+    }
+
+    /**
+     * Boolean value that specifies whether the connector should publish changes in
+     * the database schema to a Kafka topic with the same name as the database
+     * server ID. Each schema change will be recorded using a key that contains the
+     * database name and whose value includes the DDL statement(s). This is
+     * independent of how the connector internally records database history. The
+     * default is true.
+     */
+    public boolean isIncludeSchemaChanges() {
+        return includeSchemaChanges;
+    }
+
+    public void setIncludeSchemaChanges(boolean includeSchemaChanges) {
+        this.includeSchemaChanges = includeSchemaChanges;
+    }
+
+    /**
+     * Boolean value that specifies whether the connector should include the
+     * original SQL query that generated the change event. Note: This option
+     * requires MySQL be configured with the binlog_rows_query_log_events option set
+     * to ON. Query will not be present for events generated from the snapshot
+     * process. Warning: Enabling this option may expose tables or fields explicitly
+     * blacklisted or masked by including the original SQL statement in the change
+     * event. For this reason this option is defaulted to 'false'.
+     */
+    public boolean isIncludeQuery() {
+        return includeQuery;
+    }
+
+    public void setIncludeQuery(boolean includeQuery) {
+        this.includeQuery = includeQuery;
+    }
+
+    /**
+     * Specifies how the connector should react to exceptions during deserialization
+     * of binlog events. fail will propagate the exception (indicating the
+     * problematic event and its binlog offset), causing the connector to stop. warn
+     * will cause the problematic event to be skipped and the problematic event and
+     * its binlog offset to be logged (make sure that the logger is set to the WARN
+     * or ERROR level). ignore will cause the problematic event to be skipped.
+     */
+    public String getEventDeserializationFailureHandlingMode() {
+        return eventDeserializationFailureHandlingMode;
+    }
+
+    public void setEventDeserializationFailureHandlingMode(String eventDeserializationFailureHandlingMode) {
+        this.eventDeserializationFailureHandlingMode = eventDeserializationFailureHandlingMode;
+    }
+
+    /**
+     * Specifies how the connector should react to binlog events that relate to
+     * tables that are not present in the internal schema representation (i.e. the
+     * internal representation is not consistent with the database). fail will throw
+     * an exception (indicating the problematic event and its binlog offset), causing the
+     * connector to stop. warn will cause the problematic event to be skipped and
+     * the problematic event and its binlog offset to be logged (make sure that the
+     * logger is set to the WARN or ERROR level). ignore will cause the problematic
+     * event to be skipped.
+     */
+    public String getInconsistentSchemaHandlingMode() {
+        return inconsistentSchemaHandlingMode;
+    }
+
+    public void setInconsistentSchemaHandlingMode(String inconsistentSchemaHandlingMode) {
+        this.inconsistentSchemaHandlingMode = inconsistentSchemaHandlingMode;
+    }
+
+    /**
+     * Positive integer value that specifies the maximum size of the blocking queue
+     * into which change events read from the database log are placed before they
+     * are written to Kafka. This queue can provide backpressure to the binlog
+     * reader when, for example, writes to Kafka are slower or if Kafka is not
+     * available. Events that appear in the queue are not included in the offsets
+     * periodically recorded by this connector. Defaults to 8192, and should always
+     * be larger than the maximum batch size specified in the max.batch.size
+     * property.
+     */
+    public int getMaxQueueSize() {
+        return maxQueueSize;
+    }
+
+    public void setMaxQueueSize(int maxQueueSize) {
+        this.maxQueueSize = maxQueueSize;
+    }
+
+    /**
+     * Positive integer value that specifies the maximum size of each batch of
+     * events that should be processed during each iteration of this connector.
+     * Defaults to 2048.
+     */
+    public int getMaxBatchSize() {
+        return maxBatchSize;
+    }
+
+    public void setMaxBatchSize(int maxBatchSize) {
+        this.maxBatchSize = maxBatchSize;
+    }
+
+    /**
+     * Positive integer value that specifies the number of milliseconds the
+     * connector should wait during each iteration for new change events to appear.
+     * Defaults to 1000 milliseconds, or 1 second.
+     */
+    public long getPollIntervalMs() {
+        return pollIntervalMs;
+    }
+
+    public void setPollIntervalMs(long pollIntervalMs) {
+        this.pollIntervalMs = pollIntervalMs;
+    }
+
+    /**
+     * A positive integer value that specifies the maximum time in milliseconds this
+     * connector should wait after trying to connect to the MySQL database server
+     * before timing out. Defaults to 30 seconds.
+     */
+    public long getConnectTimeoutMs() {
+        return connectTimeoutMs;
+    }
+
+    public void setConnectTimeoutMs(long connectTimeoutMs) {
+        this.connectTimeoutMs = connectTimeoutMs;
+    }
+
+    /**
+     * A comma-separated list of regular expressions that match source UUIDs in the
+     * GTID set used to find the binlog position in the MySQL server. Only the GTID
+     * ranges that have sources matching one of these include patterns will be used.
+     * May not be used with gtid.source.excludes.
+     */
+    public String getGtidSourceIncludes() {
+        return gtidSourceIncludes;
+    }
+
+    public void setGtidSourceIncludes(String gtidSourceIncludes) {
+        this.gtidSourceIncludes = gtidSourceIncludes;
+    }
+
+    /**
+     * A comma-separated list of regular expressions that match source UUIDs in the
+     * GTID set used to find the binlog position in the MySQL server. Only the GTID
+     * ranges that have sources matching none of these exclude patterns will be
+     * used. May not be used with gtid.source.includes.
+     */
+    public String getGtidSourceExcludes() {
+        return gtidSourceExcludes;
+    }
+
+    public void setGtidSourceExcludes(String gtidSourceExcludes) {
+        this.gtidSourceExcludes = gtidSourceExcludes;
+    }
+
+    /**
+     * When set to latest, when the connector sees a new GTID channel, it will start
+     * consuming from the last executed transaction in that GTID channel. If set to
+     * earliest, the connector starts reading that channel from the first available
+     * (not purged) GTID position. earliest is useful when you have an active-passive
+     * MySQL setup where Debezium is connected to master; in this case, during
+     * failover the slave with new UUID (and GTID channel) starts receiving writes
+     * before Debezium is connected. These writes would be lost when using latest.
+     */
+    public String getGtidNewChannelPosition() {
+        return gtidNewChannelPosition;
+    }
+
+    public void setGtidNewChannelPosition(String gtidNewChannelPosition) {
+        this.gtidNewChannelPosition = gtidNewChannelPosition;
+    }
+
+    /**
+     * Controls whether a tombstone event should be generated after a delete event.
+     * When true the delete operations are represented by a delete event and a
+     * subsequent tombstone event. When false only a delete event is sent. Emitting
+     * the tombstone event (the default behavior) allows Kafka to completely delete
+     * all events pertaining to the given key once the source record got deleted.
+     */
+    public boolean isTombstonesOnDelete() {
+        return tombstonesOnDelete;
+    }
+
+    public void setTombstonesOnDelete(boolean tombstonesOnDelete) {
+        this.tombstonesOnDelete = tombstonesOnDelete;
+    }
+
+    /**
+     * Controls which parser should be used for parsing DDL statements when building
+     * up the meta-model of the captured database structure. Can be one of legacy
+     * (for the legacy hand-written parser implementation) or antlr (for the new
+     * ANTLR-based implementation introduced in Debezium 0.8.0). While the legacy parser
+     * remains the default for Debezium 0.8.x, please try out the new implementation
+     * and report back any issues you encounter. The new parser is the default as of
+     * 0.9. The legacy parser as well as this configuration property has been
+     * removed as of 0.10.
+     */
+    public String getDdlParserMode() {
+        return ddlParserMode;
+    }
+
+    public void setDdlParserMode(String ddlParserMode) {
+        this.ddlParserMode = ddlParserMode;
+    }
+}
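
The whitelist and blacklist options documented above share one contract: a comma-separated list of regular expressions, with the two forms mutually exclusive. The following is a minimal sketch of that contract, not the connector's actual implementation. The class name TableFilterSketch and its methods are hypothetical, and it assumes that each expression must match the whole identifier and that no expression itself contains a comma.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical illustration only; not part of camel-debezium or Debezium.
final class TableFilterSketch {

    // Splits a comma-separated option value into compiled patterns.
    // Assumption: the expressions themselves contain no commas.
    static List<Pattern> compile(String commaSeparated) {
        return Arrays.stream(commaSeparated.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(Pattern::compile)
                .collect(Collectors.toList());
    }

    // Assumption: an expression has to match the entire identifier.
    static boolean matchesAny(List<Pattern> patterns, String identifier) {
        return patterns.stream().anyMatch(p -> p.matcher(identifier).matches());
    }

    // Decides whether databaseName.tableName would be monitored.
    static boolean isMonitored(String tableId, String whitelist, String blacklist) {
        if (whitelist != null && blacklist != null) {
            throw new IllegalArgumentException("whitelist may not be used with blacklist");
        }
        if (whitelist != null) {
            // Anything not on the whitelist is excluded.
            return matchesAny(compile(whitelist), tableId);
        }
        if (blacklist != null) {
            // Anything not on the blacklist is monitored.
            return !matchesAny(compile(blacklist), tableId);
        }
        // Neither option set: every table is monitored in this sketch.
        return true;
    }
}
```

With a whitelist of "inventory\\.orders,inventory\\.customers", the identifier inventory.orders is monitored and inventory.audit is not. System-table handling and case sensitivity are deliberately left out of the sketch.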
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumComponentTest.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumComponentTest.java
new file mode 100644
index 0000000..ece4edb
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumComponentTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration;
+import org.apache.camel.component.debezium.configuration.TestEmbeddedDebeziumConfiguration;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+public class DebeziumComponentTest {
+
+    @Test
+    public void testIfMySqlEndpointCreatedWithConfig() throws Exception {
+        final Map<String, Object> params = new HashMap<>();
+        params.put("name", "test_name");
+        params.put("offsetStorageFileName", "/offset_test_file");
+        params.put("databaseHostName", "localhost");
+        params.put("databaseUser", "dbz");
+        params.put("databasePassword", "pwd");
+        params.put("databaseServerName", "test");
+        params.put("databaseServerId", 1234);
+        params.put("databaseHistoryFileName", "/db_history_file_test");
+
+        final String remaining = "mysql";
+        final String uri = "debezium:mysql?name=test_name&offsetStorageFileName=/test&"
+                           + "databaseHostName=localhost&databaseServerId=1234&databaseUser=dbz&databasePassword=pwd&"
+                           + "databaseServerName=test&databaseHistoryFileName=/test";
+
+        final DebeziumComponent debeziumComponent = new DebeziumComponent(new DefaultCamelContext());
+        final DebeziumEndpoint debeziumEndpoint = debeziumComponent.createEndpoint(uri, remaining, params);
+
+        assertNotNull(debeziumEndpoint);
+
+        // test for config
+        final MySqlConnectorEmbeddedDebeziumConfiguration configuration = (MySqlConnectorEmbeddedDebeziumConfiguration)debeziumEndpoint
+            .getConfiguration();
+        assertEquals("test_name", configuration.getName());
+        assertEquals("/offset_test_file", configuration.getOffsetStorageFileName());
+        assertEquals("localhost", configuration.getDatabaseHostName());
+        assertEquals("dbz", configuration.getDatabaseUser());
+        assertEquals("pwd", configuration.getDatabasePassword());
+        assertEquals("test", configuration.getDatabaseServerName());
+        assertEquals(1234, configuration.getDatabaseServerId());
+        assertEquals("/db_history_file_test", configuration.getDatabaseHistoryFileName());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIfHandlesInvalidConnectorType() throws Exception {
+        final Map<String, Object> params = new HashMap<>();
+        params.put("name", "test_name");
+        params.put("offsetStorageFileName", "/offset_test_file");
+        params.put("databaseHostName", "localhost");
+        params.put("databaseUser", "dbz");
+        params.put("databasePassword", "pwd");
+        params.put("databaseServerName", "test");
+        params.put("databaseServerId", 1234);
+        params.put("databaseHistoryFileName", "/db_history_file_test");
+
+        final String remaining = "dummy";
+        final String uri = "debezium:dummy?name=test_name&offsetStorageFileName=/test&"
+                           + "databaseHostName=localhost&databaseServerId=1234&databaseUser=dbz&databasePassword=pwd&"
+                           + "databaseServerName=test&databaseHistoryFileName=/test";
+
+        final DebeziumComponent debeziumComponent = new DebeziumComponent(new DefaultCamelContext());
+        final DebeziumEndpoint debeziumEndpoint = debeziumComponent.createEndpoint(uri, remaining, params);
+    }
+
+    @Test
+    public void testIfCreatesComponentWithExternalConfiguration() throws Exception {
+        final TestEmbeddedDebeziumConfiguration configuration = new TestEmbeddedDebeziumConfiguration();
+        configuration.setName("test_config");
+        configuration.setOffsetStorageReplicationFactor(2);
+        configuration.setTestField("test_field");
+        configuration.setOffsetStorageFileName("/file");
+
+        // the component should ignore the type in the remaining part as long as you
+        // set a valid configuration
+        final String remaining = "dummy";
+        final String uri = "debezium:dummy";
+        final DebeziumComponent debeziumComponent = new DebeziumComponent(new DefaultCamelContext());
+
+        // set configurations
+        debeziumComponent.setConfiguration(configuration);
+
+        final DebeziumEndpoint debeziumEndpoint = debeziumComponent.createEndpoint(uri, remaining,
+                                                                                   Collections.emptyMap());
+
+        assertNotNull(debeziumEndpoint);
+
+        // assert configurations
+        final TestEmbeddedDebeziumConfiguration actualConfigurations = (TestEmbeddedDebeziumConfiguration)debeziumEndpoint
+            .getConfiguration();
+        assertNotNull(actualConfigurations);
+        assertEquals(configuration.getTestField(), actualConfigurations.getTestField());
+        assertEquals(configuration.getOffsetStorageReplicationFactor(),
+                     actualConfigurations.getOffsetStorageReplicationFactor());
+        assertEquals(configuration.getName(), actualConfigurations.getName());
+        assertEquals(configuration.getConnectorClass(), actualConfigurations.getConnectorClass());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIfItHandlesNullExternalConfigurations() throws Exception {
+        final String remaining = "";
+        final String uri = "debezium:";
+        final DebeziumComponent debeziumComponent = new DebeziumComponent(new DefaultCamelContext());
+
+        // set configurations
+        debeziumComponent.setConfiguration(null);
+
+        final DebeziumEndpoint debeziumEndpoint = debeziumComponent.createEndpoint(uri, remaining,
+                                                                                   Collections.emptyMap());
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIfItHandlesNullExternalConfigurationsWithValidUri() throws Exception {
+        final String remaining = "dummy";
+        final String uri = "debezium:dummy";
+        final DebeziumComponent debeziumComponent = new DebeziumComponent(new DefaultCamelContext());
+
+        // set configurations
+        debeziumComponent.setConfiguration(null);
+
+        final DebeziumEndpoint debeziumEndpoint = debeziumComponent.createEndpoint(uri, remaining,
+                                                                                   Collections.emptyMap());
+    }
+}
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConnectorTypesTest.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConnectorTypesTest.java
new file mode 100644
index 0000000..af1f377
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConnectorTypesTest.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class DebeziumConnectorTypesTest {
+
+    @Test
+    public void testNormalParseForTheType() {
+        final DebeziumConnectorTypes mysql = DebeziumConnectorTypes.fromString("mysql");
+        final DebeziumConnectorTypes mysqlUpper = DebeziumConnectorTypes.fromString("MYsQL");
+
+        assertEquals("Expecting MySql type", DebeziumConnectorTypes.MYSQL, mysql);
+        assertEquals("Expecting MySql type", DebeziumConnectorTypes.MYSQL, mysqlUpper);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testIfHandleNonValidType() {
+        final DebeziumConnectorTypes type1 = DebeziumConnectorTypes.fromString("myusq");
+    }
+
+}
\ No newline at end of file
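
The test above pins down two behaviours of DebeziumConnectorTypes.fromString: lookup ignores case, and an unknown name raises IllegalArgumentException. A minimal sketch of an enum with that behaviour follows. ConnectorTypeSketch and its id field are hypothetical names, and the real enum in this commit may be structured differently.

```java
// Hypothetical illustration of a case-insensitive enum lookup;
// the actual DebeziumConnectorTypes implementation may differ.
enum ConnectorTypeSketch {
    MYSQL("mysql");

    private final String id;

    ConnectorTypeSketch(String id) {
        this.id = id;
    }

    // Returns the matching type regardless of letter case,
    // or throws IllegalArgumentException for unknown or null input.
    static ConnectorTypeSketch fromString(String value) {
        if (value != null) {
            for (ConnectorTypeSketch type : values()) {
                if (type.id.equalsIgnoreCase(value.trim())) {
                    return type;
                }
            }
        }
        throw new IllegalArgumentException(
                String.format("Unsupported connector type '%s'", value));
    }
}
```

Throwing instead of returning null lets a mistyped connector name in the endpoint URI fail at endpoint creation, which is the behaviour testIfHandlesInvalidConnectorType expects from the component.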
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConsumerTest.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConsumerTest.java
new file mode 100644
index 0000000..812ca2d
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumConsumerTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.nio.file.StandardOpenOption;
+
+import io.debezium.util.Collect;
+import io.debezium.util.IoUtil;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration;
+import org.apache.camel.component.debezium.configuration.FileConnectorEmbeddedDebeziumConfiguration;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+
+public class DebeziumConsumerTest extends CamelTestSupport {
+
+    private static final int NUMBER_OF_LINES = 5;
+    private static final String DEFAULT_DATA_TESTING_FOLDER = "target/data";
+    private static final Path TEST_FILE_PATH = createTestingPath("camel-debezium-test-file-input.txt").toAbsolutePath();
+    private static final Path TEST_OFFSET_STORE_PATH = createTestingPath("camel-debezium-test-offset-store.txt").toAbsolutePath();
+    private static final String DEFAULT_TOPIC_NAME = "test_name_dummy";
+    private static final String DEFAULT_ROUTE_ID = "foo";
+
+    private File inputFile;
+    private File offsetStore;
+    private int linesAdded;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+
+
+    @Before
+    public void beforeEach() {
+        linesAdded = 0;
+        inputFile = createTestingFile(TEST_FILE_PATH);
+        offsetStore = createTestingFile(TEST_OFFSET_STORE_PATH);
+    }
+
+    @After
+    public void afterEach() {
+        // clean all data files
+        deletePath(TEST_FILE_PATH);
+        deletePath(TEST_OFFSET_STORE_PATH);
+    }
+
+    @AfterClass
+    public static void afterClass() {
+        // make sure to clean all data files
+        deletePath(TEST_FILE_PATH);
+        deletePath(TEST_OFFSET_STORE_PATH);
+    }
+
+    @Test
+    public void camelShouldConsumeDebeziumMessages() throws Exception {
+        // add initial lines to the file
+        appendLinesToSource(NUMBER_OF_LINES);
+
+        // assert exchanges
+        to.expectedMessageCount(linesAdded);
+        to.expectedHeaderReceived(DebeziumConstants.HEADER_IDENTIFIER, DEFAULT_TOPIC_NAME);
+        to.expectedBodiesReceivedInAnyOrder("message-1", "message-2", "message-3", "message-4", "message-5");
+
+        // verify that the first records are being consumed
+        to.assertIsSatisfied(50);
+
+        // send another batch
+        appendLinesToSource(NUMBER_OF_LINES);
+
+        // assert exchanges again
+        to.expectedMessageCount(linesAdded);
+        to.expectedHeaderReceived(DebeziumConstants.HEADER_IDENTIFIER, DEFAULT_TOPIC_NAME);
+
+        to.assertIsSatisfied(50);
+    }
+
+    @Test
+    public void camelShouldContinueConsumeDebeziumMessagesWhenRouteIsOffline() throws Exception {
+        // add initial lines to the file
+        appendLinesToSource(NUMBER_OF_LINES);
+
+        // assert exchanges
+        to.expectedMessageCount(linesAdded);
+
+        // verify that the first records are being consumed
+        to.assertIsSatisfied(50);
+
+        // reset the mock before asserting while the route is off
+        to.reset();
+
+        // stop route
+        context.getRouteController().stopRoute(DEFAULT_ROUTE_ID);
+
+        // send a batch while the route is off
+        appendLinesToSource(NUMBER_OF_LINES);
+
+        // start route again
+        context.getRouteController().startRoute(DEFAULT_ROUTE_ID);
+
+        // assert exchange messages after restarting, it should continue using the offset file
+        to.expectedMessageCount(NUMBER_OF_LINES);
+        to.expectedHeaderReceived(DebeziumConstants.HEADER_IDENTIFIER, DEFAULT_TOPIC_NAME);
+        to.expectedBodiesReceivedInAnyOrder("message-6", "message-7", "message-8", "message-9", "message-10");
+
+        to.assertIsSatisfied(50);
+
+
+    }
+
+    @Override
+    protected CamelContext createCamelContext() throws Exception {
+        final CamelContext context = super.createCamelContext();
+        final DebeziumComponent component = new DebeziumComponent(context);
+
+        component.setConfiguration(initConfiguration());
+        context.addComponent("debezium", component);
+
+        context.disableJMX();
+
+        return context;
+    }
+
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("debezium")
+                        .to(to);
+            }
+        };
+    }
+
+    private static Path createTestingPath(final String relativePath) {
+        return Paths.get(DEFAULT_DATA_TESTING_FOLDER, relativePath).toAbsolutePath();
+    }
+
+    private static File createTestingFile(final Path relativePath) {
+        return IoUtil.createFile(relativePath.toAbsolutePath());
+    }
+
+    private static void deletePath(final Path path) {
+        try {
+            IoUtil.delete(path);
+        } catch (IOException e) {
+            System.err.println(String.format("Unable to delete %s", path.toAbsolutePath()));
+        }
+    }
+
+    private EmbeddedDebeziumConfiguration initConfiguration() {
+        final FileConnectorEmbeddedDebeziumConfiguration configuration = new FileConnectorEmbeddedDebeziumConfiguration();
+        configuration.setName("test_name_dummy");
+        configuration.setTopicConfig(DEFAULT_TOPIC_NAME);
+        configuration.setOffsetStorageFileName(TEST_OFFSET_STORE_PATH.toAbsolutePath().toString());
+        configuration.setTestFilePath(TEST_FILE_PATH);
+        configuration.setOffsetFlushIntervalMs(0);
+
+        return configuration;
+    }
+
+    private void appendLinesToSource(int numberOfLines) throws IOException {
+        CharSequence[] lines = new CharSequence[numberOfLines];
+        for (int i = 0; i != numberOfLines; ++i) {
+            lines[i] = generateLine(linesAdded + i + 1);
+        }
+        java.nio.file.Files.write(inputFile.toPath(), Collect.arrayListOf(lines), StandardCharsets.UTF_8, StandardOpenOption.APPEND);
+        linesAdded += numberOfLines;
+    }
+
+    private String generateLine(int lineNumber) {
+        return "message-" + lineNumber;
+    }
+}
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
new file mode 100644
index 0000000..b77b6cb
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumEndpointTest.java
@@ -0,0 +1,292 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import io.debezium.data.Envelope;
+
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.debezium.configuration.MySqlConnectorEmbeddedDebeziumConfiguration;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DebeziumEndpointTest {
+
+    private DebeziumEndpoint debeziumEndpoint;
+
+    @Mock
+    private Processor processor;
+
+    @Before
+    public void setUp() {
+        debeziumEndpoint = new DebeziumEndpoint("", new DebeziumComponent(new DefaultCamelContext()),
+                                                new MySqlConnectorEmbeddedDebeziumConfiguration());
+    }
+
+    @Test
+    public void testIfCreatesConsumer() throws Exception {
+        final Consumer debeziumConsumer = debeziumEndpoint.createConsumer(processor);
+
+        assertNotNull(debeziumConsumer);
+    }
+
+    @Test(expected = UnsupportedOperationException.class)
+    public void testIfFailsToCreateProducer() throws Exception {
+        final Producer debeziumProducer = debeziumEndpoint.createProducer();
+    }
+
+    @Test
+    public void testIfCreatesExchangeFromSourceCreateRecord() {
+        final SourceRecord sourceRecord = createCreateRecord();
+
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Message inMessage = exchange.getIn();
+
+        assertNotNull(exchange);
+        // assert headers
+        assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+        assertEquals(Envelope.Operation.CREATE.code(),
+                     inMessage.getHeader(DebeziumConstants.HEADER_OPERATION));
+        final Struct key = (Struct)inMessage.getHeader(DebeziumConstants.HEADER_KEY);
+        assertEquals(12345, key.getInt32("id").intValue());
+        assertSourceMetadata(inMessage);
+        assertNotNull(inMessage.getHeader(DebeziumConstants.HEADER_TIMESTAMP));
+
+        // assert value
+        final Struct body = (Struct)inMessage.getBody();
+        assertNotNull(body);
+        assertEquals((byte)1, body.getInt8("id").byteValue());
+
+        // assert schema
+        assertSchema(body.schema());
+    }
+
+    @Test
+    public void testIfCreatesExchangeFromSourceDeleteRecord() {
+        final SourceRecord sourceRecord = createDeleteRecord();
+
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Message inMessage = exchange.getIn();
+
+        assertNotNull(exchange);
+        // assert headers
+        assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+        assertEquals(Envelope.Operation.DELETE.code(),
+                     inMessage.getHeader(DebeziumConstants.HEADER_OPERATION));
+        final Struct key = (Struct)inMessage.getHeader(DebeziumConstants.HEADER_KEY);
+        assertEquals(12345, key.getInt32("id").intValue());
+        assertNotNull(inMessage.getHeader(DebeziumConstants.HEADER_BEFORE));
+
+        // assert value
+        final Struct body = (Struct)inMessage.getBody();
+        assertNull(body); // the body is expected to be null because this is a delete
+    }
+
+    @Test
+    public void testIfCreatesExchangeFromSourceDeleteRecordWithNull() {
+        final SourceRecord sourceRecord = createDeleteRecordWithNull();
+
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Message inMessage = exchange.getIn();
+
+        assertNotNull(exchange);
+        // assert headers
+        assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+        final Struct key = (Struct)inMessage.getHeader(DebeziumConstants.HEADER_KEY);
+        assertEquals(12345, key.getInt32("id").intValue());
+
+        // assert value
+        final Struct body = (Struct)inMessage.getBody();
+        assertNull(body);
+    }
+
+    @Test
+    public void testIfCreatesExchangeFromSourceUpdateRecord() {
+        final SourceRecord sourceRecord = createUpdateRecord();
+
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Message inMessage = exchange.getIn();
+
+        assertNotNull(exchange);
+        // assert headers
+        assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+        assertEquals(Envelope.Operation.UPDATE.code(),
+                     inMessage.getHeader(DebeziumConstants.HEADER_OPERATION));
+        final Struct key = (Struct)inMessage.getHeader(DebeziumConstants.HEADER_KEY);
+        assertEquals(12345, key.getInt32("id").intValue());
+        assertSourceMetadata(inMessage);
+
+        // assert value
+        final Struct before = (Struct) inMessage.getHeader(DebeziumConstants.HEADER_BEFORE);
+        final Struct after = (Struct)inMessage.getBody();
+        assertNotNull(before);
+        assertNotNull(after);
+        assertEquals((byte)1, before.getInt8("id").byteValue());
+        assertEquals((byte)2, after.getInt8("id").byteValue());
+    }
+
+    @Test
+    public void testIfCreatesExchangeFromSourceRecordOtherThanStruct() {
+        final SourceRecord sourceRecord = createStringRecord();
+
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Message inMessage = exchange.getIn();
+
+        assertNotNull(exchange);
+
+        // assert headers
+        assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+        assertNull(inMessage.getHeader(DebeziumConstants.HEADER_OPERATION));
+
+        // assert value
+        final String value = (String) inMessage.getBody();
+        assertEquals(sourceRecord.value(), value);
+    }
+
+    @Test
+    public void testIfHandlesUnknownSchema() {
+        final SourceRecord sourceRecord = createUnknownUnnamedSchemaRecord();
+
+        final Exchange exchange = debeziumEndpoint.createDbzExchange(sourceRecord);
+        final Message inMessage = exchange.getIn();
+
+        assertNotNull(exchange);
+        // assert headers
+        assertEquals("dummy", inMessage.getHeader(DebeziumConstants.HEADER_IDENTIFIER));
+        assertNull(inMessage.getHeader(DebeziumConstants.HEADER_OPERATION));
+        assertNull(inMessage.getHeader(DebeziumConstants.HEADER_KEY));
+
+        // assert value
+        final Struct body = (Struct)inMessage.getBody();
+        // for Struct values the body is taken from the 'after' field; we are strict about this case
+        assertNull(body);
+    }
+
+    private SourceRecord createCreateRecord() {
+        final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
+        final Schema sourceSchema = SchemaBuilder.struct().field("lsn", SchemaBuilder.int32()).build();
+        Envelope envelope = Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema)
+            .withSource(sourceSchema).build();
+        final Struct after = new Struct(recordSchema);
+        final Struct source = new Struct(sourceSchema);
+
+        after.put("id", (byte)1);
+        source.put("lsn", 1234);
+        final Struct payload = envelope.create(after, source, System.nanoTime());
+        return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", createKeySchema(),
+                                createKeyRecord(), envelope.schema(), payload);
+    }
+
+    private SourceRecord createDeleteRecord() {
+        final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
+        Envelope envelope = Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema)
+            .withSource(SchemaBuilder.struct().build()).build();
+        final Struct before = new Struct(recordSchema);
+        before.put("id", (byte)1);
+        final Struct payload = envelope.delete(before, null, System.nanoTime());
+        return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", createKeySchema(),
+                                createKeyRecord(), envelope.schema(), payload);
+    }
+
+    private SourceRecord createDeleteRecordWithNull() {
+        final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
+        Envelope envelope = Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema)
+            .withSource(SchemaBuilder.struct().build()).build();
+        final Struct before = new Struct(recordSchema);
+        before.put("id", (byte)1);
+        return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", createKeySchema(),
+                                createKeyRecord(), null, null);
+    }
+
+    private SourceRecord createUpdateRecord() {
+        final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
+        final Schema sourceSchema = SchemaBuilder.struct().field("lsn", SchemaBuilder.int32()).build();
+        Envelope envelope = Envelope.defineSchema().withName("dummy.Envelope").withRecord(recordSchema)
+            .withSource(sourceSchema).build();
+        final Struct before = new Struct(recordSchema);
+        final Struct source = new Struct(sourceSchema);
+        final Struct after = new Struct(recordSchema);
+
+        before.put("id", (byte)1);
+        after.put("id", (byte)2);
+        source.put("lsn", 1234);
+        final Struct payload = envelope.update(before, after, source, System.nanoTime());
+        return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", createKeySchema(),
+                                createKeyRecord(), envelope.schema(), payload);
+    }
+
+    private SourceRecord createUnknownUnnamedSchemaRecord() {
+        final Schema recordSchema = SchemaBuilder.struct().field("id", SchemaBuilder.int8()).build();
+        final Struct before = new Struct(recordSchema);
+        before.put("id", (byte)1);
+        return new SourceRecord(new HashMap<>(), new HashMap<>(), "dummy", recordSchema, before);
+    }
+
+    private SourceRecord createStringRecord() {
+        final Schema recordSchema = SchemaBuilder.STRING_SCHEMA;
+        return new SourceRecord(new HashMap<>(), createSourceOffset(), "dummy", recordSchema, "test_record");
+    }
+
+    private HashMap<String, ?> createSourceOffset() {
+        final HashMap<String, Integer> sourceOffsets = new HashMap<>();
+        sourceOffsets.put("pos", 111);
+
+        return sourceOffsets;
+    }
+
+    private Schema createKeySchema() {
+        return SchemaBuilder.struct().field("id", SchemaBuilder.int32().build());
+    }
+
+    private Struct createKeyRecord() {
+        final Struct key = new Struct(createKeySchema());
+        key.put("id", 12345);
+
+        return key;
+    }
+
+    private void assertSourceMetadata(final Message inMessage) {
+        @SuppressWarnings("unchecked")
+        final Map<String, Object> source = inMessage.getHeader(DebeziumConstants.HEADER_SOURCE_METADATA, Map.class);
+        assertEquals(1234, source.get("lsn"));
+    }
+
+    private void assertSchema(final Schema schema) {
+        assertNotNull(schema);
+        assertFalse(schema.fields().isEmpty());
+    }
+}
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumTypeConverterTest.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumTypeConverterTest.java
new file mode 100644
index 0000000..d30ebed
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/DebeziumTypeConverterTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium;
+
+import java.util.Map;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+public class DebeziumTypeConverterTest {
+
+    @Test
+    public void testToMapFromStruct() {
+        final Struct inputValue = createTestStruct(12, "test-name", true);
+
+        // convert the Struct to a Map
+        final Map<String, Object> outputValue = DebeziumTypeConverter.toMap(inputValue);
+
+        // assert
+        assertNotNull(outputValue);
+        assertEquals(12, outputValue.get("id"));
+        assertEquals("test-name", outputValue.get("name"));
+        assertNull(outputValue.get("extra"));
+        assertTrue((boolean)outputValue.get("valid"));
+    }
+
+    private Struct createTestStruct(final int id, final String name, final boolean valid) {
+        final Schema schema = SchemaBuilder.struct()
+                                .field("id", SchemaBuilder.INT32_SCHEMA)
+                                .field("name", SchemaBuilder.STRING_SCHEMA)
+                                .field("valid", SchemaBuilder.BOOLEAN_SCHEMA)
+                                .field("extra", SchemaBuilder.STRING_SCHEMA)
+                                .build();
+
+        final Struct value = new Struct(schema);
+        value.put("id", id);
+        value.put("name", name);
+        value.put("valid", valid);
+
+        return value;
+    }
+}
\ No newline at end of file
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfigurationTest.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfigurationTest.java
new file mode 100644
index 0000000..867842f
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/EmbeddedDebeziumConfigurationTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.configuration;
+
+import io.debezium.config.Configuration;
+import io.debezium.embedded.EmbeddedEngine;
+import org.apache.camel.component.debezium.DebeziumConstants;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class EmbeddedDebeziumConfigurationTest {
+
+    @Test
+    public void testIfCreatesConfig() {
+        final TestEmbeddedDebeziumConfiguration configuration = new TestEmbeddedDebeziumConfiguration();
+        configuration.setName("test_config");
+        configuration.setOffsetStorageReplicationFactor(2);
+        configuration.setTestField("test_field");
+
+        final Configuration dbzEmbeddedConfiguration = configuration.createDebeziumConfiguration();
+
+        assertEquals("Expect the same name", "test_config",
+                dbzEmbeddedConfiguration.getString(EmbeddedEngine.ENGINE_NAME));
+        assertEquals(2, dbzEmbeddedConfiguration
+                .getInteger(EmbeddedEngine.OFFSET_STORAGE_KAFKA_REPLICATION_FACTOR));
+        assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
+                dbzEmbeddedConfiguration.getString(EmbeddedEngine.OFFSET_STORAGE));
+        assertEquals("test_field", configuration.getTestField());
+        assertEquals(Class.class.getName(), dbzEmbeddedConfiguration.getString(EmbeddedEngine.CONNECTOR_CLASS));
+    }
+
+    @Test
+    public void testIfValidatesConfigurationCorrectly() {
+        final TestEmbeddedDebeziumConfiguration configuration = new TestEmbeddedDebeziumConfiguration();
+        configuration.setName("test_config");
+        configuration.setOffsetStorageReplicationFactor(2);
+        configuration.setOffsetStorageFileName("/file");
+
+        // not all required fields are set yet
+        assertFalse(configuration.validateConfiguration().isValid());
+
+        // all required fields are now set
+        configuration.setTestField("test_field");
+
+        assertTrue(configuration.validateConfiguration().isValid());
+    }
+
+}
\ No newline at end of file
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/FileConnectorEmbeddedDebeziumConfiguration.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/FileConnectorEmbeddedDebeziumConfiguration.java
new file mode 100644
index 0000000..3606139
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/FileConnectorEmbeddedDebeziumConfiguration.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.configuration;
+
+import java.nio.file.Path;
+
+import io.debezium.config.Configuration;
+
+import org.apache.kafka.connect.file.FileStreamSourceConnector;
+
+public class FileConnectorEmbeddedDebeziumConfiguration extends EmbeddedDebeziumConfiguration {
+
+    private Path testFilePath;
+    private String topicConfig;
+
+    @Override
+    protected Configuration createConnectorConfiguration() {
+        return Configuration.create()
+                .with(FileStreamSourceConnector.FILE_CONFIG, testFilePath)
+                .with(FileStreamSourceConnector.TOPIC_CONFIG, topicConfig)
+                .build();
+    }
+
+    @Override
+    protected ConfigurationValidation validateConnectorConfiguration() {
+        if (isFieldValueNotSet(testFilePath)) {
+            return ConfigurationValidation.notValid("testFilePath is not set");
+        }
+        if (isFieldValueNotSet(topicConfig)) {
+            return ConfigurationValidation.notValid("topicConfig is not set");
+        }
+        return ConfigurationValidation.valid();
+    }
+
+    @Override
+    protected Class<?> configureConnectorClass() {
+        return FileStreamSourceConnector.class;
+    }
+
+    public Path getTestFilePath() {
+        return testFilePath;
+    }
+
+    public void setTestFilePath(Path testFilePath) {
+        this.testFilePath = testFilePath;
+    }
+
+    public String getTopicConfig() {
+        return topicConfig;
+    }
+
+    public void setTopicConfig(String topicConfig) {
+        this.topicConfig = topicConfig;
+    }
+}
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfigurationTest.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfigurationTest.java
new file mode 100644
index 0000000..f180a70
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/MySqlConnectorEmbeddedDebeziumConfigurationTest.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.configuration;
+
+import io.debezium.config.Configuration;
+import io.debezium.connector.mysql.MySqlConnector;
+import io.debezium.connector.mysql.MySqlConnectorConfig;
+import io.debezium.embedded.EmbeddedEngine;
+import org.apache.camel.component.debezium.DebeziumConstants;
+import org.junit.Test;
+
+import static org.junit.Assert.*;
+
+public class MySqlConnectorEmbeddedDebeziumConfigurationTest {
+
+    @Test
+    public void testIfCreatesConfig() {
+        final MySqlConnectorEmbeddedDebeziumConfiguration configuration = new MySqlConnectorEmbeddedDebeziumConfiguration();
+        configuration.setName("test_config");
+        configuration.setDatabaseUser("test_user");
+        configuration.setIncludeQuery(true);
+        configuration.setMaxQueueSize(1212);
+
+        final Configuration dbzMysqlConfigurations = configuration.createDebeziumConfiguration();
+
+        assertEquals("test_config", dbzMysqlConfigurations.getString(EmbeddedEngine.ENGINE_NAME));
+        assertEquals("test_user", dbzMysqlConfigurations.getString(MySqlConnectorConfig.USER));
+        assertTrue(dbzMysqlConfigurations.getBoolean(MySqlConnectorConfig.INCLUDE_SQL_QUERY));
+        assertEquals(1212, dbzMysqlConfigurations.getInteger(MySqlConnectorConfig.MAX_QUEUE_SIZE));
+        assertEquals(30000,
+                dbzMysqlConfigurations.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS));
+        assertEquals(MySqlConnector.class.getName(), dbzMysqlConfigurations.getString(EmbeddedEngine.CONNECTOR_CLASS));
+        assertTrue(dbzMysqlConfigurations.getBoolean(MySqlConnectorConfig.INCLUDE_SQL_QUERY));
+        assertEquals(DebeziumConstants.DEFAULT_OFFSET_STORAGE,
+                dbzMysqlConfigurations.getString(EmbeddedEngine.OFFSET_STORAGE));
+        assertEquals(30000,
+                dbzMysqlConfigurations.getLong(MySqlConnectorConfig.CONNECTION_TIMEOUT_MS));
+    }
+
+    @Test
+    public void testIfValidatesConfigurationCorrectly() {
+        final MySqlConnectorEmbeddedDebeziumConfiguration configuration = new MySqlConnectorEmbeddedDebeziumConfiguration();
+
+        configuration.setName("test_config");
+        configuration.setDatabaseUser("test_db");
+        configuration.setDatabaseServerId(123);
+        configuration.setDatabaseServerName("test_server");
+        configuration.setOffsetStorageFileName("/offset/file");
+        configuration.setDatabaseHistoryFileName("/database_history/file");
+
+        assertFalse(configuration.validateConfiguration().isValid());
+
+        configuration.setDatabaseHostName("localhost");
+        configuration.setDatabasePassword("test_pwd");
+
+        assertTrue(configuration.validateConfiguration().isValid());
+    }
+
+    @Test
+    public void testValidateConfigurationsForAllRequiredFields() {
+        final MySqlConnectorEmbeddedDebeziumConfiguration configuration = new MySqlConnectorEmbeddedDebeziumConfiguration();
+        configuration.setName("test_config");
+        configuration.setDatabaseUser("test_db");
+        configuration.setDatabaseHostName("localhost");
+        configuration.setDatabasePassword("test_pwd");
+        configuration.setDatabaseServerId(123);
+        configuration.setDatabaseServerName("test_server");
+        configuration.setOffsetStorageFileName("/offset/file");
+        configuration.setDatabaseHistoryFileName("/database_history/file");
+
+        final ConfigurationValidation validation = configuration.validateConfiguration();
+        assertTrue(validation.isValid());
+
+        assertEquals("test_config", configuration.getName());
+        assertEquals("test_db", configuration.getDatabaseUser());
+        assertEquals("localhost", configuration.getDatabaseHostName());
+        assertEquals("test_pwd", configuration.getDatabasePassword());
+        assertEquals(123, configuration.getDatabaseServerId());
+        assertEquals("test_server", configuration.getDatabaseServerName());
+        assertEquals("/offset/file", configuration.getOffsetStorageFileName());
+        assertEquals("/database_history/file", configuration.getDatabaseHistoryFileName());
+    }
+
+}
\ No newline at end of file
diff --git a/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/TestEmbeddedDebeziumConfiguration.java b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/TestEmbeddedDebeziumConfiguration.java
new file mode 100644
index 0000000..4254c03
--- /dev/null
+++ b/components/camel-debezium/src/test/java/org/apache/camel/component/debezium/configuration/TestEmbeddedDebeziumConfiguration.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.configuration;
+
+import io.debezium.config.Configuration;
+
+public class TestEmbeddedDebeziumConfiguration extends EmbeddedDebeziumConfiguration {
+
+    private String testField;
+
+    public String getTestField() {
+        return testField;
+    }
+
+    public void setTestField(String testField) {
+        this.testField = testField;
+    }
+
+    @Override
+    protected Class<?> configureConnectorClass() {
+        return Class.class;
+    }
+
+    @Override
+    protected Configuration createConnectorConfiguration() {
+        return Configuration.create().with("test.field", testField).build();
+    }
+
+    @Override
+    protected ConfigurationValidation validateConnectorConfiguration() {
+        if (isFieldValueNotSet(testField)) {
+            return ConfigurationValidation.notValid("testField cannot be empty");
+        }
+        return ConfigurationValidation.valid();
+    }
+}
diff --git a/components/camel-debezium/src/test/resources/log4j2.properties b/components/camel-debezium/src/test/resources/log4j2.properties
new file mode 100644
index 0000000..0b3e988
--- /dev/null
+++ b/components/camel-debezium/src/test/resources/log4j2.properties
@@ -0,0 +1,40 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+appender.file.type = File
+appender.file.name = file
+appender.file.fileName = target/camel-debezium-test.log
+appender.file.layout.type = PatternLayout
+appender.file.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+appender.out.type = Console
+appender.out.name = out
+appender.out.layout.type = PatternLayout
+appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
+
+rootLogger.level = INFO
+rootLogger.appenderRef.out.ref = out
+
+logger.camel.name=org.apache.camel
+logger.camel.level=INFO
+
+logger.camel-debezium.name=org.apache.camel.component.debezium
+logger.camel-debezium.level=INFO
+
+logger.debezium.name=io.debezium
+logger.debezium.level=INFO
+
diff --git a/components/pom.xml b/components/pom.xml
index 4762e0f..e8e8280 100644
--- a/components/pom.xml
+++ b/components/pom.xml
@@ -154,6 +154,7 @@
         <module>camel-dozer</module>
         <module>camel-drill</module>
         <module>camel-dropbox</module>
+        <module>camel-debezium</module>
         <module>camel-ehcache</module>
         <module>camel-elasticsearch-rest</module>
         <module>camel-elsql</module>
diff --git a/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumEndpointBuilderFactory.java b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumEndpointBuilderFactory.java
new file mode 100644
index 0000000..bd4ae72
--- /dev/null
+++ b/core/camel-endpointdsl/src/main/java/org/apache/camel/builder/endpoint/dsl/DebeziumEndpointBuilderFactory.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.builder.endpoint.dsl;
+
+import javax.annotation.Generated;
+import org.apache.camel.builder.EndpointConsumerBuilder;
+import org.apache.camel.builder.EndpointProducerBuilder;
+import org.apache.camel.builder.endpoint.AbstractEndpointBuilder;
+
+/**
+ * Represents a Debezium endpoint which is used for interacting with Debezium
+ * embedded engine.
+ *
+ * Generated by camel-package-maven-plugin - do not edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.EndpointDslMojo")
+public interface DebeziumEndpointBuilderFactory {
+
+
+    /**
+     * Builder for endpoint for the Debezium component.
+     */
+    public interface DebeziumEndpointBuilder extends EndpointConsumerBuilder {
+        default AdvancedDebeziumEndpointBuilder advanced() {
+            return (AdvancedDebeziumEndpointBuilder) this;
+        }
+    }
+
+    /**
+     * Advanced builder for endpoint for the Debezium component.
+     */
+    public interface AdvancedDebeziumEndpointBuilder
+            extends
+                EndpointConsumerBuilder {
+        default DebeziumEndpointBuilder basic() {
+            return (DebeziumEndpointBuilder) this;
+        }
+        /**
+         * Whether the endpoint should use basic property binding (Camel 2.x) or
+         * the newer property binding with additional capabilities.
+         *
+         * The option is a: <code>boolean</code> type.
+         *
+         * Group: advanced
+         */
+        default AdvancedDebeziumEndpointBuilder basicPropertyBinding(
+                boolean basicPropertyBinding) {
+            doSetProperty("basicPropertyBinding", basicPropertyBinding);
+            return this;
+        }
+        /**
+         * Whether the endpoint should use basic property binding (Camel 2.x) or
+         * the newer property binding with additional capabilities.
+         *
+         * The option will be converted to a <code>boolean</code> type.
+         *
+         * Group: advanced
+         */
+        default AdvancedDebeziumEndpointBuilder basicPropertyBinding(
+                String basicPropertyBinding) {
+            doSetProperty("basicPropertyBinding", basicPropertyBinding);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         *
+         * The option is a: <code>boolean</code> type.
+         *
+         * Group: advanced
+         */
+        default AdvancedDebeziumEndpointBuilder synchronous(boolean synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+        /**
+         * Sets whether synchronous processing should be strictly used, or Camel
+         * is allowed to use asynchronous processing (if supported).
+         *
+         * The option will be converted to a <code>boolean</code> type.
+         *
+         * Group: advanced
+         */
+        default AdvancedDebeziumEndpointBuilder synchronous(String synchronous) {
+            doSetProperty("synchronous", synchronous);
+            return this;
+        }
+    }
+    /**
+     * Debezium (camel-debezium)
+     * Represents a Debezium endpoint which is used for interacting with
+     * Debezium embedded engine.
+     *
+     * Category: database,sql,nosql
+     * Available as of version: 3.0
+     * Maven coordinates: org.apache.camel:camel-debezium
+     *
+     * Syntax: <code>debezium:connectorType</code>
+     *
+     * Path parameter: connectorType (required)
+     * The Debezium connector type that is supported by Camel Debezium
+     * component.
+     */
+    default DebeziumEndpointBuilder debezium(String path) {
+        class DebeziumEndpointBuilderImpl extends AbstractEndpointBuilder implements DebeziumEndpointBuilder, AdvancedDebeziumEndpointBuilder {
+            public DebeziumEndpointBuilderImpl(String path) {
+                super("debezium", path);
+            }
+        }
+        return new DebeziumEndpointBuilderImpl(path);
+    }
+}
\ No newline at end of file
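
Note on the generated endpoint DSL above: the basic and advanced builder interfaces are two views of one object. `advanced()` and `basic()` only cast `this`, which is safe because the local implementation class inside `debezium(String path)` implements both interfaces. The following is a minimal, self-contained sketch of that pattern, not Camel code. `BaseBuilder`, `BasicBuilder`, `AdvancedBuilder` and `getUri()` are hypothetical stand-ins for `AbstractEndpointBuilder` and the generated interfaces, and the connector type `mysql` is assumed only because the Karaf feature in this commit bundles `debezium-connector-mysql`.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-ins only; none of these types are part of Camel.
final class EndpointDslSketch {

    /** Plays the role of the shared base class that records options. */
    static class BaseBuilder {
        private final String scheme;
        private final String path;
        private final Map<String, Object> properties = new LinkedHashMap<>();

        BaseBuilder(String scheme, String path) {
            this.scheme = scheme;
            this.path = path;
        }

        public void doSetProperty(String name, Object value) {
            properties.put(name, value);
        }

        /** Renders scheme:path?key=value&key=value in insertion order. */
        public String getUri() {
            StringBuilder uri = new StringBuilder(scheme).append(':').append(path);
            char separator = '?';
            for (Map.Entry<String, Object> e : properties.entrySet()) {
                uri.append(separator).append(e.getKey()).append('=').append(e.getValue());
                separator = '&';
            }
            return uri.toString();
        }
    }

    /** Basic view: exposes only the switch to the advanced view. */
    interface BasicBuilder {
        String getUri();

        default AdvancedBuilder advanced() {
            return (AdvancedBuilder) this; // same object, different view
        }
    }

    /** Advanced view: fluent setters implemented as default methods. */
    interface AdvancedBuilder {
        void doSetProperty(String name, Object value);

        String getUri();

        default BasicBuilder basic() {
            return (BasicBuilder) this;
        }

        default AdvancedBuilder synchronous(boolean synchronous) {
            doSetProperty("synchronous", synchronous);
            return this;
        }

        default AdvancedBuilder basicPropertyBinding(boolean basicPropertyBinding) {
            doSetProperty("basicPropertyBinding", basicPropertyBinding);
            return this;
        }
    }

    /** Factory method: the local class implements both views, so the casts succeed. */
    static BasicBuilder debezium(String path) {
        class Impl extends BaseBuilder implements BasicBuilder, AdvancedBuilder {
            Impl(String path) {
                super("debezium", path);
            }
        }
        return new Impl(path);
    }

    public static void main(String[] args) {
        String uri = debezium("mysql")
                .advanced()
                .synchronous(true)
                .basicPropertyBinding(false)
                .getUri();
        System.out.println(uri);
    }
}
```

Because both views share one instance, options set through the advanced view remain visible after switching back with `basic()`.
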
diff --git a/parent/pom.xml b/parent/pom.xml
index 0ecd0a8..a483ce0 100644
--- a/parent/pom.xml
+++ b/parent/pom.xml
@@ -172,6 +172,7 @@
         <dozer-version>6.4.1</dozer-version>
         <drools-version>7.26.0.Final</drools-version>
         <dropbox-version>3.1.1</dropbox-version>
+        <debezium-version>0.9.5.Final</debezium-version>
         <egit-github-core-version>2.1.5</egit-github-core-version>
         <egit-github-core-bundle-version>2.1.5_1</egit-github-core-bundle-version>
         <ehcache-version>2.10.6</ehcache-version>
@@ -1171,6 +1172,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-debezium</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-digitalocean</artifactId>
         <version>${project.version}</version>
       </dependency>
@@ -2750,6 +2756,11 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-debezium-starter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-digitalocean-starter</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git a/platforms/karaf/features/src/main/resources/features.xml b/platforms/karaf/features/src/main/resources/features.xml
index 7c0d1fe..d35ac43 100644
--- a/platforms/karaf/features/src/main/resources/features.xml
+++ b/platforms/karaf/features/src/main/resources/features.xml
@@ -878,6 +878,15 @@
     <bundle dependency='true'>wrap:mvn:com.squareup.okhttp3/okhttp/3.5.0</bundle>
     <bundle>mvn:org.apache.camel/camel-dropbox/${project.version}</bundle>
   </feature>
+  <feature name='camel-debezium' version='${project.version}' start-level='50'>
+    <feature version="${project.version}">camel-core</feature>
+    <bundle dependency="true">wrap:mvn:io.debezium/debezium-core/${debezium-version}</bundle>
+    <bundle dependency="true">wrap:mvn:io.debezium/debezium-embedded/${debezium-version}</bundle>
+    <bundle dependency="true">wrap:mvn:io.debezium/debezium-connector-mysql/${debezium-version}</bundle>
+    <bundle dependency="true">wrap:mvn:org.apache.kafka/connect-api/${kafka-version}</bundle>
+    <bundle dependency="true">wrap:mvn:org.apache.kafka/connect-json/${kafka-version}</bundle>
+    <bundle>mvn:org.apache.camel/camel-debezium/${project.version}</bundle>
+  </feature>
   <feature name='camel-ehcache' version='${project.version}' start-level='50'>
     <feature version="${project.version}">camel-core</feature>
     <bundle dependency="true">mvn:org.ehcache/ehcache/${ehcache3-version}</bundle>
diff --git a/platforms/spring-boot/components-starter/camel-debezium-starter/pom.xml b/platforms/spring-boot/components-starter/camel-debezium-starter/pom.xml
new file mode 100644
index 0000000..4c57ead
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-debezium-starter/pom.xml
@@ -0,0 +1,53 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+         http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.camel</groupId>
+    <artifactId>components-starter</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+  </parent>
+  <artifactId>camel-debezium-starter</artifactId>
+  <packaging>jar</packaging>
+  <name>Spring-Boot Starter :: Camel :: Debezium</name>
+  <description>Spring-Boot Starter for Camel Debezium support</description>
+  <dependencies>
+    <dependency>
+      <groupId>org.springframework.boot</groupId>
+      <artifactId>spring-boot-starter</artifactId>
+      <version>${spring-boot-version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-debezium</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+    <!--START OF GENERATED CODE-->
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-core-starter</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.camel</groupId>
+      <artifactId>camel-spring-boot-starter</artifactId>
+    </dependency>
+    <!--END OF GENERATED CODE-->
+  </dependencies>
+</project>
diff --git a/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentAutoConfiguration.java b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentAutoConfiguration.java
new file mode 100644
index 0000000..0d8e32a
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentAutoConfiguration.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.springboot;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Generated;
+import org.apache.camel.CamelContext;
+import org.apache.camel.component.debezium.DebeziumComponent;
+import org.apache.camel.spi.ComponentCustomizer;
+import org.apache.camel.spi.HasId;
+import org.apache.camel.spring.boot.CamelAutoConfiguration;
+import org.apache.camel.spring.boot.ComponentConfigurationProperties;
+import org.apache.camel.spring.boot.util.CamelPropertiesHelper;
+import org.apache.camel.spring.boot.util.ConditionalOnCamelContextAndAutoConfigurationBeans;
+import org.apache.camel.spring.boot.util.GroupCondition;
+import org.apache.camel.spring.boot.util.HierarchicalPropertiesEvaluator;
+import org.apache.camel.support.IntrospectionSupport;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.autoconfigure.AutoConfigureAfter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Conditional;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.Lazy;
+
+/**
+ * Generated by camel-package-maven-plugin - do not edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.SpringBootAutoConfigurationMojo")
+@Configuration
+@Conditional({ConditionalOnCamelContextAndAutoConfigurationBeans.class,
+        DebeziumComponentAutoConfiguration.GroupConditions.class})
+@AutoConfigureAfter(CamelAutoConfiguration.class)
+@EnableConfigurationProperties({ComponentConfigurationProperties.class,
+        DebeziumComponentConfiguration.class})
+public class DebeziumComponentAutoConfiguration {
+
+    private static final Logger LOGGER = LoggerFactory
+            .getLogger(DebeziumComponentAutoConfiguration.class);
+    @Autowired
+    private ApplicationContext applicationContext;
+    @Autowired
+    private CamelContext camelContext;
+    @Autowired
+    private DebeziumComponentConfiguration configuration;
+    @Autowired(required = false)
+    private List<ComponentCustomizer<DebeziumComponent>> customizers;
+
+    static class GroupConditions extends GroupCondition {
+        public GroupConditions() {
+            super("camel.component", "camel.component.debezium");
+        }
+    }
+
+    @Lazy
+    @Bean(name = "debezium-component")
+    @ConditionalOnMissingBean(DebeziumComponent.class)
+    public DebeziumComponent configureDebeziumComponent() throws Exception {
+        DebeziumComponent component = new DebeziumComponent();
+        component.setCamelContext(camelContext);
+        Map<String, Object> parameters = new HashMap<>();
+        IntrospectionSupport.getProperties(configuration, parameters, null,
+                false);
+        for (Map.Entry<String, Object> entry : parameters.entrySet()) {
+            Object value = entry.getValue();
+            Class<?> paramClass = value.getClass();
+            if (paramClass.getName().endsWith("NestedConfiguration")) {
+                Class nestedClass = null;
+                try {
+                    nestedClass = (Class) paramClass.getDeclaredField(
+                            "CAMEL_NESTED_CLASS").get(null);
+                    HashMap<String, Object> nestedParameters = new HashMap<>();
+                    IntrospectionSupport.getProperties(value, nestedParameters,
+                            null, false);
+                    Object nestedProperty = nestedClass.newInstance();
+                    CamelPropertiesHelper.setCamelProperties(camelContext,
+                            nestedProperty, nestedParameters, false);
+                    entry.setValue(nestedProperty);
+                } catch (NoSuchFieldException e) {
+                }
+            }
+        }
+        CamelPropertiesHelper.setCamelProperties(camelContext, component,
+                parameters, false);
+        if (ObjectHelper.isNotEmpty(customizers)) {
+            for (ComponentCustomizer<DebeziumComponent> customizer : customizers) {
+                boolean useCustomizer = (customizer instanceof HasId)
+                        ? HierarchicalPropertiesEvaluator.evaluate(
+                                applicationContext.getEnvironment(),
+                                "camel.component.customizer",
+                                "camel.component.debezium.customizer",
+                                ((HasId) customizer).getId())
+                        : HierarchicalPropertiesEvaluator.evaluate(
+                                applicationContext.getEnvironment(),
+                                "camel.component.customizer",
+                                "camel.component.debezium.customizer");
+                if (useCustomizer) {
+                    LOGGER.debug("Configure component {}, with customizer {}",
+                            component, customizer);
+                    customizer.customize(component);
+                }
+            }
+        }
+        return component;
+    }
+}
\ No newline at end of file
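
Note on `configureDebeziumComponent()` above: the method first copies the properties of the Spring Boot configuration object into a `Map` and then applies that map to the component. The sketch below approximates only the first step with plain reflection. It assumes that the final `false` argument passed to `IntrospectionSupport.getProperties` means "skip null values"; that reading is an assumption, not something this diff confirms. `DemoConfiguration`, `nonNullProperties` and the value `myDebeziumConfig` are hypothetical names used for illustration.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch; it does not use or reproduce Camel's IntrospectionSupport.
final class ConfigToParametersSketch {

    /** Stand-in for a generated configuration class with wrapper-typed fields. */
    public static class DemoConfiguration {
        private Boolean enabled;                       // deliberately left null
        private String configuration;
        private Boolean basicPropertyBinding = false;

        public Boolean getEnabled() {
            return enabled;
        }

        public String getConfiguration() {
            return configuration;
        }

        public void setConfiguration(String configuration) {
            this.configuration = configuration;
        }

        public Boolean getBasicPropertyBinding() {
            return basicPropertyBinding;
        }
    }

    /** Collects every non-null getXxx() value under the property name xxx. */
    static Map<String, Object> nonNullProperties(Object bean) {
        Map<String, Object> parameters = new TreeMap<>();
        for (Method m : bean.getClass().getMethods()) {
            String name = m.getName();
            boolean getter = name.startsWith("get") && name.length() > 3
                    && m.getParameterCount() == 0
                    && !Modifier.isStatic(m.getModifiers())
                    && m.getDeclaringClass() != Object.class; // excludes getClass()
            if (!getter) {
                continue;
            }
            Object value;
            try {
                value = m.invoke(bean);
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot read " + name, e);
            }
            if (value != null) {
                String property = Character.toLowerCase(name.charAt(3)) + name.substring(4);
                parameters.put(property, value);
            }
        }
        return parameters;
    }

    public static void main(String[] args) {
        DemoConfiguration config = new DemoConfiguration();
        config.setConfiguration("myDebeziumConfig");
        // "enabled" is null and is therefore absent from the resulting map.
        System.out.println(nonNullProperties(config));
    }
}
```

Skipping null values means that only options the user actually set (or that carry a default, such as `basicPropertyBinding`) are pushed onto the component.
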
diff --git a/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentConfiguration.java b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentConfiguration.java
new file mode 100644
index 0000000..3bbcefe
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/java/org/apache/camel/component/debezium/springboot/DebeziumComponentConfiguration.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.debezium.springboot;
+
+import javax.annotation.Generated;
+import org.apache.camel.spring.boot.ComponentConfigurationPropertiesCommon;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+/**
+ * Represents a Debezium endpoint which is used for interacting with Debezium
+ * embedded engine.
+ *
+ * Generated by camel-package-maven-plugin - do not edit this file!
+ */
+@Generated("org.apache.camel.maven.packaging.SpringBootAutoConfigurationMojo")
+@ConfigurationProperties(prefix = "camel.component.debezium")
+public class DebeziumComponentConfiguration
+        extends
+            ComponentConfigurationPropertiesCommon {
+
+    /**
+     * Whether to enable auto configuration of the debezium component. This is
+     * enabled by default.
+     */
+    private Boolean enabled;
+    /**
+     * Allow pre-configured Configurations to be set, you will need to extend
+     * EmbeddedDebeziumConfiguration in order to create the configuration for
+     * the component. The option is a
+     * org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration type.
+     */
+    private String configuration;
+    /**
+     * Whether the component should use basic property binding (Camel 2.x) or
+     * the newer property binding with additional capabilities
+     */
+    private Boolean basicPropertyBinding = false;
+
+    public String getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(String configuration) {
+        this.configuration = configuration;
+    }
+
+    public Boolean getBasicPropertyBinding() {
+        return basicPropertyBinding;
+    }
+
+    public void setBasicPropertyBinding(Boolean basicPropertyBinding) {
+        this.basicPropertyBinding = basicPropertyBinding;
+    }
+}
\ No newline at end of file
diff --git a/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/LICENSE.txt b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/LICENSE.txt
new file mode 100644
index 0000000..6b0b127
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/LICENSE.txt
@@ -0,0 +1,203 @@
+
+                                 Apache License
+                           Version 2.0, January 2004
+                        http://www.apache.org/licenses/
+
+   TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+   1. Definitions.
+
+      "License" shall mean the terms and conditions for use, reproduction,
+      and distribution as defined by Sections 1 through 9 of this document.
+
+      "Licensor" shall mean the copyright owner or entity authorized by
+      the copyright owner that is granting the License.
+
+      "Legal Entity" shall mean the union of the acting entity and all
+      other entities that control, are controlled by, or are under common
+      control with that entity. For the purposes of this definition,
+      "control" means (i) the power, direct or indirect, to cause the
+      direction or management of such entity, whether by contract or
+      otherwise, or (ii) ownership of fifty percent (50%) or more of the
+      outstanding shares, or (iii) beneficial ownership of such entity.
+
+      "You" (or "Your") shall mean an individual or Legal Entity
+      exercising permissions granted by this License.
+
+      "Source" form shall mean the preferred form for making modifications,
+      including but not limited to software source code, documentation
+      source, and configuration files.
+
+      "Object" form shall mean any form resulting from mechanical
+      transformation or translation of a Source form, including but
+      not limited to compiled object code, generated documentation,
+      and conversions to other media types.
+
+      "Work" shall mean the work of authorship, whether in Source or
+      Object form, made available under the License, as indicated by a
+      copyright notice that is included in or attached to the work
+      (an example is provided in the Appendix below).
+
+      "Derivative Works" shall mean any work, whether in Source or Object
+      form, that is based on (or derived from) the Work and for which the
+      editorial revisions, annotations, elaborations, or other modifications
+      represent, as a whole, an original work of authorship. For the purposes
+      of this License, Derivative Works shall not include works that remain
+      separable from, or merely link (or bind by name) to the interfaces of,
+      the Work and Derivative Works thereof.
+
+      "Contribution" shall mean any work of authorship, including
+      the original version of the Work and any modifications or additions
+      to that Work or Derivative Works thereof, that is intentionally
+      submitted to Licensor for inclusion in the Work by the copyright owner
+      or by an individual or Legal Entity authorized to submit on behalf of
+      the copyright owner. For the purposes of this definition, "submitted"
+      means any form of electronic, verbal, or written communication sent
+      to the Licensor or its representatives, including but not limited to
+      communication on electronic mailing lists, source code control systems,
+      and issue tracking systems that are managed by, or on behalf of, the
+      Licensor for the purpose of discussing and improving the Work, but
+      excluding communication that is conspicuously marked or otherwise
+      designated in writing by the copyright owner as "Not a Contribution."
+
+      "Contributor" shall mean Licensor and any individual or Legal Entity
+      on behalf of whom a Contribution has been received by Licensor and
+      subsequently incorporated within the Work.
+
+   2. Grant of Copyright License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      copyright license to reproduce, prepare Derivative Works of,
+      publicly display, publicly perform, sublicense, and distribute the
+      Work and such Derivative Works in Source or Object form.
+
+   3. Grant of Patent License. Subject to the terms and conditions of
+      this License, each Contributor hereby grants to You a perpetual,
+      worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+      (except as stated in this section) patent license to make, have made,
+      use, offer to sell, sell, import, and otherwise transfer the Work,
+      where such license applies only to those patent claims licensable
+      by such Contributor that are necessarily infringed by their
+      Contribution(s) alone or by combination of their Contribution(s)
+      with the Work to which such Contribution(s) was submitted. If You
+      institute patent litigation against any entity (including a
+      cross-claim or counterclaim in a lawsuit) alleging that the Work
+      or a Contribution incorporated within the Work constitutes direct
+      or contributory patent infringement, then any patent licenses
+      granted to You under this License for that Work shall terminate
+      as of the date such litigation is filed.
+
+   4. Redistribution. You may reproduce and distribute copies of the
+      Work or Derivative Works thereof in any medium, with or without
+      modifications, and in Source or Object form, provided that You
+      meet the following conditions:
+
+      (a) You must give any other recipients of the Work or
+          Derivative Works a copy of this License; and
+
+      (b) You must cause any modified files to carry prominent notices
+          stating that You changed the files; and
+
+      (c) You must retain, in the Source form of any Derivative Works
+          that You distribute, all copyright, patent, trademark, and
+          attribution notices from the Source form of the Work,
+          excluding those notices that do not pertain to any part of
+          the Derivative Works; and
+
+      (d) If the Work includes a "NOTICE" text file as part of its
+          distribution, then any Derivative Works that You distribute must
+          include a readable copy of the attribution notices contained
+          within such NOTICE file, excluding those notices that do not
+          pertain to any part of the Derivative Works, in at least one
+          of the following places: within a NOTICE text file distributed
+          as part of the Derivative Works; within the Source form or
+          documentation, if provided along with the Derivative Works; or,
+          within a display generated by the Derivative Works, if and
+          wherever such third-party notices normally appear. The contents
+          of the NOTICE file are for informational purposes only and
+          do not modify the License. You may add Your own attribution
+          notices within Derivative Works that You distribute, alongside
+          or as an addendum to the NOTICE text from the Work, provided
+          that such additional attribution notices cannot be construed
+          as modifying the License.
+
+      You may add Your own copyright statement to Your modifications and
+      may provide additional or different license terms and conditions
+      for use, reproduction, or distribution of Your modifications, or
+      for any such Derivative Works as a whole, provided Your use,
+      reproduction, and distribution of the Work otherwise complies with
+      the conditions stated in this License.
+
+   5. Submission of Contributions. Unless You explicitly state otherwise,
+      any Contribution intentionally submitted for inclusion in the Work
+      by You to the Licensor shall be under the terms and conditions of
+      this License, without any additional terms or conditions.
+      Notwithstanding the above, nothing herein shall supersede or modify
+      the terms of any separate license agreement you may have executed
+      with Licensor regarding such Contributions.
+
+   6. Trademarks. This License does not grant permission to use the trade
+      names, trademarks, service marks, or product names of the Licensor,
+      except as required for reasonable and customary use in describing the
+      origin of the Work and reproducing the content of the NOTICE file.
+
+   7. Disclaimer of Warranty. Unless required by applicable law or
+      agreed to in writing, Licensor provides the Work (and each
+      Contributor provides its Contributions) on an "AS IS" BASIS,
+      WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+      implied, including, without limitation, any warranties or conditions
+      of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+      PARTICULAR PURPOSE. You are solely responsible for determining the
+      appropriateness of using or redistributing the Work and assume any
+      risks associated with Your exercise of permissions under this License.
+
+   8. Limitation of Liability. In no event and under no legal theory,
+      whether in tort (including negligence), contract, or otherwise,
+      unless required by applicable law (such as deliberate and grossly
+      negligent acts) or agreed to in writing, shall any Contributor be
+      liable to You for damages, including any direct, indirect, special,
+      incidental, or consequential damages of any character arising as a
+      result of this License or out of the use or inability to use the
+      Work (including but not limited to damages for loss of goodwill,
+      work stoppage, computer failure or malfunction, or any and all
+      other commercial damages or losses), even if such Contributor
+      has been advised of the possibility of such damages.
+
+   9. Accepting Warranty or Additional Liability. While redistributing
+      the Work or Derivative Works thereof, You may choose to offer,
+      and charge a fee for, acceptance of support, warranty, indemnity,
+      or other liability obligations and/or rights consistent with this
+      License. However, in accepting such obligations, You may act only
+      on Your own behalf and on Your sole responsibility, not on behalf
+      of any other Contributor, and only if You agree to indemnify,
+      defend, and hold each Contributor harmless for any liability
+      incurred by, or claims asserted against, such Contributor by reason
+      of your accepting any such warranty or additional liability.
+
+   END OF TERMS AND CONDITIONS
+
+   APPENDIX: How to apply the Apache License to your work.
+
+      To apply the Apache License to your work, attach the following
+      boilerplate notice, with the fields enclosed by brackets "[]"
+      replaced with your own identifying information. (Don't include
+      the brackets!)  The text should be enclosed in the appropriate
+      comment syntax for the file format. We also recommend that a
+      file or class name and description of purpose be included on the
+      same "printed page" as the copyright notice for easier
+      identification within third-party archives.
+
+   Copyright [yyyy] [name of copyright owner]
+
+   Licensed under the Apache License, Version 2.0 (the "License");
+   you may not use this file except in compliance with the License.
+   You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+
diff --git a/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/NOTICE.txt b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/NOTICE.txt
new file mode 100644
index 0000000..2e215bf
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/NOTICE.txt
@@ -0,0 +1,11 @@
+   =========================================================================
+   ==  NOTICE file corresponding to the section 4 d of                    ==
+   ==  the Apache License, Version 2.0,                                   ==
+   ==  in this case for the Apache Camel distribution.                    ==
+   =========================================================================
+
+   This product includes software developed by
+   The Apache Software Foundation (http://www.apache.org/).
+
+   Please read the different LICENSE files present in the licenses directory of
+   this distribution.
diff --git a/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.factories b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.factories
new file mode 100644
index 0000000..829ebf5
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.factories
@@ -0,0 +1,19 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+
+org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
+org.apache.camel.component.debezium.springboot.DebeziumComponentAutoConfiguration
diff --git a/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.provides b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.provides
new file mode 100644
index 0000000..696294b
--- /dev/null
+++ b/platforms/spring-boot/components-starter/camel-debezium-starter/src/main/resources/META-INF/spring.provides
@@ -0,0 +1,17 @@
+## ---------------------------------------------------------------------------
+## Licensed to the Apache Software Foundation (ASF) under one or more
+## contributor license agreements.  See the NOTICE file distributed with
+## this work for additional information regarding copyright ownership.
+## The ASF licenses this file to You under the Apache License, Version 2.0
+## (the "License"); you may not use this file except in compliance with
+## the License.  You may obtain a copy of the License at
+##
+##      http://www.apache.org/licenses/LICENSE-2.0
+##
+## Unless required by applicable law or agreed to in writing, software
+## distributed under the License is distributed on an "AS IS" BASIS,
+## WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+## See the License for the specific language governing permissions and
+## limitations under the License.
+## ---------------------------------------------------------------------------
+provides: camel-debezium
diff --git a/platforms/spring-boot/components-starter/pom.xml b/platforms/spring-boot/components-starter/pom.xml
index 0ef3590..8aeb9dc 100644
--- a/platforms/spring-boot/components-starter/pom.xml
+++ b/platforms/spring-boot/components-starter/pom.xml
@@ -167,6 +167,7 @@
     <module>camel-cxf-transport-starter</module>
     <module>camel-dataformat-starter</module>
     <module>camel-dataset-starter</module>
+    <module>camel-debezium-starter</module>
     <module>camel-digitalocean-starter</module>
     <module>camel-direct-starter</module>
     <module>camel-directvm-starter</module>
diff --git a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
index b58c9b3..1fba710 100644
--- a/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
+++ b/platforms/spring-boot/spring-boot-dm/camel-spring-boot-dependencies/pom.xml
@@ -1034,6 +1034,16 @@
       </dependency>
       <dependency>
         <groupId>org.apache.camel</groupId>
+        <artifactId>camel-debezium</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
+        <artifactId>camel-debezium-starter</artifactId>
+        <version>${project.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.camel</groupId>
         <artifactId>camel-digitalocean</artifactId>
         <version>${project.version}</version>
       </dependency>
diff --git a/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelDebeziumTest.java b/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelDebeziumTest.java
new file mode 100644
index 0000000..254f4e8
--- /dev/null
+++ b/tests/camel-itest-karaf/src/test/java/org/apache/camel/itest/karaf/CamelDebeziumTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.karaf;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.ops4j.pax.exam.junit.PaxExam;
+
+@RunWith(PaxExam.class)
+public class CamelDebeziumTest extends BaseKarafTest {
+
+    public static final String COMPONENT = extractName(CamelDebeziumTest.class);
+
+    @Test
+    public void test() throws Exception {
+        testComponent(COMPONENT);
+    }
+
+}
diff --git a/tests/camel-itest-spring-boot/src/test/java/org/apache/camel/itest/springboot/CamelDebeziumTest.java b/tests/camel-itest-spring-boot/src/test/java/org/apache/camel/itest/springboot/CamelDebeziumTest.java
new file mode 100644
index 0000000..288cd2e
--- /dev/null
+++ b/tests/camel-itest-spring-boot/src/test/java/org/apache/camel/itest/springboot/CamelDebeziumTest.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.itest.springboot;
+
+import org.apache.camel.itest.springboot.util.ArquillianPackager;
+import org.jboss.arquillian.container.test.api.Deployment;
+import org.jboss.arquillian.junit.Arquillian;
+import org.jboss.shrinkwrap.api.Archive;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+
+@RunWith(Arquillian.class)
+public class CamelDebeziumTest extends AbstractSpringBootTestSupport {
+
+    @Deployment
+    public static Archive<?> createSpringBootPackage() throws Exception {
+        return ArquillianPackager.springBootPackage(createTestConfig());
+    }
+
+    public static ITestConfig createTestConfig() {
+        return new ITestConfigBuilder()
+                .module(inferModuleName(CamelDebeziumTest.class))
+                .build();
+    }
+
+    @Test
+    public void componentTests() throws Exception {
+        this.runComponentTest(config);
+        this.runModuleUnitTestsIfEnabled(config);
+    }
+}


[camel] 02/13: Enhance the documentation with more information

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 477c50a1353d47c86357b504c921e2a8295df51f
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Thu Sep 5 16:21:57 2019 +0200

    Enhance the documentation with more information
---
 .../camel-debezium/src/main/docs/debezium-component.adoc      | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index ba137cc..e1785fc 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -191,8 +191,13 @@ from("debezium:mysql?name=dbz-test-1&offsetStorageFileName=/usr/offset-file-1.da
     .log("    the previous value is ${headers.CamelDebeziumBefore}")
 ----
 
-You can query the body as normal `Map` since this component contains a https://camel.apache.org/manual/latest/type-converter.html[Type Converter] that converts from
-from default output type of https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] to `Map`. However, sometimes you may want to access the schema of the value, especially if you will perform special data conversion (to protobuf, avro .. etc), you can obtain https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Schema.html[`Schema`] type from `Struct` like this:
+By default, the component will emit the events in the body and `CamelDebeziumBefore` header as https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] data type, the reasoning behind this, is to perceive the schema information in case is needed.
+However, the component as well contains a https://camel.apache.org/manual/latest/type-converter.html[Type Converter] that converts from
+from default output type of https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] to `Map` in order to leverage Camel's rich https://camel.apache.org/manual/latest/data-format.html[Data Format] types which many of them work out of box with `Map` data type.
+To use it, you can either add `Map.class` type when you access the message e.g: `exchange.getIn().getBody(Map.class)`, or you can convert the body always to `Map` from the route builder by adding `.convertBodyTo(Map.class)` to your Camel Route DSL after `from` statement.
+
+We mentioned above about the schema, which can be used in case you need to perform advance data transformation and the schema is needed for that. If you choose not to convert your body to `Map`,
+you can obtain the schema information as https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Schema.html[`Schema`] type from `Struct` like this:
 [source,java]
 ----
 from("debezium:[connectorType]?[options]])
@@ -203,8 +208,10 @@ from("debezium:[connectorType]?[options]])
         log.info("Body value is :" + bodyValue);
         log.info("With Schema : " + schemaValue);
         log.info("And fields of :" + schemaValue.fields());
+        log.info("Field name has `" + schemaValue.field("name").schema() + "` type");
     });
 ----
 
 
+
 *Important Note:* This component is a thin wrapper around Debezium Engine as mentioned, therefore before using this component in production, you need to understand how Debezium works and how configurations can reflect the expected behavior, especially in regards to https://debezium.io/docs/embedded/#handling_failures[handling failures].


[camel] 03/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 125d386c3e3dce92aeec1826eb89bd680b49697b
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 09:36:25 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Zoran Regvart <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index e1785fc..c4562cb 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -192,7 +192,7 @@ from("debezium:mysql?name=dbz-test-1&offsetStorageFileName=/usr/offset-file-1.da
 ----
 
 By default, the component will emit the events in the body and `CamelDebeziumBefore` header as https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] data type, the reasoning behind this, is to perceive the schema information in case is needed.
-However, the component as well contains a https://camel.apache.org/manual/latest/type-converter.html[Type Converter] that converts from
+However, the component as well contains a xref:manual::type-converter.adoc[Type Converter] that converts from
 from default output type of https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] to `Map` in order to leverage Camel's rich https://camel.apache.org/manual/latest/data-format.html[Data Format] types which many of them work out of box with `Map` data type.
 To use it, you can either add `Map.class` type when you access the message e.g: `exchange.getIn().getBody(Map.class)`, or you can convert the body always to `Map` from the route builder by adding `.convertBodyTo(Map.class)` to your Camel Route DSL after `from` statement.
 


[camel] 04/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 129ba315ced0b1e5bdee04078d87483d27d91360
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 09:37:04 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Zoran Regvart <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index c4562cb..4035e13 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -193,7 +193,7 @@ from("debezium:mysql?name=dbz-test-1&offsetStorageFileName=/usr/offset-file-1.da
 
 By default, the component will emit the events in the body and `CamelDebeziumBefore` header as https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] data type, the reasoning behind this, is to perceive the schema information in case is needed.
 However, the component as well contains a xref:manual::type-converter.adoc[Type Converter] that converts from
-from default output type of https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] to `Map` in order to leverage Camel's rich https://camel.apache.org/manual/latest/data-format.html[Data Format] types which many of them work out of box with `Map` data type.
+from default output type of https://kafka.apache.org/22/javadoc/org/apache/kafka/connect/data/Struct.html[`Struct`] to `Map` in order to leverage Camel's rich xref:manual::data-format.adoc[Data Format] types which many of them work out of box with `Map` data type.
 To use it, you can either add `Map.class` type when you access the message e.g: `exchange.getIn().getBody(Map.class)`, or you can convert the body always to `Map` from the route builder by adding `.convertBodyTo(Map.class)` to your Camel Route DSL after `from` statement.
 
 We mentioned above about the schema, which can be used in case you need to perform advance data transformation and the schema is needed for that. If you choose not to convert your body to `Map`,


[camel] 05/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 75e4798b65ad3703db201fd31829f42d4fad16b9
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 16:22:36 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Gunnar Morling <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index 4035e13..74122a2 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -6,7 +6,7 @@
 The Debezium component is wrapper around https://debezium.io/[Debezium] using https://debezium.io/docs/embedded/[Debezium Embedded], which enabled Change Data Capture from various databases without the need of Kafka or Kafka Connect.
 
 *Note on handling failures:* Per https://debezium.io/docs/embedded/#handling_failures[Debezium Embedded Engine] documentation, the engines is actively recording source offsets and periodically flushes these offsets to a persistent storage, so when the application is restarted or crashed, the engine will resume from the last recorded offset.
-Thus, at normal operation, your downstream routes will receive exactly once event, however in case of application crash (not having a graceful shutdown), the application will resume from the last recorded offset,
+Thus, at normal operation, your downstream routes will receive each event exactly once, however in case of an application crash (not having a graceful shutdown), the application will resume from the last recorded offset,
 which may result of receiving duplicate events immediately after the restart. Therefore, your downstream routes should be tolerant enough of such case and deduplicate events if needed.
 
 Maven users will need to add the following dependency to their `pom.xml`


[camel] 06/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 075d9398ef6b0fdeafebe8f2226af87d220b605b
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 16:23:22 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Gunnar Morling <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index 74122a2..8cd3526 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -7,7 +7,7 @@ The Debezium component is wrapper around https://debezium.io/[Debezium] using ht
 
 *Note on handling failures:* Per https://debezium.io/docs/embedded/#handling_failures[Debezium Embedded Engine] documentation, the engines is actively recording source offsets and periodically flushes these offsets to a persistent storage, so when the application is restarted or crashed, the engine will resume from the last recorded offset.
 Thus, at normal operation, your downstream routes will receive each event exactly once, however in case of an application crash (not having a graceful shutdown), the application will resume from the last recorded offset,
-which may result of receiving duplicate events immediately after the restart. Therefore, your downstream routes should be tolerant enough of such case and deduplicate events if needed.
+which may result in receiving duplicate events immediately after the restart. Therefore, your downstream routes should be tolerant enough of such case and deduplicate events if needed.
 
 Maven users will need to add the following dependency to their `pom.xml`
 for this component.

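Editorial aside on the failure-handling note in the hunk above: it asks downstream routes to tolerate duplicate events after a crash. A minimal sketch of one way to do that in plain Java follows. `RecentEventFilter`, its capacity parameter, and the idea of building an event id from the event key plus the source log position are assumptions for illustration only; none of them come from camel-debezium.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper (not part of camel-debezium): remembers a bounded
// window of recently seen event ids so that replayed events can be skipped.
final class RecentEventFilter {

    private final Map<String, Boolean> seen;

    RecentEventFilter(final int capacity) {
        // Insertion-ordered map that drops its oldest entry once it grows
        // beyond the requested capacity.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, false) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true the first time an id is offered, and false when the same
    // id is offered again while it is still inside the window.
    synchronized boolean firstTime(String eventId) {
        return seen.putIfAbsent(eventId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        RecentEventFilter filter = new RecentEventFilter(1000);
        // Assumed id format: event key plus source log position.
        String id = "customers:1001@mysql-bin.000003:154";
        System.out.println(filter.firstTime(id)); // first delivery
        System.out.println(filter.firstTime(id)); // replay after a restart
    }
}
```

The window only needs to cover the events that can be replayed between two offset flushes, so a bounded in-memory map may be enough. Camel's own Idempotent Consumer EIP addresses the same problem inside a route.
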

[camel] 07/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 858676c6e8ea79210bb0c5e5bbec84034cf5d59e
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 16:23:48 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Gunnar Morling <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index 8cd3526..2cc8cec 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -150,7 +150,7 @@ The component supports 3 options, which are listed below.
 | Name | Description | Default | Type
 | *camel.component.debezium.basic-property-binding* | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | Boolean
 | *camel.component.debezium.configuration* | Allow pre-configured Configurations to be set, you will need to extend EmbeddedDebeziumConfiguration in order to create the configuration for the component. The option is a org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration type. |  | String
-| *camel.component.debezium.enabled* | Whether to enable auto configuration of the debezium component. This is enabled by default. |  | Boolean
+| *camel.component.debezium.enabled* | Whether to enable auto configuration of the Debezium component. This is enabled by default. |  | Boolean
 |===
 // spring-boot-auto-configure options: END
 


[camel] 08/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 4c2a80b9310597fe663d0887609df9c456c39a1e
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 16:24:08 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Gunnar Morling <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index 2cc8cec..fbd9edf 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -168,7 +168,7 @@ The following headers are available when consuming messages from Kafka.
 | Header constant                           | Header value                                   | Type        | Description
 | DebeziumConstants.HEADER_IDENTIFIER       | "CamelDebeziumIdentifier"                      | String      | The identifier of the connector, normally is this format "{server-name}.{database-name}.{table-name}".
 | DebeziumConstants.HEADER_KEY              | "CamelDebeziumKey"                             | Object      | The key of the event, normally is the table Primary Key.
-| DebeziumConstants.HEADER_SOURCE_METADATA  | "CamelDebeziumSourceMetadata"                  | Map         | The metadata about the source event, for example `table` name, database `name`, log position, etc, please refer to debezium documentation for more info.
+| DebeziumConstants.HEADER_SOURCE_METADATA  | "CamelDebeziumSourceMetadata"                  | Map         | The metadata about the source event, for example `table` name, database `name`, log position, etc, please refer to the Debezium documentation for more info.
 | DebeziumConstants.HEADER_OPERATION        | "CamelDebeziumOperation"                       | String      | If presents, the type of event operation. Values for the connector are c for create (or insert), u for update, d for delete.
 | DebeziumConstants.HEADER_TIMESTAMP        | "CamelDebeziumTimestamp"                       | Long        | If presents, the time (using the system clock in the JVM) at which the connector processed the event.
 | DebeziumConstants.HEADER_BEFORE           | "CamelDebeziumBefore"                          | Map/Struct  | If presents, contains the state of the row before the event occurred.

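Editorial aside on the header table in the hunk above: the `CamelDebeziumOperation` header carries `c`, `u` or `d`, and may be absent. A small sketch of how a processor might translate that value into a readable name could look like this. `DebeziumOperations` and `describe` are hypothetical names introduced here, and treating any other value as "unknown" is an assumption.

```java
// Hypothetical helper (not part of camel-debezium): maps the operation code
// documented for the CamelDebeziumOperation header to a readable name.
final class DebeziumOperations {

    private DebeziumOperations() {
    }

    static String describe(String op) {
        // The header is only set "if present", so a missing value is expected.
        if (op == null) {
            return "unknown";
        }
        switch (op) {
            case "c":
                return "create";
            case "u":
                return "update";
            case "d":
                return "delete";
            default:
                // Assumption: anything else is reported as unknown.
                return "unknown";
        }
    }

    public static void main(String[] args) {
        System.out.println(describe("c"));
        System.out.println(describe(null));
    }
}
```

In a route, the argument would typically be read from the message header, for example with `exchange.getIn().getHeader("CamelDebeziumOperation", String.class)`.
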

[camel] 09/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 3f738370de3b3efc5113fd42dcce3093f16131d5
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 16:24:21 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Gunnar Morling <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index fbd9edf..8b2ecdf 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -162,7 +162,7 @@ https://debezium.io/docs/connectors/mysql/#connector-properties[https://debezium
 
 === Consumer headers
 
-The following headers are available when consuming messages from Kafka.
+The following headers are available when consuming change events from Debezium.
 [width="100%",cols="2m,2m,1m,5",options="header"]
 |===
 | Header constant                           | Header value                                   | Type        | Description


[camel] 10/13: Update components/camel-debezium/src/main/docs/debezium-component.adoc

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 03c775e86f88745dadbd8d5eaab6eab333e5ba29
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 16:24:32 2019 +0200

    Update components/camel-debezium/src/main/docs/debezium-component.adoc
   
    Co-Authored-By: Gunnar Morling <[hidden email]>
---
 components/camel-debezium/src/main/docs/debezium-component.adoc | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index 8b2ecdf..c0ea77c 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -178,7 +178,7 @@ The following headers are available when consuming change events from Debezium.
 
 === Consuming events
 
-Here is a very simple route that you can use in order to listen to Debezium events from MySql connector.
+Here is a very simple route that you can use in order to listen to Debezium events from MySQL connector.
 [source,java]
 ----
 from("debezium:mysql?name=dbz-test-1&offsetStorageFileName=/usr/offset-file-1.dat&databaseHostName=localhost&databaseUser=debezium&databasePassword=dbz&databaseServerName=my-app-connector&databaseHistoryFileName=/usr/history-file-1.dat")


[camel] 11/13: Improve the documentation and fix debezium links

acosentino
In reply to this post by acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 811f03e426ccd2fa67ba766976c0dd110f564258
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Fri Sep 6 16:47:00 2019 +0200

    Improve the documentation and fix debezium links
---
 .../src/main/docs/debezium-component.adoc              | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index c0ea77c..b321238 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -3,9 +3,9 @@
 
 *Available as of Camel version 3.0*
 
-The Debezium component is wrapper around https://debezium.io/[Debezium] using https://debezium.io/docs/embedded/[Debezium Embedded], which enabled Change Data Capture from various databases without the need of Kafka or Kafka Connect.
+The Debezium component is wrapper around https://debezium.io/[Debezium] using https://debezium.io/documentation/reference/0.9/operations/embedded.html[Debezium Embedded], which enabled Change Data Capture from various databases without the need of Kafka or Kafka Connect.
 
-*Note on handling failures:* Per https://debezium.io/docs/embedded/#handling_failures[Debezium Embedded Engine] documentation, the engines is actively recording source offsets and periodically flushes these offsets to a persistent storage, so when the application is restarted or crashed, the engine will resume from the last recorded offset.
+*Note on handling failures:* Per https://debezium.io/documentation/reference/0.9/operations/embedded.html#_handling_failures[Debezium Embedded Engine] documentation, the engines is actively recording source offsets and periodically flushes these offsets to a persistent storage, so when the application is restarted or crashed, the engine will resume from the last recorded offset.
 Thus, at normal operation, your downstream routes will receive each event exactly once, however in case of an application crash (not having a graceful shutdown), the application will resume from the last recorded offset,
 which may result in receiving duplicate events immediately after the restart. Therefore, your downstream routes should be tolerant enough of such case and deduplicate events if needed.
 
@@ -31,7 +31,9 @@ debezium:connector-type[?options]
 ---------------------------
 
 == Supported Debezium Connectors
-- https://debezium.io/docs/connectors/mysql/[MySql].
+- https://debezium.io/documentation/reference/0.9/connectors/mysql.html[MySql].
+
+*Note:* Other Debezium connectors are _not_ supported at the moment.
 
 
 == Options
@@ -66,7 +68,7 @@ with the following path and query parameters:
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *connectorType* | *Required* The Debezium connector type that is supported by Camel Debezium component. |  | String
+| *connectorType* | *Required* The Debezium connector type that is supported by Camel Debezium component. Allowed values: `mysql`. |  | String
 |===
 
 
@@ -155,8 +157,8 @@ The component supports 3 options, which are listed below.
 // spring-boot-auto-configure options: END
 
 For more information about configuration:
-https://debezium.io/docs/embedded/#engine-properties[https://debezium.io/docs/embedded/#engine-properties]
-https://debezium.io/docs/connectors/mysql/#connector-properties[https://debezium.io/docs/connectors/mysql/#connector-properties]
+https://debezium.io/documentation/reference/0.9/operations/embedded.html#engine-properties[https://debezium.io/documentation/reference/0.9/operations/embedded.html#engine-properties]
+https://debezium.io/documentation/reference/0.9/connectors/mysql.html#connector-properties[https://debezium.io/documentation/reference/0.9/connectors/mysql.html#connector-properties]
 
 == Message headers
 
@@ -169,7 +171,7 @@ The following headers are available when consuming change events from Debezium.
 | DebeziumConstants.HEADER_IDENTIFIER       | "CamelDebeziumIdentifier"                      | String      | The identifier of the connector, normally is this format "{server-name}.{database-name}.{table-name}".
 | DebeziumConstants.HEADER_KEY              | "CamelDebeziumKey"                             | Object      | The key of the event, normally is the table Primary Key.
 | DebeziumConstants.HEADER_SOURCE_METADATA  | "CamelDebeziumSourceMetadata"                  | Map         | The metadata about the source event, for example `table` name, database `name`, log position, etc, please refer to the Debezium documentation for more info.
-| DebeziumConstants.HEADER_OPERATION        | "CamelDebeziumOperation"                       | String      | If presents, the type of event operation. Values for the connector are c for create (or insert), u for update, d for delete.
+| DebeziumConstants.HEADER_OPERATION        | "CamelDebeziumOperation"                       | String      | If presents, the type of event operation. Values for the connector are `c` for create (or insert), `u` for update, `d` for delete or `r` in case of a snapshot event.
 | DebeziumConstants.HEADER_TIMESTAMP        | "CamelDebeziumTimestamp"                       | Long        | If presents, the time (using the system clock in the JVM) at which the connector processed the event.
 | DebeziumConstants.HEADER_BEFORE           | "CamelDebeziumBefore"                          | Map/Struct  | If presents, contains the state of the row before the event occurred.
 |===
@@ -214,4 +216,4 @@ from("debezium:[connectorType]?[options]])
 
 
 
-*Important Note:* This component is a thin wrapper around Debezium Engine as mentioned, therefore before using this component in production, you need to understand how Debezium works and how configurations can reflect the expected behavior, especially in regards to https://debezium.io/docs/embedded/#handling_failures[handling failures].
+*Important Note:* This component is a thin wrapper around Debezium Engine as mentioned, therefore before using this component in production, you need to understand how Debezium works and how configurations can reflect the expected behavior, especially in regards to https://debezium.io/documentation/reference/0.9/operations/embedded.html#_handling_failures[handling failures].


[camel] 12/13: CAMEL-12543 - Fixed log configuration

acosentino

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 24961a96bdafdb5a9257765ad92f7147054b2cb3
Author: Andrea Cosentino <[hidden email]>
AuthorDate: Tue Sep 10 12:00:25 2019 +0200

    CAMEL-12543 - Fixed log configuration
---
 .../camel-debezium/src/main/docs/debezium-component.adoc      |  2 +-
 .../camel-debezium/src/test/resources/log4j2.properties       | 11 +----------
 2 files changed, 2 insertions(+), 11 deletions(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index b321238..1598047 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -68,7 +68,7 @@ with the following path and query parameters:
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
 | Name | Description | Default | Type
-| *connectorType* | *Required* The Debezium connector type that is supported by Camel Debezium component. Allowed values: `mysql`. |  | String
+| *connectorType* | *Required* The Debezium connector type that is supported by Camel Debezium component. |  | String
 |===
 
 
diff --git a/components/camel-debezium/src/test/resources/log4j2.properties b/components/camel-debezium/src/test/resources/log4j2.properties
index 0b3e988..36721e2 100644
--- a/components/camel-debezium/src/test/resources/log4j2.properties
+++ b/components/camel-debezium/src/test/resources/log4j2.properties
@@ -27,14 +27,5 @@ appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = %d [%-15.15t] %-5p %-30.30c{1} - %m%n
 
 rootLogger.level = INFO
-rootLogger.appenderRef.out.ref = out
-
-logger.camel.name=org.apache.camel
-logger.camel.level=INFO
-
-logger.camel-debezium.name=org.apache.camel.component.debezium
-logger.camel-debezium.level=INFO
-
-logger.debezium.name=io.debezium
-logger.debezium.level=INFO
+rootLogger.appenderRef.file.ref = file
 


[camel] 13/13: CAMEL-12543 - Regen docs

acosentino

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit c3fe401428a6074f5122c7b9233c7883c62614ab
Author: Andrea Cosentino <[hidden email]>
AuthorDate: Tue Sep 10 12:46:39 2019 +0200

    CAMEL-12543 - Regen docs
---
 components/camel-debezium/src/main/docs/debezium-component.adoc      | 2 +-
 components/readme.adoc                                               | 5 ++++-
 docs/components/modules/ROOT/nav.adoc                                | 1 +
 .../components/modules/ROOT/pages}/debezium-component.adoc           | 3 ++-
 4 files changed, 8 insertions(+), 3 deletions(-)

diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/components/camel-debezium/src/main/docs/debezium-component.adoc
index 1598047..9935c8c 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/components/camel-debezium/src/main/docs/debezium-component.adoc
@@ -152,7 +152,7 @@ The component supports 3 options, which are listed below.
 | Name | Description | Default | Type
 | *camel.component.debezium.basic-property-binding* | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | Boolean
 | *camel.component.debezium.configuration* | Allow pre-configured Configurations to be set, you will need to extend EmbeddedDebeziumConfiguration in order to create the configuration for the component. The option is a org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration type. |  | String
-| *camel.component.debezium.enabled* | Whether to enable auto configuration of the Debezium component. This is enabled by default. |  | Boolean
+| *camel.component.debezium.enabled* | Whether to enable auto configuration of the debezium component. This is enabled by default. |  | Boolean
 |===
 // spring-boot-auto-configure options: END
 
diff --git a/components/readme.adoc b/components/readme.adoc
index 539881e..74f10cd 100644
--- a/components/readme.adoc
+++ b/components/readme.adoc
@@ -1,7 +1,7 @@
 = Components
 
 // components: START
-Number of Components: 298 in 236 JAR artifacts (0 deprecated)
+Number of Components: 299 in 237 JAR artifacts (0 deprecated)
 
 [width="100%",cols="4,1,5",options="header"]
 |===
@@ -217,6 +217,9 @@ Number of Components: 298 in 236 JAR artifacts (0 deprecated)
 | link:camel-dataset/src/main/docs/dataset-test-component.adoc[DataSet Test] (camel-dataset) +
 `dataset-test:name` | 1.3 | The dataset-test component extends the mock component by on startup to pull messages from another endpoint to set the expected message bodies.
 
+| link:camel-debezium/src/main/docs/debezium-component.adoc[Debezium] (camel-debezium) +
+`debezium:connectorType` | 3.0 | Represents a Debezium endpoint which is used for interacting with Debezium embedded engine.
+
 | link:camel-digitalocean/src/main/docs/digitalocean-component.adoc[DigitalOcean] (camel-digitalocean) +
 `digitalocean:operation` | 2.19 | The DigitalOcean component allows you to manage Droplets and resources within the DigitalOcean cloud.
 
diff --git a/docs/components/modules/ROOT/nav.adoc b/docs/components/modules/ROOT/nav.adoc
index 545ea60..d983783 100644
--- a/docs/components/modules/ROOT/nav.adoc
+++ b/docs/components/modules/ROOT/nav.adoc
@@ -86,6 +86,7 @@
 * xref:dataformat-component.adoc[Data Format Component]
 * xref:dataset-component.adoc[Dataset Component]
 * xref:dataset-test-component.adoc[DataSet Test Component]
+* xref:debezium-component.adoc[Debezium Component]
 * xref:digitalocean-component.adoc[DigitalOcean Component]
 * xref:direct-component.adoc[Direct Component]
 * xref:direct-vm-component.adoc[Direct VM Component]
diff --git a/components/camel-debezium/src/main/docs/debezium-component.adoc b/docs/components/modules/ROOT/pages/debezium-component.adoc
similarity index 99%
copy from components/camel-debezium/src/main/docs/debezium-component.adoc
copy to docs/components/modules/ROOT/pages/debezium-component.adoc
index 1598047..130b8b4 100644
--- a/components/camel-debezium/src/main/docs/debezium-component.adoc
+++ b/docs/components/modules/ROOT/pages/debezium-component.adoc
@@ -1,5 +1,6 @@
 [[debezium-component]]
 = Debezium Component
+:page-source: components/camel-debezium/src/main/docs/debezium-component.adoc
 
 *Available as of Camel version 3.0*
 
@@ -152,7 +153,7 @@ The component supports 3 options, which are listed below.
 | Name | Description | Default | Type
 | *camel.component.debezium.basic-property-binding* | Whether the component should use basic property binding (Camel 2.x) or the newer property binding with additional capabilities | false | Boolean
 | *camel.component.debezium.configuration* | Allow pre-configured Configurations to be set, you will need to extend EmbeddedDebeziumConfiguration in order to create the configuration for the component. The option is a org.apache.camel.component.debezium.configuration.EmbeddedDebeziumConfiguration type. |  | String
-| *camel.component.debezium.enabled* | Whether to enable auto configuration of the Debezium component. This is enabled by default. |  | Boolean
+| *camel.component.debezium.enabled* | Whether to enable auto configuration of the debezium component. This is enabled by default. |  | Boolean
 |===
 // spring-boot-auto-configure options: END