[GitHub] [camel] Nayananga opened a new pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

classic Classic list List threaded Threaded
53 messages Options
123
Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r460750413



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs
+    @UriParam
+    private MinioConfiguration configuration;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private int maxMessagesPerPoll = 10;
+    @UriParam(label = "consumer", defaultValue = "60")
+    private int maxConnections = 50 + maxMessagesPerPoll;
+
+    public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        MinioConsumer minioConsumer = new MinioConsumer(this, processor);
+        configureConsumer(minioConsumer);
+        minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        return minioConsumer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new MinioProducer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        minioClient = getConfiguration().getMinioClient() != null
+                ? getConfiguration().getMinioClient()
+                : MinioClientFactory.getClient(getConfiguration()).getMinioClient();
+
+        String objectName = getConfiguration().getObjectName();
+
+        if (objectName != null) {
+            LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
+            return;
+        }
+
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Querying whether bucket {} already exists...", bucketName);
+
+        if (bucketExists(bucketName)) {
+            LOG.trace("Bucket {} already exists", bucketName);
+        } else {
+            if (!getConfiguration().isAutoCreateBucket()) {
+                throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
+            } else {
+                LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
+                makeBucket(bucketName);
+                LOG.trace("Bucket created");
+            }
+        }
+
+        if (getConfiguration().getPolicy() != null) {
+            setBucketPolicy(bucketName);
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(getConfiguration().getMinioClient())) {
+            if (minioClient != null) {

Review comment:
       I don't see any reason to null the client here, isn't?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {

Review comment:
       Perhaps we can use `isNotEmpty` from `org.apache.camel.util.ObjectHelper` to check for nulls or emptyness

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConfiguration.java
##########
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.time.ZonedDateTime;
+
+import io.minio.MinioClient;
+import io.minio.ServerSideEncryption;
+import io.minio.ServerSideEncryptionCustomerKey;
+import okhttp3.OkHttpClient;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class MinioConfiguration implements Cloneable {
+
+    @UriParam
+    private String endpoint;
+    @UriParam
+    private Integer proxyPort;
+
+    @UriParam(label = "security", secret = true)
+    private String accessKey;
+    @UriParam(label = "security", secret = true)
+    private String secretKey;
+    @UriParam(defaultValue = "false")
+    private boolean secure;
+
+    @UriParam

Review comment:
       labels are missing here, for example `producer`, `consumer`, `common` ..etc

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs
+    @UriParam
+    private MinioConfiguration configuration;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private int maxMessagesPerPoll = 10;
+    @UriParam(label = "consumer", defaultValue = "60")
+    private int maxConnections = 50 + maxMessagesPerPoll;
+
+    public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        MinioConsumer minioConsumer = new MinioConsumer(this, processor);
+        configureConsumer(minioConsumer);
+        minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        return minioConsumer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new MinioProducer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        minioClient = getConfiguration().getMinioClient() != null
+                ? getConfiguration().getMinioClient()
+                : MinioClientFactory.getClient(getConfiguration()).getMinioClient();
+
+        String objectName = getConfiguration().getObjectName();
+
+        if (objectName != null) {
+            LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
+            return;
+        }
+
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Querying whether bucket {} already exists...", bucketName);
+
+        if (bucketExists(bucketName)) {
+            LOG.trace("Bucket {} already exists", bucketName);
+        } else {
+            if (!getConfiguration().isAutoCreateBucket()) {
+                throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
+            } else {
+                LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
+                makeBucket(bucketName);
+                LOG.trace("Bucket created");
+            }
+        }
+
+        if (getConfiguration().getPolicy() != null) {
+            setBucketPolicy(bucketName);
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(getConfiguration().getMinioClient())) {
+            if (minioClient != null) {
+                minioClient = null;
+            }
+        }
+        super.doStop();
+    }
+
+    public Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+        return createExchange(getExchangePattern(), minioObject, objectName);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern,
+                                   InputStream minioObject, String objectName) throws Exception {
+        LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
+
+        Exchange exchange = super.createExchange(pattern);
+        Message message = exchange.getIn();
+        LOG.trace("Got object!");
+
+        getObjectStat(objectName, message);
+
+        if (getConfiguration().isIncludeBody()) {
+            try {
+                message.setBody(readInputStream(minioObject));
+                if (getConfiguration().isAutocloseBody()) {
+                    exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+                        @Override
+                        public void onDone(Exchange exchange) {
+                            IOHelper.close(minioObject);
+                        }
+                    });
+                }
+
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        } else {
+            message.setBody(null);
+            IOHelper.close(minioObject);
+        }
+
+        return exchange;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public MinioClient getMinioClient() {
+        return minioClient;
+    }
+
+    public void setMinioClient(MinioClient minioClient) {
+        this.minioClient = minioClient;
+    }
+
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    /**
+     * Gets the maximum number of messages as a limit to poll at each polling.
+     * <p/>
+     * Gets the maximum number of messages as a limit to poll at each polling.
+     * The default value is 10. Use 0 or a negative number to set it as
+     * unlimited.
+     */
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    /**
+     * Set the maxConnections parameter in the minio client configuration
+     */
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    private String readInputStream(InputStream minioObject) throws IOException {
+        StringBuilder textBuilder = new StringBuilder();
+        try (Reader reader = new BufferedReader(new InputStreamReader(minioObject, Charset.forName(StandardCharsets.UTF_8.name())))) {
+            int c;
+            while ((c = reader.read()) != -1) {
+                textBuilder.append((char) c);
+            }
+        }
+        return textBuilder.toString();
+    }
+
+    private boolean bucketExists(String bucketName) throws Exception {
+        return minioClient.bucketExists(BucketExistsArgs.builder().bucket(bucketName).build());
+    }
+
+    private void makeBucket(String bucketName) throws Exception {
+        MakeBucketArgs.Builder makeBucketRequest = MakeBucketArgs.builder().bucket(bucketName).objectLock(getConfiguration().isObjectLock());
+        if (getConfiguration().getRegion() != null) {
+            makeBucketRequest.region(getConfiguration().getRegion());
+        }
+        minioClient.makeBucket(makeBucketRequest.build());
+    }
+
+    private void setBucketPolicy(String bucketName) throws Exception {
+        LOG.trace("Updating bucket {} with policy...", bucketName);
+        minioClient.setBucketPolicy(
+                SetBucketPolicyArgs.builder().bucket(bucketName).config(getConfiguration().getPolicy()).build());
+        LOG.trace("Bucket policy updated");
+    }
+
+    private void getObjectStat(String objectName, Message message) throws Exception {
+
+        String bucketName = getConfiguration().getBucketName();
+        StatObjectArgs.Builder statObjectRequest = StatObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            statObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {

Review comment:
       These checks are pretty redundant. Perhaps what we can do here, wrap these into centralized lambdas somewhere and just pass the function reference. Example:
   ```
   private void checkMatchTagConfig(final MinioConfiguration configuration, final java.util.function.Consumer<String> fn) {
           if ( ObjectHelper.isNotEmpty(configuration.getMatchETag()) ) {
               fn.accept(configuration.getMatchETag());
           }
       }
   ```  
   And then somewhere in the code:
   ```
   checkMatchTagConfig(getConfiguration(), statObjectRequest::matchETag);
   ```

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs
+    @UriParam
+    private MinioConfiguration configuration;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private int maxMessagesPerPoll = 10;
+    @UriParam(label = "consumer", defaultValue = "60")
+    private int maxConnections = 50 + maxMessagesPerPoll;
+
+    public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        MinioConsumer minioConsumer = new MinioConsumer(this, processor);
+        configureConsumer(minioConsumer);
+        minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        return minioConsumer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new MinioProducer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        minioClient = getConfiguration().getMinioClient() != null
+                ? getConfiguration().getMinioClient()
+                : MinioClientFactory.getClient(getConfiguration()).getMinioClient();
+
+        String objectName = getConfiguration().getObjectName();
+
+        if (objectName != null) {
+            LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
+            return;
+        }
+
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Querying whether bucket {} already exists...", bucketName);
+
+        if (bucketExists(bucketName)) {
+            LOG.trace("Bucket {} already exists", bucketName);
+        } else {
+            if (!getConfiguration().isAutoCreateBucket()) {
+                throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
+            } else {
+                LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
+                makeBucket(bucketName);
+                LOG.trace("Bucket created");
+            }
+        }
+
+        if (getConfiguration().getPolicy() != null) {
+            setBucketPolicy(bucketName);
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(getConfiguration().getMinioClient())) {
+            if (minioClient != null) {
+                minioClient = null;
+            }
+        }
+        super.doStop();
+    }
+
+    public Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+        return createExchange(getExchangePattern(), minioObject, objectName);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern,
+                                   InputStream minioObject, String objectName) throws Exception {
+        LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
+
+        Exchange exchange = super.createExchange(pattern);
+        Message message = exchange.getIn();
+        LOG.trace("Got object!");
+
+        getObjectStat(objectName, message);
+
+        if (getConfiguration().isIncludeBody()) {
+            try {
+                message.setBody(readInputStream(minioObject));
+                if (getConfiguration().isAutocloseBody()) {
+                    exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+                        @Override
+                        public void onDone(Exchange exchange) {
+                            IOHelper.close(minioObject);
+                        }
+                    });
+                }
+
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();

Review comment:
       please don't print the stack trace here, instead handle the exception.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        MinioOperations operation = determineOperation(exchange);
+        MinioClient minioClient = getEndpoint().getMinioClient();
+        if (ObjectHelper.isEmpty(operation)) {
+            putObject(minioClient, exchange);
+        } else {
+            switch (operation) {
+                case copyObject:
+                    copyObject(minioClient, exchange);
+                    break;
+                case deleteObject:
+                    deleteObject(minioClient, exchange);
+                    break;
+                case deleteObjects:
+                    deleteObjects(minioClient, exchange);
+                    break;
+                case listBuckets:
+                    listBuckets(minioClient, exchange);
+                    break;
+                case deleteBucket:
+                    deleteBucket(minioClient, exchange);
+                    break;
+                case listObjects:
+                    listObjects(minioClient, exchange);
+                    break;
+                case getObject:
+                    getObject(minioClient, exchange);
+                    break;
+                case getPartialObject:
+                    getPartialObject(minioClient, exchange);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported operation");
+            }
+        }
+    }
+
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream;
+            ByteArrayOutputStream baos;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+            if (object instanceof File) {
+                filePayload = (File) object;
+                inputStream = new FileInputStream(filePayload);
+            } else {
+                inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                    if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                        LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                        baos = determineLengthInputStream(inputStream);
+                        objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                        inputStream = new ByteArrayInputStream(baos.toByteArray());
+                    } else {
+                        if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                        }
+                    }
+                }
+            }
+            PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                    .stream(inputStream, inputStream.available(), -1)
+                    .bucket(bucketName)
+                    .object(objectName)
+                    .userMetadata(objectMetadata);
+
+            if (!extraHeaders.isEmpty()) {
+                putObjectRequest.extraHeaders(extraHeaders);
+            }
+
+            LOG.trace("Put object from exchange...");
+
+            ObjectWriteResponse putObjectResult = getEndpoint().getMinioClient().putObject(putObjectRequest.build());
+
+            LOG.trace("Received result...");
+
+            Message message = getMessageForResponse(exchange);
+            message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+            if (putObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+            }
+
+            IOHelper.close(inputStream);
+
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);

