[GitHub] [camel-kafka-connector] gohanbg opened a new issue #843: [Question] How to process avro message in S3 connector

[GitHub] [camel-kafka-connector] gohanbg opened a new issue #843: [Question] How to process avro message in S3 connector

GitBox

gohanbg opened a new issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843


   Hello,
   
   I have been experimenting with Strimzi and Camel's Kafka connectors, in particular the S3 sink connector.
   
   My setup consists of Strimzi Kafka and the Confluent Schema Registry, all running in Kubernetes.
   
   The **KafkaConnect** looks like this:
   ```yaml
   apiVersion: kafka.strimzi.io/v1beta1
   kind: KafkaConnect
   metadata:
     namespace: messaging
     name: aws-connect
     annotations:
       strimzi.io/use-connector-resources: "true"
   spec:
     version: 2.5.0
     image: my-image
     logging:
       type: inline
       loggers:
         connect.root.logger.level: "INFO"
     replicas: 1
     bootstrapServers: my-cluster-kafka-brokers.messaging.svc:9092
     externalConfiguration:
       volumes:
         - name: kafka-aws-credentials
           secret:
             secretName: kafka-aws-credentials
     config:
       config.providers: file
       config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: io.confluent.connect.avro.AvroConverter
       value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
       key.converter.schemas.enable: false
       value.converter.schemas.enable: true
     template:
       pod:
         imagePullSecrets:
           - name: docker-registry-secret
   ```
   
   The **KafkaConnector** looks like this:
   ```yaml
   apiVersion: kafka.strimzi.io/v1alpha1
   kind: KafkaConnector
   metadata:
     name: s3-sink-connector
     labels:
       strimzi.io/cluster: aws-connect
   spec:
     class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
     tasksMax: 1
     config:
       key.converter: org.apache.kafka.connect.storage.StringConverter
       value.converter: io.confluent.connect.avro.AvroConverter
       value.converter.schema.registry.url: http://confluent-cp-schema-registry.messaging.svc:8081
       topics: topic-to-export
       camel.sink.path.bucketNameOrArn: s3-bucket-name
       camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
       camel.sink.maxPollDuration: 10000
       camel.component.aws-s3.configuration.autocloseBody: false
       camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
       camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
       camel.component.aws-s3.region: S3_REGION
   ```
   
   When I run the connector, it starts reading the topic correctly, but just before sending the message to S3 it fails with the following error:
   ```error
   org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: org.apache.kafka.connect.data.Struct to the required type: java.io.InputStream
   ```
   
   
   From the logs I can see that the **Struct** message contains the correct data, but it cannot be converted to an InputStream.
   I just want the message to end up in S3 as JSON, without any special transformations.
   
   Here is what my **KafkaConnect** image looks like:
   ```dockerfile
   FROM strimzi/kafka:0.17.0-kafka-2.4.0
   USER root:root
   RUN mkdir -p /opt/kafka/plugins/camel-aws-s3
   COPY ./camel-aws-s3/* /opt/kafka/plugins/camel-aws-s3/
   USER 1001
   ```
   The camel-aws-s3 directory that I'm copying from looks like this:
   ```
   annotations-13.0.jar                                        camel-core-engine-3.7.0.jar      commons-compress-1.20.jar           jackson-mapper-asl-1.9.13.jar             medeia-validator-core-1.1.1.jar
   apicurio-registry-common-1.3.1.Final.jar                    camel-core-languages-3.7.0.jar   commons-logging-1.2.jar             jboss-jaxrs-api_2.1_spec-2.0.1.Final.jar  medeia-validator-jackson-1.1.1.jar
   apicurio-registry-distro-connect-converter-1.3.0.Final.jar  camel-core-model-3.7.0.jar       common-utils-5.5.0.jar              jmespath-java-1.11.714.jar                NOTICE.txt
   apicurio-registry-rest-client-1.3.1.Final.jar               camel-core-processor-3.7.0.jar   connect-json-2.6.0.jar              joda-time-2.8.1.jar                       okhttp-3.14.9.jar
   apicurio-registry-utils-converter-1.3.1.Final.jar           camel-core-reifier-3.7.0.jar     converter-jackson-2.9.0.jar         kafka-avro-serializer-5.5.0.jar           okio-1.17.2.jar
   apicurio-registry-utils-serde-1.3.1.Final.jar               camel-direct-3.7.0.jar           httpclient-4.5.13.jar               kafka-clients-2.6.0.jar                   protobuf-java-3.13.0.jar
   avro-1.10.0.jar                                             camel-jackson-3.7.0.jar          httpcore-4.4.14.jar                 kafka-connect-avro-converter-5.5.0.jar    README.adoc
   aws-java-sdk-core-1.11.714.jar                              camel-kafka-3.7.0.jar            ion-java-1.0.2.jar                  kafka-connect-avro-data-5.5.0.jar         retrofit-2.9.0.jar
   aws-java-sdk-kms-1.11.714.jar                               camel-kafka-connector-0.7.0.jar  jackson-annotations-2.11.3.jar      kafka-schema-registry-client-5.5.0.jar    slf4j-api-1.7.30.jar
   aws-java-sdk-s3-1.11.714.jar                                camel-main-3.7.0.jar             jackson-core-2.11.3.jar             kafka-schema-serializer-5.5.0.jar         snappy-java-1.1.7.3.jar
   camel-api-3.7.0.jar                                         camel-management-api-3.7.0.jar   jackson-core-asl-1.9.13.jar         kotlin-reflect-1.3.20.jar                 zstd-jni-1.4.4-7.jar
   camel-aws-s3-3.7.0.jar                                      camel-support-3.7.0.jar          jackson-databind-2.11.3.jar         kotlin-stdlib-1.3.20.jar
   camel-aws-s3-kafka-connector-0.7.0.jar                      camel-util-3.7.0.jar             jackson-dataformat-avro-2.11.3.jar  kotlin-stdlib-common-1.3.20.jar
   camel-base-3.7.0.jar                                        common-config-5.5.0.jar          jackson-dataformat-cbor-2.11.3.jar  LICENSE.txt
   camel-base-engine-3.7.0.jar                                 commons-codec-1.15.jar           jackson-datatype-jdk8-2.10.2.jar    lz4-java-1.7.1.jar
   
   ```
   Basically, it contains the unzipped [camel-aws-s3-kafka-connector](https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.7.0/camel-aws-s3-kafka-connector-0.7.0-package.zip) package plus Confluent's libraries for reading Avro via the schema registry. I followed [this example](https://github.com/debezium/docker-images/blob/master/connect-base/1.2/Dockerfile#L39) to get Confluent's libraries.
   
   Do you have any suggestions on how I can proceed?
   
   PS: If I use **org.apache.kafka.connect.storage.StringConverter** for the values instead of **io.confluent.connect.avro.AvroConverter**, everything works and the messages reach S3 with no issues. The problem is that the stored message is not readable.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [camel-kafka-connector] valdar commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

valdar commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-757971895


   Hello @gohanbg, in which format are your Kafka messages serialized in the topic? It looks like Avro, since you are using `value.converter: io.confluent.connect.avro.AvroConverter`. What happens if you try the JsonConverter?



[GitHub] [camel-kafka-connector] gohanbg commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

gohanbg commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-758164904


   Hi @valdar
   
   Inside the Kafka topic itself, the messages are in `Avro` format. If I try to use `org.apache.kafka.connect.json.JsonConverter`, I get a parsing exception:
   ```
   Caused by: org.apache.kafka.common.errors.SerializationException: java.io.CharConversionException: Invalid UTF-32 character 0x515486f (above 0x0010ffff) at char #1, byte #7)
   Caused by: java.io.CharConversionException: Invalid UTF-32 character 0x515486f (above 0x0010ffff) at char #1, byte #7)
   ```
   
   If I use the StringConverter this is what reaches S3:
   ```
   House Stark�https://static.wikia.nocookie.net/gameofthrones/images/8/8a/House-Stark-Main-Shield.PNG/revision/latest/scale-to-width-down/400?cb=20170101103142�https://media-exp1.licdn.com/dms/image/C4E1BAQEY67rO9YNpIw/company-background_10000/0?e=1592661600&v=beta&t=w0K0h9L18D6HieYM1giMf7TqKGne4LKqieuENS4RqDwH7d4cb366-aedf-476e-b3fc-61b9b1fc55eb%
   ```
   
   I guess the JsonConverter tries to parse the exact same string and finds unexpected characters.
   
   Only with `io.confluent.connect.avro.AvroConverter` do I see a nicely parsed object, but it can't reach S3 :(
   
   Regards
   Mihail Yordanov
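
One plausible explanation for both symptoms above, offered as a sketch rather than a confirmed diagnosis: values written with Confluent's Avro serializer are not text. In the Confluent wire format, each value starts with a zero "magic" byte and a 4-byte big-endian schema ID, followed by the Avro binary payload. A JSON parser that sniffs the encoding from those leading zero bytes may guess UTF-32, and a string converter passes the binary bytes through unchanged. The class name `WireFormatPeek` and the sample bytes are made up for illustration.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class WireFormatPeek {
    // Header size assumed here: 1 magic byte + 4-byte schema ID.
    private static final int HEADER_LENGTH = 5;

    /** Returns the schema ID of a Confluent-framed value, or -1 if the framing is absent. */
    public static int schemaId(byte[] value) {
        if (value == null || value.length < HEADER_LENGTH || value[0] != 0) {
            return -1;
        }
        // ByteBuffer reads big-endian by default, matching the schema ID encoding.
        return ByteBuffer.wrap(value, 1, 4).getInt();
    }

    /** Returns the Avro binary payload that follows the header (empty if not framed). */
    public static byte[] payload(byte[] value) {
        if (schemaId(value) < 0) {
            return new byte[0];
        }
        return Arrays.copyOfRange(value, HEADER_LENGTH, value.length);
    }

    public static void main(String[] args) {
        // Hypothetical sample: magic byte 0, schema ID 5, then the Avro string "Hi"
        // (length 2 is zigzag-encoded as 4).
        byte[] framed = {0, 0, 0, 0, 5, 4, 'H', 'i'};
        System.out.println("schema id: " + schemaId(framed));           // schema id: 5
        System.out.println("payload bytes: " + payload(framed).length); // payload bytes: 3
    }
}
```

If the values in the topic are framed this way, only a schema-registry-aware converter such as the `AvroConverter` can decode them, which matches the behavior reported in this comment.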
   
   



[GitHub] [camel-kafka-connector] valdar commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

valdar commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-758583745


   OK, so a possible solution would be an SMT that transforms a Kafka Connect `Struct` into JSON. A quick search revealed no publicly available SMT that does this, so it might be worth adding one as part of this project.



[GitHub] [camel-kafka-connector] gohanbg commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

gohanbg commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-758585601


   @valdar What does SMT mean? Is it some sort of interface? Can you give me an example of an existing SMT so that I can use it as a reference? :)



[GitHub] [camel-kafka-connector] valdar commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

valdar commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-758683452


   SMT stands for "single message transformation". The interface is `org.apache.kafka.connect.transforms.Transformation`; an example is: https://github.com/apache/camel-kafka-connector/blob/master/core/src/main/java/org/apache/camel/kafkaconnector/transforms/CamelTypeConverterTransform.java
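
To make the idea concrete, here is a dependency-free sketch of what such an SMT would do: take one record, replace its structured value with a JSON string, and pass everything else through. The `Rec` and `Transform` types are simplified stand-ins invented for this example. A real SMT would implement `org.apache.kafka.connect.transforms.Transformation` (which also requires `config()`, `configure()` and `close()`) and would receive an `org.apache.kafka.connect.data.Struct` rather than a `Map`. The field names and values are sample data, and this is not the project's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class StructToJsonSketch {
    /** Stand-in for a Connect record: just a topic and a value. */
    public static final class Rec {
        public final String topic;
        public final Object value;

        public Rec(String topic, Object value) {
            this.topic = topic;
            this.value = value;
        }
    }

    /** Stand-in for the SMT contract: one record in, one record out. */
    public interface Transform {
        Rec apply(Rec record);
    }

    // Minimal escaping (backslash and double quote only); use a JSON library for real data.
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Renders nested maps, numbers, booleans and strings as compact JSON. */
    public static String toJson(Object v) {
        if (v == null) {
            return "null";
        }
        if (v instanceof Number || v instanceof Boolean) {
            return v.toString();
        }
        if (v instanceof Map) {
            return ((Map<?, ?>) v).entrySet().stream()
                    .map(e -> quote(String.valueOf(e.getKey())) + ":" + toJson(e.getValue()))
                    .collect(Collectors.joining(",", "{", "}"));
        }
        return quote(v.toString());
    }

    /** Replaces a structured (Map) value with its JSON text; other values pass through untouched. */
    public static final Transform STRUCT_TO_JSON =
            r -> r.value instanceof Map ? new Rec(r.topic, toJson(r.value)) : r;

    public static void main(String[] args) {
        Map<String, Object> house = new LinkedHashMap<>();
        house.put("name", "House Stark");
        house.put("members", 7);
        Rec out = STRUCT_TO_JSON.apply(new Rec("topic-to-export", house));
        System.out.println(out.value); // {"name":"House Stark","members":7}
    }
}
```

Once the value is a plain string, the sink no longer has to convert a `Struct` into an `InputStream`, which is consistent with the `StringConverter` setup reaching S3 without errors earlier in the thread.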



[GitHub] [camel-kafka-connector] gohanbg commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

gohanbg commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-759090362


   Thank you for the response. Will try to implement it and see how it goes :)



[GitHub] [camel-kafka-connector] valdar commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

valdar commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-759487281


   I am finalizing a first version that I will contribute shortly. :crossed_fingers:



[GitHub] [camel-kafka-connector] ruchirvaninasdaq commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

ruchirvaninasdaq commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-759610050


   @valdar: Are you planning to add the Confluent schema registry properties to the Camel connector as well? I think that to run the value converter `io.confluent.connect.avro.AvroConverter` we might also need the schema registry address, right?



[GitHub] [camel-kafka-connector] ruchirvaninasdaq removed a comment on issue #843: [Question] How to process avro message in S3 connector

GitBox

ruchirvaninasdaq removed a comment on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-759610050


   @valdar: Are you planning to add the Confluent schema registry properties to the Camel connector as well? I think that to run the value converter `io.confluent.connect.avro.AvroConverter` we might also need the schema registry address, right?



[GitHub] [camel-kafka-connector] valdar commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

valdar commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-760327703


   Schema registry properties should already be supported, and they work with the Confluent registry as well as other registries such as Apicurio: https://www.apicur.io/registry/



[GitHub] [camel-kafka-connector] oscerd closed issue #843: [Question] How to process avro message in S3 connector

GitBox

oscerd closed issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843


   



[GitHub] [camel-kafka-connector] valdar commented on issue #843: [Question] How to process avro message in S3 connector

GitBox

valdar commented on issue #843:
URL: https://github.com/apache/camel-kafka-connector/issues/843#issuecomment-761074624


   @gohanbg I have added the SMT; feel free to try it and report any problems: https://github.com/apache/camel-kafka-connector/pull/867/commits/e945f76796d78b828453f4d765b5f887a149a83c

