[camel-kafka-connector] branch add-struct-check created (now 68dbfb8)

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

[camel-kafka-connector] branch add-struct-check created (now 68dbfb8)

oalsafi
This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a change to branch add-struct-check
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git.


      at 68dbfb8  Convert Struct to Map

This branch includes the following new commits:

     new 68dbfb8  Convert Struct to Map

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Reply | Threaded
Open this post in threaded view
|

[camel-kafka-connector] 01/01: Convert Struct to Map

oalsafi
This is an automated email from the ASF dual-hosted git repository.

oalsafi pushed a commit to branch add-struct-check
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git

commit 68dbfb8c1b42aec6ecdce82b0cb4195b359d5ea2
Author: Omar Al-Safi <[hidden email]>
AuthorDate: Tue Feb 23 10:35:27 2021 +0100

    Convert Struct to Map
---
 .../apache/camel/kafkaconnector/CamelSinkTask.java | 35 ++++++++--
 .../camel/kafkaconnector/CamelSinkTaskTest.java    | 77 ++++++++++++++++++----
 2 files changed, 97 insertions(+), 15 deletions(-)

diff --git a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
index 82c16d2..1e8fa43 100644
--- a/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
+++ b/core/src/main/java/org/apache/camel/kafkaconnector/CamelSinkTask.java
@@ -18,6 +18,7 @@ package org.apache.camel.kafkaconnector;
 
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
@@ -33,7 +34,9 @@ import org.apache.camel.support.DefaultExchange;
 import org.apache.camel.util.StringHelper;
 import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.sink.ErrantRecordReporter;
 import org.apache.kafka.connect.sink.SinkRecord;
@@ -110,7 +113,7 @@ public class CamelSinkTask extends SinkTask {
             final String headersRemovePattern = config.getString(CamelSinkConnectorConfig.CAMEL_CONNECTOR_REMOVE_HEADERS_PATTERN_CONF);
             mapProperties = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_PROPERTIES_CONF);
             mapHeaders = config.getBoolean(CamelSinkConnectorConfig.CAMEL_CONNECTOR_MAP_HEADERS_CONF);
