gohanbg opened a new issue #843:
URL:
https://github.com/apache/camel-kafka-connector/issues/843 Hello,
I have been playing around with Strimzi and camel's kafka connectors, in particular the S3 sink connector.
My setup consists of Strimzi kafka and Confluent schema registry, all running in k8s
The **KafkaConnect** looks like this:
```yaml
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnect
metadata:
namespace: messaging
name: aws-connect
annotations:
strimzi.io/use-connector-resources: "true"
spec:
version: 2.5.0
image: my-image
logging:
type: inline
loggers:
connect.root.logger.level: "INFO"
replicas: 1
bootstrapServers: my-cluster-kafka-brokers.messaging.svc:9092
externalConfiguration:
volumes:
- name: kafka-aws-credentials
secret:
secretName: kafka-aws-credentials
config:
config.providers: file
config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url:
http://confluent-cp-schema-registry.messaging.svc:8081 key.converter.schemas.enable: false
value.converter.schemas.enable: true
template:
pod:
imagePullSecrets:
- name: docker-registry-secret
```
the **KafkaConnector** looks like this:
```yaml
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
name: s3-sink-connector
labels:
strimzi.io/cluster: aws-connect
spec:
class: org.apache.camel.kafkaconnector.awss3.CamelAwss3SinkConnector
tasksMax: 1
config:
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url:
http://confluent-cp-schema-registry.messaging.svc:8081 topics: topic-to-export
camel.sink.path.bucketNameOrArn: s3-bucket-name
camel.sink.endpoint.keyName: ${date:now:yyyyMMdd-HHmmssSSS}-${exchangeId}
camel.sink.maxPollDuration: 10000
camel.component.aws-s3.configuration.autocloseBody: false
camel.component.aws-s3.accessKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_access_key_id}
camel.component.aws-s3.secretKey: ${file:/opt/kafka/external-configuration/kafka-aws-credentials/kafka-s3-credentials.properties:aws_secret_access_key}
camel.component.aws-s3.region: S3_REGION
```
When I run the Connector it starts reading the topic correctly, but just before sending it to S3 it says the following:
```error
org.apache.camel.NoTypeConversionAvailableException: No type converter available to convert from type: org.apache.kafka.connect.data.Struct to the required type: java.io.InputStream
```
I can see that the **Struct** message contains the correct data from the logs, but it can not be converted to InputStream.
I just want to have the message as a JSON in S3 at the end. Not any special transformations.
Here is how my **KafkaConnect** image looks like
```dockerfile
FROM strimzi/kafka:0.17.0-kafka-2.4.0
USER root:root
RUN mkdir -p /opt/kafka/plugins/camel-aws-s3
COPY ./camel-aws-s3/* /opt/kafka/plugins/camel-aws-s3/
USER 1001
```
and the camel-aws-s3 directory, from which I'm copying looks like this:
```
annotations-13.0.jar camel-core-engine-3.7.0.jar commons-compress-1.20.jar jackson-mapper-asl-1.9.13.jar medeia-validator-core-1.1.1.jar
apicurio-registry-common-1.3.1.Final.jar camel-core-languages-3.7.0.jar commons-logging-1.2.jar jboss-jaxrs-api_2.1_spec-2.0.1.Final.jar medeia-validator-jackson-1.1.1.jar
apicurio-registry-distro-connect-converter-1.3.0.Final.jar camel-core-model-3.7.0.jar common-utils-5.5.0.jar jmespath-java-1.11.714.jar NOTICE.txt
apicurio-registry-rest-client-1.3.1.Final.jar camel-core-processor-3.7.0.jar connect-json-2.6.0.jar joda-time-2.8.1.jar okhttp-3.14.9.jar
apicurio-registry-utils-converter-1.3.1.Final.jar camel-core-reifier-3.7.0.jar converter-jackson-2.9.0.jar kafka-avro-serializer-5.5.0.jar okio-1.17.2.jar
apicurio-registry-utils-serde-1.3.1.Final.jar camel-direct-3.7.0.jar httpclient-4.5.13.jar kafka-clients-2.6.0.jar protobuf-java-3.13.0.jar
avro-1.10.0.jar camel-jackson-3.7.0.jar httpcore-4.4.14.jar kafka-connect-avro-converter-5.5.0.jar README.adoc
aws-java-sdk-core-1.11.714.jar camel-kafka-3.7.0.jar ion-java-1.0.2.jar kafka-connect-avro-data-5.5.0.jar retrofit-2.9.0.jar
aws-java-sdk-kms-1.11.714.jar camel-kafka-connector-0.7.0.jar jackson-annotations-2.11.3.jar kafka-schema-registry-client-5.5.0.jar slf4j-api-1.7.30.jar
aws-java-sdk-s3-1.11.714.jar camel-main-3.7.0.jar jackson-core-2.11.3.jar kafka-schema-serializer-5.5.0.jar snappy-java-1.1.7.3.jar
camel-api-3.7.0.jar camel-management-api-3.7.0.jar jackson-core-asl-1.9.13.jar kotlin-reflect-1.3.20.jar zstd-jni-1.4.4-7.jar
camel-aws-s3-3.7.0.jar camel-support-3.7.0.jar jackson-databind-2.11.3.jar kotlin-stdlib-1.3.20.jar
camel-aws-s3-kafka-connector-0.7.0.jar camel-util-3.7.0.jar jackson-dataformat-avro-2.11.3.jar kotlin-stdlib-common-1.3.20.jar
camel-base-3.7.0.jar common-config-5.5.0.jar jackson-dataformat-cbor-2.11.3.jar LICENSE.txt
camel-base-engine-3.7.0.jar commons-codec-1.15.jar jackson-datatype-jdk8-2.10.2.jar lz4-java-1.7.1.jar
```
Basically, it contains unzipped [camel-aws-s3-kafka-connector](
https://repo.maven.apache.org/maven2/org/apache/camel/kafkaconnector/camel-aws-s3-kafka-connector/0.7.0/camel-aws-s3-kafka-connector-0.7.0-package.zip) plus the confluent's libraries for reading from Avro schema registry. I was following [this example](
https://github.com/debezium/docker-images/blob/master/connect-base/1.2/Dockerfile#L39) to get confluent's libraries
Do you have any suggestions on how I can proceed
PS: If I don't use the **io.confluent.connect.avro.AvroConverter** for the values, but rather use **org.apache.kafka.connect.storage.StringConverter** it all works great and reaches S3 with no issues. The problem is that the message does not look great
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[hidden email]