Review comment:
       `versionId` is not used here

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(GetMinioClient.class);
+    private final MinioConfiguration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public GetMinioClient(MinioConfiguration configuration) {
+        LOG.trace("Creating an Minio client.");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the minio client.
+     *
+     * @return Minio Client.
+     */
+    @Override
+    public MinioClient getMinioClient() {
+        if (configuration.getEndpoint() != null) {
+            MinioClient.Builder minioClientRequest = MinioClient.builder();
+
+            if (configuration.getProxyPort() != null) {
+                minioClientRequest.endpoint(configuration.getEndpoint(), configuration.getProxyPort(), configuration.isSecure());
+            } else {
+                minioClientRequest.endpoint(configuration.getEndpoint());
+            }
+            if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {

Review comment:
       I wonder accessKeys, same as region are not required?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        MinioOperations operation = determineOperation(exchange);
+        MinioClient minioClient = getEndpoint().getMinioClient();
+        if (ObjectHelper.isEmpty(operation)) {
+            putObject(minioClient, exchange);
+        } else {
+            switch (operation) {
+                case copyObject:
+                    copyObject(minioClient, exchange);
+                    break;
+                case deleteObject:
+                    deleteObject(minioClient, exchange);
+                    break;
+                case deleteObjects:
+                    deleteObjects(minioClient, exchange);
+                    break;
+                case listBuckets:
+                    listBuckets(minioClient, exchange);
+                    break;
+                case deleteBucket:
+                    deleteBucket(minioClient, exchange);
+                    break;
+                case listObjects:
+                    listObjects(minioClient, exchange);
+                    break;
+                case getObject:
+                    getObject(minioClient, exchange);
+                    break;
+                case getPartialObject:
+                    getPartialObject(minioClient, exchange);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported operation");
+            }
+        }
+    }
+
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream;
+            ByteArrayOutputStream baos;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+            if (object instanceof File) {
+                filePayload = (File) object;
+                inputStream = new FileInputStream(filePayload);
+            } else {
+                inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                    if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                        LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                        baos = determineLengthInputStream(inputStream);
+                        objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                        inputStream = new ByteArrayInputStream(baos.toByteArray());
+                    } else {
+                        if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                        }
+                    }
+                }
+            }
+            PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                    .stream(inputStream, inputStream.available(), -1)
+                    .bucket(bucketName)
+                    .object(objectName)
+                    .userMetadata(objectMetadata);
+
+            if (!extraHeaders.isEmpty()) {
+                putObjectRequest.extraHeaders(extraHeaders);
+            }
+
+            LOG.trace("Put object from exchange...");
+
+            ObjectWriteResponse putObjectResult = getEndpoint().getMinioClient().putObject(putObjectRequest.build());
+
+            LOG.trace("Received result...");
+
+            Message message = getMessageForResponse(exchange);
+            message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+            if (putObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+            }
+
+            IOHelper.close(inputStream);
+
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+
+            minioClient.removeObject(RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey).build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+
+        if (getConfiguration().isPojoRequest()) {
+            RemoveBucketArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeBucket(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody("ok");
+            }
+        } else {
+
+            minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+            Message message = getMessageForResponse(exchange);
+            message.setBody("ok");
+        }
+    }
+
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String offset = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String length = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            if (ObjectHelper.isEmpty(offset) || ObjectHelper.isEmpty(length)) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .offset(Long.parseLong(offset))
+                    .length(Long.parseLong(length))
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        if (getConfiguration().isPojoRequest()) {
+            ListObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (payload != null) {
+                Iterable<Result<Item>> objectList = minioClient.listObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(objectList);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+
+            Iterable<Result<Item>> objectList = minioClient.listObjects(ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .recursive(getConfiguration().isRecursive())
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(objectList);
+        }
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            objectMetadata.put("Content-Type", contentType);
+        }
+
+        String cacheControl = exchange.getIn().getHeader(MinioConstants.CACHE_CONTROL, String.class);
+        if (cacheControl != null) {
+            objectMetadata.put("Cache-Control", cacheControl);
+        }
+
+        String contentDisposition = exchange.getIn().getHeader(MinioConstants.CONTENT_DISPOSITION, String.class);
+        if (contentDisposition != null) {
+            objectMetadata.put("Content-Disposition", contentDisposition);
+        }
+
+        String contentEncoding = exchange.getIn().getHeader(MinioConstants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            objectMetadata.put("Content-Encoding", contentEncoding);
+        }
+
+        String contentMD5 = exchange.getIn().getHeader(MinioConstants.CONTENT_MD5, String.class);
+        if (contentMD5 != null) {
+            objectMetadata.put("Content-Md5", contentMD5);
+        }
+
+        return objectMetadata;
+    }
+
+    /**
+     * Reads the bucket name from the header of the given exchange. If not
+     * provided, it's read from the endpoint configuration.
+     *
+     * @param exchange The exchange to read the header from.
+     * @return The bucket name.
+     * @throws IllegalArgumentException if the header could not be determined.
+     */
+    private String determineBucketName(final Exchange exchange) {
+        String bucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(bucketName)) {
+            bucketName = getConfiguration().getBucketName();
+            LOG.trace("Minio Bucket name header is missing, using default one [{}]", bucketName);
+        }
+
+        if (bucketName == null) {
+            throw new IllegalArgumentException("Minio Bucket name header is missing or not configured.");
+        }
+
+        return bucketName;
+    }
+
+    private String determineObjectName(final Exchange exchange) {
+        String objectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+        if (ObjectHelper.isEmpty(objectName)) {
+            objectName = getConfiguration().getKeyName();
+        }
+        if (objectName == null) {
+            throw new IllegalArgumentException("Minio Key header is missing.");
+        }
+        return objectName;
+    }
+
+    private String determineStorageClass(final Exchange exchange) {
+        String storageClass = exchange.getIn().getHeader(MinioConstants.STORAGE_CLASS, String.class);
+        if (storageClass == null) {
+            storageClass = getConfiguration().getStorageClass();
+        }
+
+        return storageClass;
+    }
+
+    private String determineVersionId(final Exchange exchange) {
+        String versionId = exchange.getIn().getHeader(MinioConstants.VERSION_ID, String.class);
+        if (versionId == null) {
+            versionId = getConfiguration().getVersionId();
+        }
+
+        return versionId;
+    }
+
+    private ByteArrayOutputStream determineLengthInputStream(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        byte[] bytes = new byte[1024];

Review comment:
       why is it `1024` bits here? Can you please move this into constant and with some comment why you opted for `1024`bits?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        MinioOperations operation = determineOperation(exchange);
+        MinioClient minioClient = getEndpoint().getMinioClient();
+        if (ObjectHelper.isEmpty(operation)) {
+            putObject(minioClient, exchange);
+        } else {
+            switch (operation) {
+                case copyObject:
+                    copyObject(minioClient, exchange);
+                    break;
+                case deleteObject:
+                    deleteObject(minioClient, exchange);
+                    break;
+                case deleteObjects:
+                    deleteObjects(minioClient, exchange);
+                    break;
+                case listBuckets:
+                    listBuckets(minioClient, exchange);
+                    break;
+                case deleteBucket:
+                    deleteBucket(minioClient, exchange);
+                    break;
+                case listObjects:
+                    listObjects(minioClient, exchange);
+                    break;
+                case getObject:
+                    getObject(minioClient, exchange);
+                    break;
+                case getPartialObject:
+                    getPartialObject(minioClient, exchange);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported operation");
+            }
+        }
+    }
+
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream;
+            ByteArrayOutputStream baos;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+            if (object instanceof File) {
+                filePayload = (File) object;
+                inputStream = new FileInputStream(filePayload);
+            } else {
+                inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                    if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                        LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                        baos = determineLengthInputStream(inputStream);
+                        objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                        inputStream = new ByteArrayInputStream(baos.toByteArray());
+                    } else {
+                        if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                        }
+                    }
+                }
+            }
+            PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                    .stream(inputStream, inputStream.available(), -1)
+                    .bucket(bucketName)
+                    .object(objectName)
+                    .userMetadata(objectMetadata);
+
+            if (!extraHeaders.isEmpty()) {
+                putObjectRequest.extraHeaders(extraHeaders);
+            }
+
+            LOG.trace("Put object from exchange...");
+
+            ObjectWriteResponse putObjectResult = getEndpoint().getMinioClient().putObject(putObjectRequest.build());
+
+            LOG.trace("Received result...");
+
+            Message message = getMessageForResponse(exchange);
+            message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+            if (putObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+            }
+
+            IOHelper.close(inputStream);
+
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+
+            minioClient.removeObject(RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey).build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+
+        if (getConfiguration().isPojoRequest()) {
+            RemoveBucketArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeBucket(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody("ok");
+            }
+        } else {
+
+            minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+            Message message = getMessageForResponse(exchange);
+            message.setBody("ok");
+        }
+    }
+
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String offset = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String length = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            if (ObjectHelper.isEmpty(offset) || ObjectHelper.isEmpty(length)) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .offset(Long.parseLong(offset))
+                    .length(Long.parseLong(length))
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        if (getConfiguration().isPojoRequest()) {
+            ListObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (payload != null) {
+                Iterable<Result<Item>> objectList = minioClient.listObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(objectList);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+
+            Iterable<Result<Item>> objectList = minioClient.listObjects(ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .recursive(getConfiguration().isRecursive())
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(objectList);
+        }
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            objectMetadata.put("Content-Type", contentType);
+        }
+
+        String cacheControl = exchange.getIn().getHeader(MinioConstants.CACHE_CONTROL, String.class);
+        if (cacheControl != null) {
+            objectMetadata.put("Cache-Control", cacheControl);
+        }
+
+        String contentDisposition = exchange.getIn().getHeader(MinioConstants.CONTENT_DISPOSITION, String.class);
+        if (contentDisposition != null) {
+            objectMetadata.put("Content-Disposition", contentDisposition);
+        }
+
+        String contentEncoding = exchange.getIn().getHeader(MinioConstants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            objectMetadata.put("Content-Encoding", contentEncoding);
+        }
+
+        String contentMD5 = exchange.getIn().getHeader(MinioConstants.CONTENT_MD5, String.class);
+        if (contentMD5 != null) {
+            objectMetadata.put("Content-Md5", contentMD5);
+        }
+
+        return objectMetadata;
+    }
+
+    /**
+     * Reads the bucket name from the header of the given exchange. If not
+     * provided, it's read from the endpoint configuration.
+     *
+     * @param exchange The exchange to read the header from.
+     * @return The bucket name.
+     * @throws IllegalArgumentException if the header could not be determined.
+     */
+    private String determineBucketName(final Exchange exchange) {
+        String bucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(bucketName)) {
+            bucketName = getConfiguration().getBucketName();
+            LOG.trace("Minio Bucket name header is missing, using default one [{}]", bucketName);
+        }
+
+        if (bucketName == null) {

Review comment:
       this check here is not needed. `ObjectHelper.isEmpty(bucketName)` will check if string is empty or null

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;

Review comment:
       Please avoid wildcard import

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");

Review comment:
       Should we handle this exception as well?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {

Review comment:
       I am bit not comfortable of having check with not equal 0, odd cases like minus could slip. I'd recommend changing this to `> 0` instead. Same for the other checks

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {

Review comment:
       This class name is weird for the client impl. If this the remote client from the factory, something like `MinioRemoteClientImpl` would make more sense

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");
+        }
+    }
+
+    private void copyObject(String srcBucketName, String srcObjectName) {
+        String destinationBucketName = getConfiguration().getDestinationBucketName();
+        if (destinationBucketName == null) {
+            throw new IllegalArgumentException("Destination Bucket name must be specified to copy operation");
+        }
+
+        try {
+            // set destination object name as source object name, if not specified
+            String destinationObjectName = (getConfiguration().getDestinationObjectName() != null)
+                    ? getConfiguration().getDestinationObjectName()
+                    : srcObjectName;
+
+
+            LOG.trace("Copying object from bucket {} with objectName {} to bucket {}...",
+                    srcBucketName, srcObjectName, destinationBucketName);
+
+            CopySource.Builder copySourceBuilder = CopySource.builder().bucket(srcBucketName).object(srcObjectName);
+            if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+                copySourceBuilder.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+            }
+            if (getConfiguration().getOffset() != 0) {
+                copySourceBuilder.offset(getConfiguration().getOffset());
+            }
+            if (getConfiguration().getLength() != 0) {
+                copySourceBuilder.length(getConfiguration().getLength());
+            }
+            if (getConfiguration().getVersionId() != null) {
+                copySourceBuilder.versionId(getConfiguration().getVersionId());
+            }
+            if (getConfiguration().getMatchETag() != null) {
+                copySourceBuilder.matchETag(getConfiguration().getMatchETag());
+            }
+            if (getConfiguration().getNotMatchETag() != null) {
+                copySourceBuilder.notMatchETag(getConfiguration().getNotMatchETag());
+            }
+            if (getConfiguration().getModifiedSince() != null) {
+                copySourceBuilder.modifiedSince(getConfiguration().getModifiedSince());
+            }
+            if (getConfiguration().getUnModifiedSince() != null) {
+                copySourceBuilder.unmodifiedSince(getConfiguration().getUnModifiedSince());
+            }
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .source(copySourceBuilder.build())
+                    .bucket(getConfiguration().getDestinationBucketName())
+                    .object(destinationObjectName);
+
+            if (getConfiguration().getServerSideEncryption() != null) {
+                copyObjectRequest.sse(getConfiguration().getServerSideEncryption());
+            }
+
+            getMinioClient().copyObject(copyObjectRequest.build());
+
+        } catch (Exception e) {
+            LOG.warn("Error copy object from bucket {} with objectName {} to bucket {}...",

Review comment:
       Same here, should handle this exception as well?

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        MinioOperations operation = determineOperation(exchange);
+        MinioClient minioClient = getEndpoint().getMinioClient();
+        if (ObjectHelper.isEmpty(operation)) {
+            putObject(minioClient, exchange);
+        } else {
+            switch (operation) {
+                case copyObject:
+                    copyObject(minioClient, exchange);
+                    break;
+                case deleteObject:
+                    deleteObject(minioClient, exchange);
+                    break;
+                case deleteObjects:
+                    deleteObjects(minioClient, exchange);
+                    break;
+                case listBuckets:
+                    listBuckets(minioClient, exchange);
+                    break;
+                case deleteBucket:
+                    deleteBucket(minioClient, exchange);
+                    break;
+                case listObjects:
+                    listObjects(minioClient, exchange);
+                    break;
+                case getObject:
+                    getObject(minioClient, exchange);
+                    break;
+                case getPartialObject:
+                    getPartialObject(minioClient, exchange);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported operation");
+            }
+        }
+    }
+
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream;
+            ByteArrayOutputStream baos;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+            if (object instanceof File) {
+                filePayload = (File) object;
+                inputStream = new FileInputStream(filePayload);
+            } else {
+                inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                    if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                        LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                        baos = determineLengthInputStream(inputStream);
+                        objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                        inputStream = new ByteArrayInputStream(baos.toByteArray());
+                    } else {
+                        if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                        }
+                    }
+                }
+            }
+            PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                    .stream(inputStream, inputStream.available(), -1)
+                    .bucket(bucketName)
+                    .object(objectName)
+                    .userMetadata(objectMetadata);
+
+            if (!extraHeaders.isEmpty()) {
+                putObjectRequest.extraHeaders(extraHeaders);
+            }
+
+            LOG.trace("Put object from exchange...");
+
+            ObjectWriteResponse putObjectResult = getEndpoint().getMinioClient().putObject(putObjectRequest.build());
+
+            LOG.trace("Received result...");
+
+            Message message = getMessageForResponse(exchange);
+            message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+            if (putObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+            }
+
+            IOHelper.close(inputStream);
+
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+
+            minioClient.removeObject(RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey).build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+
+        if (getConfiguration().isPojoRequest()) {
+            RemoveBucketArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeBucket(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody("ok");
+            }
+        } else {
+
+            minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+            Message message = getMessageForResponse(exchange);
+            message.setBody("ok");
+        }
+    }
+
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String offset = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String length = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            if (ObjectHelper.isEmpty(offset) || ObjectHelper.isEmpty(length)) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .offset(Long.parseLong(offset))
+                    .length(Long.parseLong(length))
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        if (getConfiguration().isPojoRequest()) {
+            ListObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (payload != null) {
+                Iterable<Result<Item>> objectList = minioClient.listObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(objectList);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+
+            Iterable<Result<Item>> objectList = minioClient.listObjects(ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .recursive(getConfiguration().isRecursive())
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(objectList);
+        }
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {

Review comment:
       `isNotEmpty` as I mentioned earlier would make more sense here

##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/MinioComponentConfigurationTest.java
##########
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class MinioComponentConfigurationTest extends CamelTestSupport {
+
+    @Test
+    public void createEndpointWithMinimalConfiguration() throws Exception {
+        MinioComponent component = context.getComponent("minio", MinioComponent.class);
+        MinioEndpoint endpoint = (MinioEndpoint) component
+                .createEndpoint("minio://TestDomain?accessKey=xxx&secretKey=yyy&region=us-west-1&endpoint=http://localhost:4572");
+        assertEquals(endpoint.getConfiguration().getEndpoint(), "http://localhost:4572");

Review comment:
       Please add more cases for region, secretKey .. etc




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] zregvart commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

zregvart commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463681735



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponent.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+import java.util.Set;
+
+import io.minio.MinioClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link MinioEndpoint}.
+ */
+@Component("minio")
+public class MinioComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioComponent.class);
+
+    @Metadata
+    private MinioConfiguration configuration = new MinioConfiguration();
+
+    public MinioComponent() {
+        this(null);

Review comment:
       Not sure about this, might lead to `NullPointerException`s




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] oscerd commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

oscerd commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463684336



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponent.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+import java.util.Set;
+
+import io.minio.MinioClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link MinioEndpoint}.
+ */
+@Component("minio")
+public class MinioComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioComponent.class);
+
+    @Metadata
+    private MinioConfiguration configuration = new MinioConfiguration();
+
+    public MinioComponent() {
+        this(null);

Review comment:
       No, this is fine, we are using similar approach for component with Extension for validation like this https://github.com/apache/camel/blob/master/components/camel-aws-s3/src/main/java/org/apache/camel/component/aws/s3/S3Component.java#L35




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] DenisIstomin commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