-            
+
             CamelContext camelContext = new DefaultCamelContext();
             if (remoteUrl == null) {
                 remoteUrl = TaskHelper.buildUrl(camelContext,
@@ -175,8 +178,8 @@ public class CamelSinkTask extends SinkTask {
             TaskHelper.logRecordContent(LOG, loggingLevel, record);
 
             Exchange exchange = new DefaultExchange(producer.getCamelContext());
-            exchange.getMessage().setBody(record.value());
-            exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, record.key());
+            exchange.getMessage().setBody(convertValueFromStruct(record.valueSchema(), record.value()));
+            exchange.getMessage().setHeader(KAFKA_RECORD_KEY_HEADER, convertValueFromStruct(record.keySchema(), record.key()));
 
             for (Header header : record.headers()) {
                 if (header.key().startsWith(HEADER_CAMEL_PREFIX)) {
@@ -232,8 +235,32 @@ public class CamelSinkTask extends SinkTask {
         if (schema.type().equals(Schema.BYTES_SCHEMA.type()) && Objects.equals(schema.name(), Decimal.LOGICAL_NAME)) {
             destination.put(key, Decimal.toLogical(schema, (byte[]) header.value()));
         } else {
-            destination.put(key, header.value());
+            destination.put(key, convertValueFromStruct(header.schema(), header.value()));
+        }
+    }
+
+    private static Object convertValueFromStruct(Schema schema, Object value) {
+        // if we have a schema of type Struct, we convert it to a Map, otherwise
+        // we just return the value as is
+        if (schema != null && value != null && Schema.Type.STRUCT == schema.type()) {
+            return toMap((Struct) value);
         }
+
+        return value;
+    }
+
+    private static Map<String, Object> toMap(final Struct struct) {
+        final HashMap<String, Object> fieldsToValues = new HashMap<>();
+
+        struct.schema().fields().forEach(field -> {
+            try {
+                fieldsToValues.put(field.name(), struct.get(field));
+            } catch (DataException e) {
+                fieldsToValues.put(field.name(), null);
+            }
+        });
+
+        return fieldsToValues;
     }
 
     CamelKafkaConnectMain getCms() {
diff --git a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
index bab0a5d..0316b2f 100644
--- a/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
+++ b/core/src/test/java/org/apache/camel/kafkaconnector/CamelSinkTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.junit.jupiter.api.Test;
@@ -74,6 +75,52 @@ public class CamelSinkTaskTest {
     }
 
     @Test
+    public void testStructBody() {
+        Map<String, String> props = new HashMap<>();
+        props.put(TOPIC_CONF, TOPIC_NAME);
+        props.put(CamelSinkConnectorConfig.CAMEL_SINK_URL_CONF, SEDA_URI);
+
+        CamelSinkTask sinkTask = new CamelSinkTask();
+        sinkTask.start(props);
+
+        List<SinkRecord> records = new ArrayList<SinkRecord>();
+        Schema keySchema = SchemaBuilder.struct()
+                .name("keySchema")
+                .field("id", Schema.INT32_SCHEMA)
+                .build();
+
+        Schema valueSchema = SchemaBuilder.struct()
+                .name("valueSchema")
+                .field("id", SchemaBuilder.INT32_SCHEMA)
+                .field("name", SchemaBuilder.STRING_SCHEMA)
+                .field("isAdult", SchemaBuilder.BOOLEAN_SCHEMA)
+                .build();
+
+        Struct key = new Struct(keySchema).put("id", 12);
+        Struct value = new Struct(valueSchema)
+                .put("id", 12)
+                .put("name", "jane doe")
+                .put("isAdult", true);
+
+        SinkRecord record = new SinkRecord(TOPIC_NAME, 1, keySchema, key, valueSchema, value, 42);
+        records.add(record);
+        sinkTask.put(records);
+
+        ConsumerTemplate consumer = sinkTask.getCms().getConsumerTemplate();
+        Exchange exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
+
+        assertEquals("jane doe", exchange.getMessage().getBody(Map.class).get("name"));
+        assertEquals(12, exchange.getMessage().getBody(Map.class).get("id"));
+        assertTrue((Boolean) exchange.getMessage().getBody(Map.class).get("isAdult"));
+
+        assertEquals(12, ((Map) exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER)).get("id"));
+        assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
+                .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
+
+        sinkTask.stop();
+    }
+
+    @Test
     public void testTopicsRegex() {
         Map<String, String> props = new HashMap<>();
         props.put("topics.regex", "topic1*");
@@ -120,6 +167,12 @@ public class CamelSinkTaskTest {
         BigDecimal myBigDecimal = new BigDecimal(1234567890);
         Schema schema = Decimal.schema(myBigDecimal.scale());
 
+        Schema headerStruct = SchemaBuilder.struct()
+                .field("myHeader", Schema.STRING_SCHEMA)
+                .build();
+
+        Struct headerStructValue = new Struct(headerStruct).put("myHeader", "structHeader");
+
         List<SinkRecord> records = new ArrayList<SinkRecord>();
         SinkRecord record = new SinkRecord(TOPIC_NAME, 1, null, "test", null, "camel", 42);
         record.headers().addBoolean(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBoolean", true);
@@ -130,6 +183,7 @@ public class CamelSinkTaskTest {
         record.headers().addInt(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyInteger", myInteger);
         record.headers().addLong(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyLong", myLong);
         record.headers().add(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyBigDecimal", Decimal.fromLogical(schema, myBigDecimal), schema);
+        record.headers().addStruct(CamelSinkTask.HEADER_CAMEL_PREFIX + "MyStruct", headerStructValue);
         records.add(record);
         sinkTask.put(records);
 
@@ -145,10 +199,11 @@ public class CamelSinkTaskTest {
         assertEquals(myInteger, exchange.getIn().getHeader("MyInteger"));
         assertEquals(myLong, exchange.getIn().getHeader("MyLong", Long.class));
         assertEquals(myBigDecimal, exchange.getIn().getHeader("MyBigDecimal", BigDecimal.class));
+        assertEquals("structHeader", exchange.getIn().getHeader("MyStruct", Map.class).get("myHeader"));
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndHeadersExclusions() {
         Map<String, String> props = new HashMap<>();
@@ -196,7 +251,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndHeadersExclusionsRegex() {
         Map<String, String> props = new HashMap<>();
@@ -671,7 +726,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndPropertiesHeadersMixedWithoutPropertiesAndHeadersMapping() {
         Map<String, String> props = new HashMap<>();
@@ -730,7 +785,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testBodyAndPropertiesHeadersMixedWithoutPropertiesMapping() {
         Map<String, String> props = new HashMap<>();
@@ -879,7 +934,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testAggregationWithIdempotencyBodyAndTimeout() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -933,7 +988,7 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel5 camel6 camel7 camel8 camel9", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -942,7 +997,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testWithIdempotency() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -993,13 +1048,13 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel1", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel2", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
@@ -1008,7 +1063,7 @@ public class CamelSinkTaskTest {
 
         sinkTask.stop();
     }
-    
+
     @Test
     public void testWithIdempotencyAndHeader() throws InterruptedException {
         Map<String, String> props = new HashMap<>();
@@ -1040,7 +1095,7 @@ public class CamelSinkTaskTest {
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));
         assertEquals(LoggingLevel.OFF.toString(), sinkTask.getCamelSinkConnectorConfig(props)
             .getString(CamelSinkConnectorConfig.CAMEL_SINK_CONTENT_LOG_LEVEL_CONF));
-        
+
         exchange = consumer.receive(SEDA_URI, RECEIVE_TIMEOUT);
         assertEquals("camel1", exchange.getMessage().getBody());
         assertEquals("test", exchange.getMessage().getHeaders().get(CamelSinkTask.KAFKA_RECORD_KEY_HEADER));