DenisIstomin commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463689546



##########
File path: catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/minio-component.adoc
##########
@@ -0,0 +1,162 @@
+[[minio-component]]

Review comment:
       Contend is not updated from original `adoc` file.

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs
+    @UriParam
+    private MinioConfiguration configuration;
+    @UriParam(label = "consumer", defaultValue = "10")
+    private int maxMessagesPerPoll = 10;
+    @UriParam(label = "consumer", defaultValue = "60")
+    private int maxConnections = 50 + maxMessagesPerPoll;
+
+    public MinioEndpoint(String uri, Component component, MinioConfiguration configuration) {
+        super(uri, component);
+        this.configuration = configuration;
+    }
+
+    @Override
+    public Consumer createConsumer(Processor processor) throws Exception {
+        MinioConsumer minioConsumer = new MinioConsumer(this, processor);
+        configureConsumer(minioConsumer);
+        minioConsumer.setMaxMessagesPerPoll(maxMessagesPerPoll);
+        return minioConsumer;
+    }
+
+    @Override
+    public Producer createProducer() {
+        return new MinioProducer(this);
+    }
+
+    @Override
+    public void doStart() throws Exception {
+        super.doStart();
+
+        minioClient = getConfiguration().getMinioClient() != null
+                ? getConfiguration().getMinioClient()
+                : MinioClientFactory.getClient(getConfiguration()).getMinioClient();
+
+        String objectName = getConfiguration().getObjectName();
+
+        if (objectName != null) {
+            LOG.trace("Object name {} requested, so skipping bucket check...", objectName);
+            return;
+        }
+
+        String bucketName = getConfiguration().getBucketName();
+        LOG.trace("Querying whether bucket {} already exists...", bucketName);
+
+        if (bucketExists(bucketName)) {
+            LOG.trace("Bucket {} already exists", bucketName);
+        } else {
+            if (!getConfiguration().isAutoCreateBucket()) {
+                throw new InvalidBucketNameException("Bucket {} does not exists, set autoCreateBucket option for bucket auto creation", bucketName);
+            } else {
+                LOG.trace("AutoCreateBucket set to true, Creating bucket {}...", bucketName);
+                makeBucket(bucketName);
+                LOG.trace("Bucket created");
+            }
+        }
+
+        if (getConfiguration().getPolicy() != null) {
+            setBucketPolicy(bucketName);
+        }
+    }
+
+    @Override
+    public void doStop() throws Exception {
+        if (ObjectHelper.isEmpty(getConfiguration().getMinioClient())) {
+            if (minioClient != null) {
+                minioClient = null;
+            }
+        }
+        super.doStop();
+    }
+
+    public Exchange createExchange(InputStream minioObject, String objectName) throws Exception {
+        return createExchange(getExchangePattern(), minioObject, objectName);
+    }
+
+    public Exchange createExchange(ExchangePattern pattern,
+                                   InputStream minioObject, String objectName) throws Exception {
+        LOG.trace("Getting object with objectName {} from bucket {}...", objectName, getConfiguration().getBucketName());
+
+        Exchange exchange = super.createExchange(pattern);
+        Message message = exchange.getIn();
+        LOG.trace("Got object!");
+
+        getObjectStat(objectName, message);
+
+        if (getConfiguration().isIncludeBody()) {
+            try {
+                message.setBody(readInputStream(minioObject));
+                if (getConfiguration().isAutocloseBody()) {
+                    exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
+                        @Override
+                        public void onDone(Exchange exchange) {
+                            IOHelper.close(minioObject);
+                        }
+                    });
+                }
+
+            } catch (IOException e) {
+                // TODO Auto-generated catch block
+                e.printStackTrace();
+            }
+        } else {
+            message.setBody(null);
+            IOHelper.close(minioObject);
+        }
+
+        return exchange;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    public MinioClient getMinioClient() {
+        return minioClient;
+    }
+
+    public void setMinioClient(MinioClient minioClient) {
+        this.minioClient = minioClient;
+    }
+
+    public int getMaxMessagesPerPoll() {
+        return maxMessagesPerPoll;
+    }
+
+    /**
+     * Gets the maximum number of messages as a limit to poll at each polling.
+     * <p/>
+     * Gets the maximum number of messages as a limit to poll at each polling.
+     * The default value is 10. Use 0 or a negative number to set it as
+     * unlimited.
+     */
+    public void setMaxMessagesPerPoll(int maxMessagesPerPoll) {
+        this.maxMessagesPerPoll = maxMessagesPerPoll;
+    }
+
+    public int getMaxConnections() {
+        return maxConnections;
+    }
+
+    /**
+     * Set the maxConnections parameter in the minio client configuration
+     */
+    public void setMaxConnections(int maxConnections) {
+        this.maxConnections = maxConnections;
+    }
+
+    private String readInputStream(InputStream minioObject) throws IOException {
+        StringBuilder textBuilder = new StringBuilder();
+        try (Reader reader = new BufferedReader(new InputStreamReader(minioObject, Charset.forName(StandardCharsets.UTF_8.name())))) {

Review comment:
       Looks like it could be simplified:
   ```
   Reader reader = new BufferedReader(new InputStreamReader(minioObject, StandardCharsets.UTF_8));
   ```

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponentVerifierExtension.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+
+import io.minio.MinioClient;
+import io.minio.errors.MinioException;
+import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension;
+import org.apache.camel.component.extension.verifier.ResultBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorHelper;
+
+public class MinioComponentVerifierExtension extends DefaultComponentVerifierExtension {
+
+    public MinioComponentVerifierExtension() {
+        this("minio");
+    }
+
+    public MinioComponentVerifierExtension(String scheme) {
+        super(scheme);
+    }
+
+    // *********************************
+    // Parameters validation
+    // *********************************
+
+    @Override

Review comment:
       And the same below

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponentVerifierExtension.java
##########
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+
+import io.minio.MinioClient;
+import io.minio.errors.MinioException;
+import org.apache.camel.component.extension.verifier.DefaultComponentVerifierExtension;
+import org.apache.camel.component.extension.verifier.ResultBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorBuilder;
+import org.apache.camel.component.extension.verifier.ResultErrorHelper;
+
+public class MinioComponentVerifierExtension extends DefaultComponentVerifierExtension {
+
+    public MinioComponentVerifierExtension() {
+        this("minio");
+    }
+
+    public MinioComponentVerifierExtension(String scheme) {
+        super(scheme);
+    }
+
+    // *********************************
+    // Parameters validation
+    // *********************************
+
+    @Override

Review comment:
       I would suggest to reformat it like this:
   ```
       @Override
       protected Result verifyParameters(Map<String, Object> parameters) {
           ResultBuilder builder = ResultBuilder.withStatusAndScope(Result.Status.OK, Scope.PARAMETERS)
                                                .error(ResultErrorHelper.requiresOption("accessKey", parameters))
                                                .error(ResultErrorHelper.requiresOption("secretKey", parameters))
                                                .error(ResultErrorHelper.requiresOption("region", parameters));
   
           // Validate using the catalog
           super.verifyParametersAgainstCatalog(builder, parameters);
   
           return builder.build();
       }
   ```

##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIntegrationTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import io.minio.MinioClient;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class MinioConsumerIntegrationTest extends CamelTestSupport {
+
+    @BindToRegistry("minioClient")
+    MinioClient client = MinioClient.builder()
+            .endpoint("https://play.min.io")
+            .credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")

Review comment:
       It would be nice to extract all credentials into `.properties` file

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioEndpoint.java
##########
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import io.minio.BucketExistsArgs;
+import io.minio.MakeBucketArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectStat;
+import io.minio.SetBucketPolicyArgs;
+import io.minio.StatObjectArgs;
+import io.minio.errors.InvalidBucketNameException;
+import org.apache.camel.Category;
+import org.apache.camel.Component;
+import org.apache.camel.Consumer;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.Producer;
+import org.apache.camel.component.minio.client.MinioClientFactory;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.UriEndpoint;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriPath;
+import org.apache.camel.support.ScheduledPollEndpoint;
+import org.apache.camel.support.SynchronizationAdapter;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Store and retrieve objects from Minio Storage Service using Minio SDK.
+ */
+@UriEndpoint(firstVersion = "3.5.0", scheme = "minio", title = "Minio Storage Service", syntax = "minio://bucketName",
+        category = {Category.CLOUD, Category.FILE})
+
+public class MinioEndpoint extends ScheduledPollEndpoint {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioEndpoint.class);
+
+    private MinioClient minioClient;
+
+    @UriPath(description = "Bucket name")
+    @Metadata(required = true)
+    private String bucketName; // to support component docs

Review comment:
       This is TODO comment?

##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIntegrationTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import io.minio.MinioClient;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class MinioConsumerIntegrationTest extends CamelTestSupport {
+
+    @BindToRegistry("minioClient")
+    MinioClient client = MinioClient.builder()
+            .endpoint("https://play.min.io")
+            .credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")
+            .region("us-west-1")
+            .build();
+
+    @EndpointInject
+    private ProducerTemplate template;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint result;
+
+    @Test
+    public void sendIn() throws Exception {
+        result.expectedMessageCount(3);
+
+        template.send("direct:putObject", exchange -> {
+            exchange.getIn().setHeader(MinioConstants.OBJECT_NAME, "test1.txt");
+            exchange.getIn().setBody("Test1");
+        });
+
+        template.send("direct:putObject", exchange -> {
+            exchange.getIn().setHeader(MinioConstants.OBJECT_NAME, "test2.txt");
+            exchange.getIn().setBody("Test2");
+        });
+
+        template.send("direct:putObject", exchange -> {
+            exchange.getIn().setHeader(MinioConstants.OBJECT_NAME, "test3.txt");
+            exchange.getIn().setBody("Test3");
+        });
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            @Override
+            public void configure() {
+                String minioEndpoint = "minio://mycamel?autoCreateBucket=false";
+
+                from("direct:putObject").startupOrder(1).to(minioEndpoint).to("mock:result");
+
+                // TODO: Check why this is not working

Review comment:
       Please resolve this TODO

##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioComponentIntegrationTest.java
##########
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExchangePattern;
+import org.apache.camel.Message;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.*;

Review comment:
       Please avoid `star import` usage.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] dmvolod commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

dmvolod commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463978808



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConfiguration.java
##########
@@ -0,0 +1,641 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.time.ZonedDateTime;
+
+import io.minio.MinioClient;
+import io.minio.ServerSideEncryption;
+import io.minio.ServerSideEncryptionCustomerKey;
+import okhttp3.OkHttpClient;
+import org.apache.camel.RuntimeCamelException;
+import org.apache.camel.spi.UriParam;
+import org.apache.camel.spi.UriParams;
+
+@UriParams
+public class MinioConfiguration implements Cloneable {
+
+    @UriParam
+    private String endpoint;
+    @UriParam
+    private Integer proxyPort;
+
+    @UriParam(label = "security", secret = true)
+    private String accessKey;
+    @UriParam(label = "security", secret = true)
+    private String secretKey;
+    @UriParam(defaultValue = "false")
+    private boolean secure;
+
+    @UriParam
+    private String region;
+
+    @UriParam
+    private OkHttpClient customHttpClient;
+
+    private String bucketName;
+    @UriParam(defaultValue = "true")
+    private boolean autoCreateBucket = true;
+    @UriParam(defaultValue = "false")
+    private boolean objectLock;
+
+    @UriParam
+    private ServerSideEncryptionCustomerKey serverSideEncryptionCustomerKey;
+    @UriParam
+    private ServerSideEncryption serverSideEncryption;
+
+    @UriParam
+    private MinioClient minioClient;
+
+    @UriParam(label = "consumer")
+    private String objectName;
+    @UriParam(label = "consumer")
+    private String delimiter;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean includeUserMetadata;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean includeVersions;
+    @UriParam(label = "consumer")
+    private String prefix;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean recursive;
+    @UriParam(label = "consumer")
+    private String startAfter;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean useVersion1;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean includeFolders;
+    @UriParam(label = "consumer")
+    private long offset;
+    @UriParam(label = "consumer")
+    private long length;
+    @UriParam(label = "consumer")
+    private String matchETag;
+    @UriParam(label = "consumer")
+    private String notMatchETag;
+    @UriParam(label = "consumer")
+    private ZonedDateTime modifiedSince;
+    @UriParam(label = "consumer")
+    private ZonedDateTime unModifiedSince;
+    @UriParam(label = "consumer")
+    private String destinationBucketName;
+    @UriParam(label = "consumer")
+    private String destinationObjectName;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean deleteAfterRead = true;
+    @UriParam(label = "consumer", defaultValue = "false")
+    private boolean moveAfterRead;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean includeBody = true;
+    @UriParam(label = "consumer", defaultValue = "true")
+    private boolean autocloseBody = true;

Review comment:
       ```suggestion
       private boolean autoCloseBody = true;
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] dmvolod commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

dmvolod commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463979851



##########
File path: catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/minio-component.adoc
##########
@@ -0,0 +1,162 @@
+[[minio-component]]
+= Minio Storage Service Component
+:docTitle: Minio Storage Service
+:artifactId: camel-minio
+:description: Store and retrie objects from Minio Storage Service using Minio SDK.
+:since: 3.5
+:supportLevel: Preview
+:component-header: Both producer and consumer are supported
+
+*Since Camel {since}*
+
+*{component-header}*
+
+
+// component options: START
+The Minio Storage Service component supports 45 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *autoCreateBucket* (common) | Setting the autocreation of the bucket if bucket name not exist. | true | boolean
+| *bypassGovernanceMode* (common) | Set this flag if you want to bypassGovernanceMode when deleting a particular object. | false | boolean
+| *configuration* (common) | The component configuration |  | MinioConfiguration
+| *customHttpClient* (common) | Set custom HTTP client for authenticated access. |  | OkHttpClient
+| *endpoint* (common) | Endpoint can be an URL, domain name, IPv4 address or IPv6 address. |  | String
+| *minioClient* (common) | Reference to a Minio Client object in the registry. |  | MinioClient
+| *objectLock* (common) | Set when creating new bucket. | false | boolean
+| *pojoRequest* (common) | If we want to use a POJO request as body or not. | false | boolean

Review comment:
       @Nayananga if two types of the requests are only possible, it's ok, but if you would like to extend, the requestType enum is better to use. It also common recommendation to review a large number of boolean flags which can be grouped together to the some enums types. Usually the minio API and Camel component pararameters can be not the same types and names.  




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] dmvolod commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

dmvolod commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463980253



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponent.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+import java.util.Set;
+
+import io.minio.MinioClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link MinioEndpoint}.
+ */
+@Component("minio")
+public class MinioComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioComponent.class);
+
+    @Metadata
+    private MinioConfiguration configuration = new MinioConfiguration();
+
+    public MinioComponent() {
+        this(null);
+    }
+
+    public MinioComponent(CamelContext context) {
+        super(context);
+        registerExtension(new MinioComponentVerifierExtension());
+
+    }
+
+    @Override
+    protected MinioEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Bucket name must be specified.");
+        }
+
+        final MinioConfiguration configuration = this.configuration != null ? this.configuration.copy() : new MinioConfiguration();
+        configuration.setBucketName(remaining);
+        MinioEndpoint endpoint = new MinioEndpoint(uri, this, configuration);
+        setProperties(endpoint, parameters);
+        checkAndSetRegistryClient(configuration, endpoint);
+
+        return endpoint;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The component configuration
+     */
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    private void checkAndSetRegistryClient(MinioConfiguration configuration, MinioEndpoint endpoint) {
+        if (ObjectHelper.isEmpty(endpoint.getConfiguration().getMinioClient())) {
+            LOG.debug("Looking for an MinioClient instance in the registry");
+            Set<MinioClient> clients = getCamelContext().getRegistry().findByType(MinioClient.class);
+            if (clients.size() == 1) {

Review comment:
       I saw this implementation in large number of components, developed by @omarsmak, @oscerd and other members, but a little bit confusing about error message as getCamelContext().getRegistry().findByType can return more than one record if user suddenly put more than one MinioClient.class in the registry even if it doesn't make sense.
   The error will be "No MinioClient instance in the registry" but can be wrong enought.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] dmvolod commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

dmvolod commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463980253



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponent.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+import java.util.Set;
+
+import io.minio.MinioClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link MinioEndpoint}.
+ */
+@Component("minio")
+public class MinioComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioComponent.class);
+
+    @Metadata
+    private MinioConfiguration configuration = new MinioConfiguration();
+
+    public MinioComponent() {
+        this(null);
+    }
+
+    public MinioComponent(CamelContext context) {
+        super(context);
+        registerExtension(new MinioComponentVerifierExtension());
+
+    }
+
+    @Override
+    protected MinioEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Bucket name must be specified.");
+        }
+
+        final MinioConfiguration configuration = this.configuration != null ? this.configuration.copy() : new MinioConfiguration();
+        configuration.setBucketName(remaining);
+        MinioEndpoint endpoint = new MinioEndpoint(uri, this, configuration);
+        setProperties(endpoint, parameters);
+        checkAndSetRegistryClient(configuration, endpoint);
+
+        return endpoint;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The component configuration
+     */
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    private void checkAndSetRegistryClient(MinioConfiguration configuration, MinioEndpoint endpoint) {
+        if (ObjectHelper.isEmpty(endpoint.getConfiguration().getMinioClient())) {
+            LOG.debug("Looking for an MinioClient instance in the registry");
+            Set<MinioClient> clients = getCamelContext().getRegistry().findByType(MinioClient.class);
+            if (clients.size() == 1) {

Review comment:
       I saw this implementation in large number of components, developed by @omarsmak, @oscerd and other members, but a little bit confusing about error message as getCamelContext().getRegistry().findByType can return more than one record if user suddenly put more than one MinioClient.class in the registry even if it doesn't make sense.
   The error will be "No MinioClient instance in the registry" but can be wrong enough.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] dmvolod commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

dmvolod commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463980528



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConstants.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+/**
+ * Constants used in Camel Minio module
+ */
+public interface MinioConstants {
+
+    String BUCKET_NAME = "CamelMinioBucketName";
+    String DESTINATION_BUCKET_NAME = "CamelMinioDestinationBucketName";
+    String CACHE_CONTROL = "CamelMinioContentControl";
+    String CONTENT_DISPOSITION = "CamelMinioContentDisposition";
+    String CONTENT_ENCODING = "CamelMinioContentEncoding";
+    String CONTENT_LENGTH = "CamelMinioContentLength";
+    String CONTENT_MD5 = "CamelMinioContentMD5";
+    String CONTENT_TYPE = "CamelMinioContentType";
+    String E_TAG = "CamelMinioETag";
+    String OBJECT_NAME = "CamelMinioObjectName";
+    String DESTINATION_OBJECT_NAME = "CamelMinioDestinationObjectName";
+    String LAST_MODIFIED = "CamelMinioLastModified";
+    String STORAGE_CLASS = "CamelMinioStorageClass";
+    String VERSION_ID = "CamelMinioVersionId";
+    String CANNED_ACL = "CamelMinioCannedAcl";
+    String MINIO_OPERATION = "CamelMinioOperation";
+    String SERVER_SIDE_ENCRYPTION = "CamelMinioServerSideEncryption";
+    String EXPIRATION_TIME = "CamelMinioExpirationTime";
+    String REPLICATION_STATUS = "CamelMinioReplicationStatus";
+    String OFFSET = "CamelMinioRangeStart";

Review comment:
       @Nayananga a little bit confusing about key name and key value. The same as for `LENGTH`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] oscerd commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

oscerd commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463981326



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponent.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+import java.util.Set;
+
+import io.minio.MinioClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link MinioEndpoint}.
+ */
+@Component("minio")
+public class MinioComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioComponent.class);
+
+    @Metadata
+    private MinioConfiguration configuration = new MinioConfiguration();
+
+    public MinioComponent() {
+        this(null);
+    }
+
+    public MinioComponent(CamelContext context) {
+        super(context);
+        registerExtension(new MinioComponentVerifierExtension());
+
+    }
+
+    @Override
+    protected MinioEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Bucket name must be specified.");
+        }
+
+        final MinioConfiguration configuration = this.configuration != null ? this.configuration.copy() : new MinioConfiguration();
+        configuration.setBucketName(remaining);
+        MinioEndpoint endpoint = new MinioEndpoint(uri, this, configuration);
+        setProperties(endpoint, parameters);
+        checkAndSetRegistryClient(configuration, endpoint);
+
+        return endpoint;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The component configuration
+     */
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    private void checkAndSetRegistryClient(MinioConfiguration configuration, MinioEndpoint endpoint) {
+        if (ObjectHelper.isEmpty(endpoint.getConfiguration().getMinioClient())) {
+            LOG.debug("Looking for an MinioClient instance in the registry");
+            Set<MinioClient> clients = getCamelContext().getRegistry().findByType(MinioClient.class);
+            if (clients.size() == 1) {

Review comment:
       I introduced the autoDisvoverCliebt for this situation in 3.5.0. in case it is false, that method won't be called




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] DenisIstomin commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

DenisIstomin commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r463689546



##########
File path: catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/minio-component.adoc
##########
@@ -0,0 +1,162 @@
+[[minio-component]]

Review comment:
       Content is not updated from original `adoc` file.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464088247



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");

Review comment:
       Hi @omarsmak this has to handle since ```getMinioClient().removeObject(removeObjectRequest.build());``` throws an error




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464088357



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");
+        }
+    }
+
+    private void copyObject(String srcBucketName, String srcObjectName) {
+        String destinationBucketName = getConfiguration().getDestinationBucketName();
+        if (destinationBucketName == null) {
+            throw new IllegalArgumentException("Destination Bucket name must be specified to copy operation");
+        }
+
+        try {
+            // set destination object name as source object name, if not specified
+            String destinationObjectName = (getConfiguration().getDestinationObjectName() != null)
+                    ? getConfiguration().getDestinationObjectName()
+                    : srcObjectName;
+
+
+            LOG.trace("Copying object from bucket {} with objectName {} to bucket {}...",
+                    srcBucketName, srcObjectName, destinationBucketName);
+
+            CopySource.Builder copySourceBuilder = CopySource.builder().bucket(srcBucketName).object(srcObjectName);
+            if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+                copySourceBuilder.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+            }
+            if (getConfiguration().getOffset() != 0) {
+                copySourceBuilder.offset(getConfiguration().getOffset());
+            }
+            if (getConfiguration().getLength() != 0) {
+                copySourceBuilder.length(getConfiguration().getLength());
+            }
+            if (getConfiguration().getVersionId() != null) {
+                copySourceBuilder.versionId(getConfiguration().getVersionId());
+            }
+            if (getConfiguration().getMatchETag() != null) {
+                copySourceBuilder.matchETag(getConfiguration().getMatchETag());
+            }
+            if (getConfiguration().getNotMatchETag() != null) {
+                copySourceBuilder.notMatchETag(getConfiguration().getNotMatchETag());
+            }
+            if (getConfiguration().getModifiedSince() != null) {
+                copySourceBuilder.modifiedSince(getConfiguration().getModifiedSince());
+            }
+            if (getConfiguration().getUnModifiedSince() != null) {
+                copySourceBuilder.unmodifiedSince(getConfiguration().getUnModifiedSince());
+            }
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .source(copySourceBuilder.build())
+                    .bucket(getConfiguration().getDestinationBucketName())
+                    .object(destinationObjectName);
+
+            if (getConfiguration().getServerSideEncryption() != null) {
+                copyObjectRequest.sse(getConfiguration().getServerSideEncryption());
+            }
+
+            getMinioClient().copyObject(copyObjectRequest.build());
+
+        } catch (Exception e) {
+            LOG.warn("Error copy object from bucket {} with objectName {} to bucket {}...",

Review comment:
       Hi @omarsmak this has to handle since ```getMinioClient().copyObject(copyObjectRequest.build());``` throws an error




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464100320



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(GetMinioClient.class);
+    private final MinioConfiguration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public GetMinioClient(MinioConfiguration configuration) {
+        LOG.trace("Creating an Minio client.");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the minio client.
+     *
+     * @return Minio Client.
+     */
+    @Override
+    public MinioClient getMinioClient() {
+        if (configuration.getEndpoint() != null) {
+            MinioClient.Builder minioClientRequest = MinioClient.builder();
+
+            if (configuration.getProxyPort() != null) {
+                minioClientRequest.endpoint(configuration.getEndpoint(), configuration.getProxyPort(), configuration.isSecure());
+            } else {
+                minioClientRequest.endpoint(configuration.getEndpoint());
+            }
+            if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {

Review comment:
       Hi @omarsmak sorry, I didn't get this question?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464112015



##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIntegrationTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import io.minio.MinioClient;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class MinioConsumerIntegrationTest extends CamelTestSupport {
+
+    @BindToRegistry("minioClient")
+    MinioClient client = MinioClient.builder()
+            .endpoint("https://play.min.io")
+            .credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")

Review comment:
       Hi @DenisIstomin, How can I do that?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464113257



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        MinioOperations operation = determineOperation(exchange);
+        MinioClient minioClient = getEndpoint().getMinioClient();
+        if (ObjectHelper.isEmpty(operation)) {
+            putObject(minioClient, exchange);
+        } else {
+            switch (operation) {
+                case copyObject:
+                    copyObject(minioClient, exchange);
+                    break;
+                case deleteObject:
+                    deleteObject(minioClient, exchange);
+                    break;
+                case deleteObjects:
+                    deleteObjects(minioClient, exchange);
+                    break;
+                case listBuckets:
+                    listBuckets(minioClient, exchange);
+                    break;
+                case deleteBucket:
+                    deleteBucket(minioClient, exchange);
+                    break;
+                case listObjects:
+                    listObjects(minioClient, exchange);
+                    break;
+                case getObject:
+                    getObject(minioClient, exchange);
+                    break;
+                case getPartialObject:
+                    getPartialObject(minioClient, exchange);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported operation");
+            }
+        }
+    }
+
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream;
+            ByteArrayOutputStream baos;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+            if (object instanceof File) {
+                filePayload = (File) object;
+                inputStream = new FileInputStream(filePayload);
+            } else {
+                inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                    if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                        LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                        baos = determineLengthInputStream(inputStream);
+                        objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                        inputStream = new ByteArrayInputStream(baos.toByteArray());
+                    } else {
+                        if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                        }
+                    }
+                }
+            }
+            PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                    .stream(inputStream, inputStream.available(), -1)
+                    .bucket(bucketName)
+                    .object(objectName)
+                    .userMetadata(objectMetadata);
+
+            if (!extraHeaders.isEmpty()) {
+                putObjectRequest.extraHeaders(extraHeaders);
+            }
+
+            LOG.trace("Put object from exchange...");
+
+            ObjectWriteResponse putObjectResult = getEndpoint().getMinioClient().putObject(putObjectRequest.build());
+
+            LOG.trace("Received result...");
+
+            Message message = getMessageForResponse(exchange);
+            message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+            if (putObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+            }
+
+            IOHelper.close(inputStream);
+
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+
+            minioClient.removeObject(RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey).build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+
+        if (getConfiguration().isPojoRequest()) {
+            RemoveBucketArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeBucket(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody("ok");
+            }
+        } else {
+
+            minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+            Message message = getMessageForResponse(exchange);
+            message.setBody("ok");
+        }
+    }
+
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String offset = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String length = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            if (ObjectHelper.isEmpty(offset) || ObjectHelper.isEmpty(length)) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .offset(Long.parseLong(offset))
+                    .length(Long.parseLong(length))
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        if (getConfiguration().isPojoRequest()) {
+            ListObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (payload != null) {
+                Iterable<Result<Item>> objectList = minioClient.listObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(objectList);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+
+            Iterable<Result<Item>> objectList = minioClient.listObjects(ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .recursive(getConfiguration().isRecursive())
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(objectList);
+        }
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            objectMetadata.put("Content-Type", contentType);
+        }
+
+        String cacheControl = exchange.getIn().getHeader(MinioConstants.CACHE_CONTROL, String.class);
+        if (cacheControl != null) {
+            objectMetadata.put("Cache-Control", cacheControl);
+        }
+
+        String contentDisposition = exchange.getIn().getHeader(MinioConstants.CONTENT_DISPOSITION, String.class);
+        if (contentDisposition != null) {
+            objectMetadata.put("Content-Disposition", contentDisposition);
+        }
+
+        String contentEncoding = exchange.getIn().getHeader(MinioConstants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            objectMetadata.put("Content-Encoding", contentEncoding);
+        }
+
+        String contentMD5 = exchange.getIn().getHeader(MinioConstants.CONTENT_MD5, String.class);
+        if (contentMD5 != null) {
+            objectMetadata.put("Content-Md5", contentMD5);
+        }
+
+        return objectMetadata;
+    }
+
+    /**
+     * Reads the bucket name from the header of the given exchange. If not
+     * provided, it's read from the endpoint configuration.
+     *
+     * @param exchange The exchange to read the header from.
+     * @return The bucket name.
+     * @throws IllegalArgumentException if the header could not be determined.
+     */
+    private String determineBucketName(final Exchange exchange) {
+        String bucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(bucketName)) {
+            bucketName = getConfiguration().getBucketName();
+            LOG.trace("Minio Bucket name header is missing, using default one [{}]", bucketName);
+        }
+
+        if (bucketName == null) {
+            throw new IllegalArgumentException("Minio Bucket name header is missing or not configured.");
+        }
+
+        return bucketName;
+    }
+
+    private String determineObjectName(final Exchange exchange) {
+        String objectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+        if (ObjectHelper.isEmpty(objectName)) {
+            objectName = getConfiguration().getKeyName();
+        }
+        if (objectName == null) {
+            throw new IllegalArgumentException("Minio Key header is missing.");
+        }
+        return objectName;
+    }
+
+    private String determineStorageClass(final Exchange exchange) {
+        String storageClass = exchange.getIn().getHeader(MinioConstants.STORAGE_CLASS, String.class);
+        if (storageClass == null) {
+            storageClass = getConfiguration().getStorageClass();
+        }
+
+        return storageClass;
+    }
+
+    private String determineVersionId(final Exchange exchange) {
+        String versionId = exchange.getIn().getHeader(MinioConstants.VERSION_ID, String.class);
+        if (versionId == null) {
+            versionId = getConfiguration().getVersionId();
+        }
+
+        return versionId;
+    }
+
+    private ByteArrayOutputStream determineLengthInputStream(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        byte[] bytes = new byte[1024];

Review comment:
       Hi @omarsmak, I was actually mimicking [this](https://github.com/apache/camel/blob/d4cfbd3631a0b277b5228224e1855c7229062c21/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java#L573) line. I will put this as a constant in MinioConstants.java

##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioProducer.java
##########
@@ -0,0 +1,494 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.ObjectWriteResponse;
+import io.minio.PutObjectArgs;
+import io.minio.RemoveBucketArgs;
+import io.minio.RemoveObjectArgs;
+import io.minio.RemoveObjectsArgs;
+import io.minio.Result;
+import io.minio.messages.Bucket;
+import io.minio.messages.Item;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.InvalidPayloadException;
+import org.apache.camel.Message;
+import org.apache.camel.WrappedFile;
+import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Producer which sends messages to the Minio Simple Storage
+ */
+public class MinioProducer extends DefaultProducer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioProducer.class);
+
+    private transient String minioProducerToString;
+
+    public MinioProducer(final Endpoint endpoint) {
+        super(endpoint);
+    }
+
+    public static Message getMessageForResponse(final Exchange exchange) {
+        return exchange.getMessage();
+    }
+
+    @Override
+    public void process(final Exchange exchange) throws Exception {
+        MinioOperations operation = determineOperation(exchange);
+        MinioClient minioClient = getEndpoint().getMinioClient();
+        if (ObjectHelper.isEmpty(operation)) {
+            putObject(minioClient, exchange);
+        } else {
+            switch (operation) {
+                case copyObject:
+                    copyObject(minioClient, exchange);
+                    break;
+                case deleteObject:
+                    deleteObject(minioClient, exchange);
+                    break;
+                case deleteObjects:
+                    deleteObjects(minioClient, exchange);
+                    break;
+                case listBuckets:
+                    listBuckets(minioClient, exchange);
+                    break;
+                case deleteBucket:
+                    deleteBucket(minioClient, exchange);
+                    break;
+                case listObjects:
+                    listObjects(minioClient, exchange);
+                    break;
+                case getObject:
+                    getObject(minioClient, exchange);
+                    break;
+                case getPartialObject:
+                    getPartialObject(minioClient, exchange);
+                    break;
+                default:
+                    throw new IllegalArgumentException("Unsupported operation");
+            }
+        }
+    }
+
+    public void putObject(MinioClient minioClient, final Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            PutObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(PutObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse putObjectResult = minioClient.putObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+                if (putObjectResult.versionId() != null) {
+                    message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+                }
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String objectName = determineObjectName(exchange);
+            Map<String, String> objectMetadata = determineMetadata(exchange);
+            Map<String, String> extraHeaders = determineExtraHeaders(exchange);
+
+            File filePayload = null;
+            InputStream inputStream;
+            ByteArrayOutputStream baos;
+            Object object = exchange.getIn().getMandatoryBody();
+
+            // Need to check if the message body is WrappedFile
+            if (object instanceof WrappedFile) {
+                object = ((WrappedFile<?>) object).getFile();
+            }
+            if (object instanceof File) {
+                filePayload = (File) object;
+                inputStream = new FileInputStream(filePayload);
+            } else {
+                inputStream = exchange.getIn().getMandatoryBody(InputStream.class);
+                if (objectMetadata.containsKey(Exchange.CONTENT_LENGTH)) {
+                    if (objectMetadata.get("Content-Length").equals("0") && ObjectHelper.isEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                        LOG.debug("The content length is not defined. It needs to be determined by reading the data into memory");
+                        baos = determineLengthInputStream(inputStream);
+                        objectMetadata.put("Content-Length", String.valueOf(baos.size()));
+                        inputStream = new ByteArrayInputStream(baos.toByteArray());
+                    } else {
+                        if (ObjectHelper.isNotEmpty(exchange.getProperty(Exchange.CONTENT_LENGTH))) {
+                            objectMetadata.put("Content-Length", exchange.getProperty(Exchange.CONTENT_LENGTH, String.class));
+                        }
+                    }
+                }
+            }
+            PutObjectArgs.Builder putObjectRequest = PutObjectArgs.builder()
+                    .stream(inputStream, inputStream.available(), -1)
+                    .bucket(bucketName)
+                    .object(objectName)
+                    .userMetadata(objectMetadata);
+
+            if (!extraHeaders.isEmpty()) {
+                putObjectRequest.extraHeaders(extraHeaders);
+            }
+
+            LOG.trace("Put object from exchange...");
+
+            ObjectWriteResponse putObjectResult = getEndpoint().getMinioClient().putObject(putObjectRequest.build());
+
+            LOG.trace("Received result...");
+
+            Message message = getMessageForResponse(exchange);
+            message.setHeader(MinioConstants.E_TAG, putObjectResult.etag());
+            if (putObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, putObjectResult.versionId());
+            }
+
+            IOHelper.close(inputStream);
+
+            if (getConfiguration().isDeleteAfterWrite() && filePayload != null) {
+                FileUtil.deleteFile(filePayload);
+            }
+        }
+    }
+
+    private Map<String, String> determineExtraHeaders(Exchange exchange) {
+        Map<String, String> extraHeaders = new HashMap<>();
+        String storageClass = determineStorageClass(exchange);
+        if (storageClass != null) {
+            extraHeaders.put("X-Amz-Storage-Class", storageClass);
+        }
+
+        String cannedAcl = exchange.getIn().getHeader(MinioConstants.CANNED_ACL, String.class);
+        if (cannedAcl != null) {
+            extraHeaders.put("x-amz-acl", cannedAcl);
+        }
+
+        return extraHeaders;
+    }
+
+    private void copyObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            CopyObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(CopyObjectArgs.Builder.class);
+            if (payload != null) {
+                ObjectWriteResponse result = minioClient.copyObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(result);
+            }
+        } else {
+
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String destinationKey = exchange.getIn().getHeader(MinioConstants.DESTINATION_OBJECT_NAME, String.class);
+            final String destinationBucketName = exchange.getIn().getHeader(MinioConstants.DESTINATION_BUCKET_NAME, String.class);
+
+            if (ObjectHelper.isEmpty(destinationBucketName)) {
+                throw new IllegalArgumentException("Bucket Name Destination must be specified for copyObject Operation");
+            }
+            if (ObjectHelper.isEmpty(destinationKey)) {
+                throw new IllegalArgumentException("Destination Key must be specified for copyObject Operation");
+            }
+
+            CopySource.Builder copySourceBuilder = CopySource.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey);
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .bucket(destinationBucketName)
+                    .object(destinationKey)
+                    .source(copySourceBuilder.build());
+
+            ObjectWriteResponse copyObjectResult = minioClient.copyObject(copyObjectRequest.build());
+
+            Message message = getMessageForResponse(exchange);
+            if (copyObjectResult.versionId() != null) {
+                message.setHeader(MinioConstants.VERSION_ID, copyObjectResult.versionId());
+            }
+        }
+    }
+
+    private void deleteObject(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+        final String sourceKey = determineObjectName(exchange);
+        final String versionId = determineVersionId(exchange);
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+
+            minioClient.removeObject(RemoveObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey).build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(true);
+        }
+    }
+
+    private void deleteObjects(MinioClient minioClient, Exchange exchange) throws Exception {
+        if (getConfiguration().isPojoRequest()) {
+            RemoveObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveObjectsArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(true);
+            }
+        } else {
+            throw new IllegalArgumentException("Cannot delete multiple objects without a POJO request");
+        }
+    }
+
+    private void listBuckets(MinioClient minioClient, Exchange exchange) throws Exception {
+        List<Bucket> bucketsList = minioClient.listBuckets();
+        Message message = getMessageForResponse(exchange);
+        //returns iterator of bucketList
+        message.setBody(bucketsList.iterator());
+    }
+
+    private void deleteBucket(MinioClient minioClient, Exchange exchange) throws Exception {
+        final String bucketName = determineBucketName(exchange);
+
+        if (getConfiguration().isPojoRequest()) {
+            RemoveBucketArgs.Builder payload = exchange.getIn().getMandatoryBody(RemoveBucketArgs.Builder.class);
+            if (payload != null) {
+                minioClient.removeBucket(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody("ok");
+            }
+        } else {
+
+            minioClient.removeBucket(RemoveBucketArgs.builder().bucket(bucketName).build());
+            Message message = getMessageForResponse(exchange);
+            message.setBody("ok");
+        }
+    }
+
+    private void getObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void getPartialObject(MinioClient minioClient, Exchange exchange) throws Exception {
+
+        if (getConfiguration().isPojoRequest()) {
+            GetObjectArgs.Builder payload = exchange.getIn().getMandatoryBody(GetObjectArgs.Builder.class);
+            if (payload != null) {
+                InputStream respond = minioClient.getObject(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(respond);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+            final String sourceKey = determineObjectName(exchange);
+            final String offset = exchange.getIn().getHeader(MinioConstants.OFFSET, String.class);
+            final String length = exchange.getIn().getHeader(MinioConstants.LENGTH, String.class);
+
+            if (ObjectHelper.isEmpty(offset) || ObjectHelper.isEmpty(length)) {
+                throw new IllegalArgumentException("A Offset and length header must be configured to perform a partial get operation.");
+            }
+
+            InputStream respond = minioClient.getObject(GetObjectArgs.builder()
+                    .bucket(bucketName)
+                    .object(sourceKey)
+                    .offset(Long.parseLong(offset))
+                    .length(Long.parseLong(length))
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(respond);
+        }
+    }
+
+    private void listObjects(MinioClient minioClient, Exchange exchange) throws InvalidPayloadException {
+
+        if (getConfiguration().isPojoRequest()) {
+            ListObjectsArgs.Builder payload = exchange.getIn().getMandatoryBody(ListObjectsArgs.Builder.class);
+            if (payload != null) {
+                Iterable<Result<Item>> objectList = minioClient.listObjects(payload.build());
+                Message message = getMessageForResponse(exchange);
+                message.setBody(objectList);
+            }
+        } else {
+            final String bucketName = determineBucketName(exchange);
+
+            Iterable<Result<Item>> objectList = minioClient.listObjects(ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .recursive(getConfiguration().isRecursive())
+                    .build());
+
+            Message message = getMessageForResponse(exchange);
+            message.setBody(objectList);
+        }
+    }
+
+    private MinioOperations determineOperation(Exchange exchange) {
+        MinioOperations operation = exchange.getIn().getHeader(MinioConstants.MINIO_OPERATION, MinioOperations.class);
+        if (operation == null) {
+            operation = getConfiguration().getOperation();
+        }
+        return operation;
+    }
+
+    private Map<String, String> determineMetadata(final Exchange exchange) {
+        Map<String, String> objectMetadata = new HashMap<>();
+
+        Long contentLength = exchange.getIn().getHeader(MinioConstants.CONTENT_LENGTH, Long.class);
+        if (contentLength != null) {
+            objectMetadata.put("Content-Length", String.valueOf(contentLength));
+        }
+
+        String contentType = exchange.getIn().getHeader(MinioConstants.CONTENT_TYPE, String.class);
+        if (contentType != null) {
+            objectMetadata.put("Content-Type", contentType);
+        }
+
+        String cacheControl = exchange.getIn().getHeader(MinioConstants.CACHE_CONTROL, String.class);
+        if (cacheControl != null) {
+            objectMetadata.put("Cache-Control", cacheControl);
+        }
+
+        String contentDisposition = exchange.getIn().getHeader(MinioConstants.CONTENT_DISPOSITION, String.class);
+        if (contentDisposition != null) {
+            objectMetadata.put("Content-Disposition", contentDisposition);
+        }
+
+        String contentEncoding = exchange.getIn().getHeader(MinioConstants.CONTENT_ENCODING, String.class);
+        if (contentEncoding != null) {
+            objectMetadata.put("Content-Encoding", contentEncoding);
+        }
+
+        String contentMD5 = exchange.getIn().getHeader(MinioConstants.CONTENT_MD5, String.class);
+        if (contentMD5 != null) {
+            objectMetadata.put("Content-Md5", contentMD5);
+        }
+
+        return objectMetadata;
+    }
+
+    /**
+     * Reads the bucket name from the header of the given exchange. If not
+     * provided, it's read from the endpoint configuration.
+     *
+     * @param exchange The exchange to read the header from.
+     * @return The bucket name.
+     * @throws IllegalArgumentException if the header could not be determined.
+     */
+    private String determineBucketName(final Exchange exchange) {
+        String bucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+
+        if (ObjectHelper.isEmpty(bucketName)) {
+            bucketName = getConfiguration().getBucketName();
+            LOG.trace("Minio Bucket name header is missing, using default one [{}]", bucketName);
+        }
+
+        if (bucketName == null) {
+            throw new IllegalArgumentException("Minio Bucket name header is missing or not configured.");
+        }
+
+        return bucketName;
+    }
+
+    private String determineObjectName(final Exchange exchange) {
+        String objectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+        if (ObjectHelper.isEmpty(objectName)) {
+            objectName = getConfiguration().getKeyName();
+        }
+        if (objectName == null) {
+            throw new IllegalArgumentException("Minio Key header is missing.");
+        }
+        return objectName;
+    }
+
+    private String determineStorageClass(final Exchange exchange) {
+        String storageClass = exchange.getIn().getHeader(MinioConstants.STORAGE_CLASS, String.class);
+        if (storageClass == null) {
+            storageClass = getConfiguration().getStorageClass();
+        }
+
+        return storageClass;
+    }
+
+    private String determineVersionId(final Exchange exchange) {
+        String versionId = exchange.getIn().getHeader(MinioConstants.VERSION_ID, String.class);
+        if (versionId == null) {
+            versionId = getConfiguration().getVersionId();
+        }
+
+        return versionId;
+    }
+
+    private ByteArrayOutputStream determineLengthInputStream(InputStream inputStream) throws IOException {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        byte[] bytes = new byte[1024];

Review comment:
       Hi @omarsmak, I was actually mimicking [this](https://github.com/apache/camel/blob/d4cfbd3631a0b277b5228224e1855c7229062c21/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Producer.java#L573) line. I will put this as a constant in MinioConstants.java




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464269216



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");

Review comment:
       What I mean here, does it make sense to handle it through  `getExceptionHandler().handleException`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464270607



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(GetMinioClient.class);
+    private final MinioConfiguration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public GetMinioClient(MinioConfiguration configuration) {
+        LOG.trace("Creating an Minio client.");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the minio client.
+     *
+     * @return Minio Client.
+     */
+    @Override
+    public MinioClient getMinioClient() {
+        if (configuration.getEndpoint() != null) {
+            MinioClient.Builder minioClientRequest = MinioClient.builder();
+
+            if (configuration.getProxyPort() != null) {
+                minioClientRequest.endpoint(configuration.getEndpoint(), configuration.getProxyPort(), configuration.isSecure());
+            } else {
+                minioClientRequest.endpoint(configuration.getEndpoint());
+            }
+            if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {

Review comment:
       What I mean here, are the `accessKey`, `secretKey` and `region` are required in order to initiate the client? What happens if one of these is null or not set?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464272194



##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIntegrationTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import io.minio.MinioClient;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class MinioConsumerIntegrationTest extends CamelTestSupport {
+
+    @BindToRegistry("minioClient")
+    MinioClient client = MinioClient.builder()
+            .endpoint("https://play.min.io")
+            .credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")

Review comment:
       An example [here](https://github.com/apache/camel/blob/master/components/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/BlobTestUtils.java)




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


Reply | Threaded
Open this post in threaded view
|

[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464275690



##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioCopyObjectCustomerKeyOperationIntegrationTest.java
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import java.io.*;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.security.SecureRandom;
+import java.util.UUID;
+
+import javax.crypto.KeyGenerator;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.MinioClient;
+import io.minio.PutObjectArgs;
+import io.minio.ServerSideEncryption;
+import io.minio.ServerSideEncryptionCustomerKey;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.minio.MinioOperations;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.fail;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")

Review comment:
       @Nayananga a tip: I think it will be easier for these Integration Tests classes to run through maven command line instead of having to remove the `@Disabled`. An example, take a look at this [component IT tests](https://github.com/apache/camel/tree/master/components/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob). You will need to add a maven profile that will run IT tests that end with IT [here](https://github.com/apache/camel/blob/master/components/camel-azure-storage-blob/pom.xml#L87). And then, you can add a [Util class](https://github.com/apache/camel/blob/master/components/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/BlobTestUtils.java#L40) to load the credentials from the maven command line. Therefore after you have done all of the mentioned steps, you will just run your IT like this:
   `mvn clean test -PfullTests`




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]


123