[GitHub] [camel] Nayananga opened a new pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store


[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464280762



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponent.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+import java.util.Set;
+
+import io.minio.MinioClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link MinioEndpoint}.
+ */
+@Component("minio")
+public class MinioComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioComponent.class);
+
+    @Metadata
+    private MinioConfiguration configuration = new MinioConfiguration();
+
+    public MinioComponent() {
+        this(null);
+    }
+
+    public MinioComponent(CamelContext context) {
+        super(context);
+        registerExtension(new MinioComponentVerifierExtension());
+
+    }
+
+    @Override
+    protected MinioEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Bucket name must be specified.");
+        }
+
+        final MinioConfiguration configuration = this.configuration != null ? this.configuration.copy() : new MinioConfiguration();
+        configuration.setBucketName(remaining);
+        MinioEndpoint endpoint = new MinioEndpoint(uri, this, configuration);
+        setProperties(endpoint, parameters);
+        checkAndSetRegistryClient(configuration, endpoint);
+
+        return endpoint;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The component configuration
+     */
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    private void checkAndSetRegistryClient(MinioConfiguration configuration, MinioEndpoint endpoint) {
+        if (ObjectHelper.isEmpty(endpoint.getConfiguration().getMinioClient())) {
+            LOG.debug("Looking for an MinioClient instance in the registry");
+            Set<MinioClient> clients = getCamelContext().getRegistry().findByType(MinioClient.class);
+            if (clients.size() == 1) {

Review comment:
       @dmvolod Yes, I agree: this error message could be misleading. I think it should distinguish two cases, one where more than one `MinioClient` instance is found in the registry and one where there is no `MinioClient` instance at all.
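
       To make the two cases concrete, here is a minimal sketch. It is not the component's code: `RegistryClientLookup` and `requireSingle` are hypothetical names, and a plain `Set` stands in for the result of the registry lookup so the example runs without Camel or MinIO on the classpath.

```java
import java.util.Collections;
import java.util.Set;

// Hypothetical helper, for illustration only; not part of camel-minio.
class RegistryClientLookup {

    /**
     * Returns the only candidate in the set, or fails with a message that
     * says which of the two problem cases occurred.
     */
    static <T> T requireSingle(Set<T> candidates, String typeName) {
        if (candidates.isEmpty()) {
            // Case 1: nothing of that type is registered.
            throw new IllegalArgumentException(
                    "No " + typeName + " instance found in the registry.");
        }
        if (candidates.size() > 1) {
            // Case 2: the lookup is ambiguous.
            throw new IllegalArgumentException(
                    "Found " + candidates.size() + " " + typeName
                            + " instances in the registry, expected exactly one.");
        }
        return candidates.iterator().next();
    }

    public static void main(String[] args) {
        // A String stands in for the client object in this sketch.
        Set<String> clients = Collections.singleton("client-a");
        System.out.println(requireSingle(clients, "MinioClient"));
    }
}
```

       Whether the component should throw in these cases or only log them is a separate decision; the sketch only shows the messages kept apart.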




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464614021



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");

Review comment:
       Hi @omarsmak, I was mimicking [this](https://github.com/apache/camel/blob/d4cfbd3631a0b277b5228224e1855c7229062c21/components/camel-aws2-s3/src/main/java/org/apache/camel/component/aws2/s3/AWS2S3Consumer.java#L288) line. Should I keep it as it is, or change it to `LOG.warn`?
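
       For context on the trade-off, here is a small sketch of what each level does when the logger threshold is above trace. It uses `java.util.logging` as a stand-in for SLF4J so that it runs without dependencies (`Level.FINEST` roughly corresponds to trace). `CommitLoggingSketch`, `CapturingHandler` and `logCommitFailure` are hypothetical names, and the `INFO` threshold is an assumption about a typical deployment.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogRecord;
import java.util.logging.Logger;

// Hypothetical sketch; java.util.logging stands in for SLF4J here.
class CommitLoggingSketch {

    /** Keeps every record that reaches it so the caller can inspect what was logged. */
    static final class CapturingHandler extends Handler {
        final List<LogRecord> records = new ArrayList<>();

        @Override
        public void publish(LogRecord record) {
            records.add(record);
        }

        @Override
        public void flush() { }

        @Override
        public void close() { }
    }

    /** Logs a simulated commit failure at the given level and returns what got through. */
    static List<LogRecord> logCommitFailure(Level level) {
        Logger log = Logger.getAnonymousLogger();
        log.setUseParentHandlers(false); // keep the sketch quiet on the console
        log.setLevel(Level.INFO);        // assumed threshold for this sketch
        CapturingHandler handler = new CapturingHandler();
        log.addHandler(handler);

        Exception failure = new IllegalStateException("remove failed");
        // Passing the exception keeps the cause and stack trace in the record.
        log.log(level, "Error during commit", failure);
        return handler.records;
    }

    public static void main(String[] args) {
        System.out.println("FINEST records: " + logCommitFailure(Level.FINEST).size());
        System.out.println("WARNING records: " + logCommitFailure(Level.WARNING).size());
    }
}
```

       Note also that the quoted `LOG.trace("Error process commit...")` call does not pass the exception, so the cause would be lost even with trace logging enabled.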





[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464620523



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");
+        }
+    }
+
+    private void copyObject(String srcBucketName, String srcObjectName) {
+        String destinationBucketName = getConfiguration().getDestinationBucketName();
+        if (destinationBucketName == null) {
+            throw new IllegalArgumentException("Destination Bucket name must be specified to copy operation");
+        }
+
+        try {
+            // set destination object name as source object name, if not specified
+            String destinationObjectName = (getConfiguration().getDestinationObjectName() != null)
+                    ? getConfiguration().getDestinationObjectName()
+                    : srcObjectName;
+
+
+            LOG.trace("Copying object from bucket {} with objectName {} to bucket {}...",
+                    srcBucketName, srcObjectName, destinationBucketName);
+
+            CopySource.Builder copySourceBuilder = CopySource.builder().bucket(srcBucketName).object(srcObjectName);
+            if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+                copySourceBuilder.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+            }
+            if (getConfiguration().getOffset() != 0) {
+                copySourceBuilder.offset(getConfiguration().getOffset());
+            }
+            if (getConfiguration().getLength() != 0) {
+                copySourceBuilder.length(getConfiguration().getLength());
+            }
+            if (getConfiguration().getVersionId() != null) {
+                copySourceBuilder.versionId(getConfiguration().getVersionId());
+            }
+            if (getConfiguration().getMatchETag() != null) {
+                copySourceBuilder.matchETag(getConfiguration().getMatchETag());
+            }
+            if (getConfiguration().getNotMatchETag() != null) {
+                copySourceBuilder.notMatchETag(getConfiguration().getNotMatchETag());
+            }
+            if (getConfiguration().getModifiedSince() != null) {
+                copySourceBuilder.modifiedSince(getConfiguration().getModifiedSince());
+            }
+            if (getConfiguration().getUnModifiedSince() != null) {
+                copySourceBuilder.unmodifiedSince(getConfiguration().getUnModifiedSince());
+            }
+
+            CopyObjectArgs.Builder copyObjectRequest = CopyObjectArgs.builder()
+                    .source(copySourceBuilder.build())
+                    .bucket(getConfiguration().getDestinationBucketName())
+                    .object(destinationObjectName);
+
+            if (getConfiguration().getServerSideEncryption() != null) {
+                copyObjectRequest.sse(getConfiguration().getServerSideEncryption());
+            }
+
+            getMinioClient().copyObject(copyObjectRequest.build());
+
+        } catch (Exception e) {
+            LOG.warn("Error copy object from bucket {} with objectName {} to bucket {}...",

Review comment:
       I'll remove this error handling, since it is redundant.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[hidden email]



[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464623864



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(GetMinioClient.class);
+    private final MinioConfiguration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public GetMinioClient(MinioConfiguration configuration) {
+        LOG.trace("Creating an Minio client.");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the minio client.
+     *
+     * @return Minio Client.
+     */
+    @Override
+    public MinioClient getMinioClient() {
+        if (configuration.getEndpoint() != null) {
+            MinioClient.Builder minioClientRequest = MinioClient.builder();
+
+            if (configuration.getProxyPort() != null) {
+                minioClientRequest.endpoint(configuration.getEndpoint(), configuration.getProxyPort(), configuration.isSecure());
+            } else {
+                minioClientRequest.endpoint(configuration.getEndpoint());
+            }
+            if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {

Review comment:
       @omarsmak, as per the [Java Client API Reference](https://docs.min.io/docs/java-client-api-reference.html), you can create a MinioClient with or without `accessKey` and `secretKey` (omitting them gives **anonymous access**), and with or without `region`.
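The null check on both keys in the hunk above can be illustrated with a minimal, self-contained sketch. `ClientSettings` and its builder are hypothetical stand-ins written for this example, not the MinIO SDK, and `https://minio.example.com` is a placeholder endpoint; the only point is that credentials are applied when both keys are present and the client stays anonymous otherwise.

```java
final class AnonymousAccessSketch {

    /** Builds settings and reports whether they end up anonymous or authenticated. */
    static String describe(String endpoint, String accessKey, String secretKey) {
        ClientSettings.Builder builder = ClientSettings.builder().endpoint(endpoint);
        // Apply credentials only when both halves are present;
        // otherwise leave the settings anonymous.
        if (accessKey != null && secretKey != null) {
            builder.credentials(accessKey, secretKey);
        }
        return builder.build().toString();
    }

    public static void main(String[] args) {
        System.out.println(describe("https://minio.example.com", null, null));
        System.out.println(describe("https://minio.example.com", "key", "secret"));
    }
}

/** Hypothetical stand-in for a client configuration; NOT the MinIO SDK. */
final class ClientSettings {
    private final String endpoint;
    private final boolean anonymous;

    private ClientSettings(String endpoint, boolean anonymous) {
        this.endpoint = endpoint;
        this.anonymous = anonymous;
    }

    static Builder builder() {
        return new Builder();
    }

    @Override
    public String toString() {
        return endpoint + (anonymous ? " (anonymous)" : " (authenticated)");
    }

    /** Hypothetical builder: credentials are optional, the endpoint is not validated here. */
    static final class Builder {
        private String endpoint;
        private String accessKey;
        private String secretKey;

        Builder endpoint(String endpoint) {
            this.endpoint = endpoint;
            return this;
        }

        Builder credentials(String accessKey, String secretKey) {
            this.accessKey = accessKey;
            this.secretKey = secretKey;
            return this;
        }

        ClientSettings build() {
            // No credentials set means anonymous access.
            return new ClientSettings(endpoint, accessKey == null || secretKey == null);
        }
    }
}
```

Supplying only one of the two keys is treated as anonymous in this sketch; whether the component should instead reject such a half-configured endpoint is a separate design question.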





[GitHub] [camel] omarsmak commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

omarsmak commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r464886215



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");

Review comment:
       @Nayananga my question is about catching `Exception e`: do you need your consumer to ignore *all* exceptions (as is the case now), or do you need it to throw and halt the Camel process when the MinIO client throws any exception other than `MinioException`? My gut feeling is that you may just need to remove the `catch (Exception e)` block and handle only `MinioException`, as you are currently doing; that way Camel will halt its process on any unexpected error.
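The difference between the two options can be shown with a small, self-contained sketch. `StorageException` and `CommitAction` are hypothetical stand-ins (for `MinioException` and the commit logic respectively), and the `handled` list stands in for Camel's exception handler; none of this is the component's actual code. The expected failure is handled, and anything else propagates because there is no catch-all block.

```java
import java.util.ArrayList;
import java.util.List;

final class CommitSketch {

    /**
     * Handles only the expected storage failure. Any other exception
     * (for example a RuntimeException) propagates to the caller.
     */
    static void processCommit(CommitAction action, List<String> handled) {
        try {
            action.run();
        } catch (StorageException e) {
            // Expected failure: record it and carry on.
            handled.add(e.getMessage());
        }
        // Deliberately no catch (Exception e): unexpected errors are not swallowed.
    }

    public static void main(String[] args) {
        List<String> handled = new ArrayList<>();

        processCommit(() -> { throw new StorageException("delete failed"); }, handled);
        System.out.println("handled: " + handled);

        try {
            processCommit(() -> { throw new IllegalStateException("unexpected"); }, handled);
        } catch (IllegalStateException e) {
            System.out.println("propagated: " + e.getMessage());
        }
    }
}

/** Hypothetical action run when an exchange completes. */
interface CommitAction {
    void run() throws StorageException;
}

/** Hypothetical stand-in for io.minio.errors.MinioException; not the real class. */
class StorageException extends Exception {
    StorageException(String message) {
        super(message);
    }
}
```

With a catch-all block that only logs at trace level, the second call would return normally and the failure would be invisible unless trace logging is enabled.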





[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r466215578



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioConsumer.java
##########
@@ -0,0 +1,381 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.io.InputStream;
+import java.util.*;
+
+import io.minio.CopyObjectArgs;
+import io.minio.CopySource;
+import io.minio.GetObjectArgs;
+import io.minio.ListObjectsArgs;
+import io.minio.MinioClient;
+import io.minio.RemoveObjectArgs;
+import io.minio.errors.MinioException;
+import io.minio.messages.Contents;
+import io.minio.messages.ListBucketResultV2;
+import org.apache.camel.Exchange;
+import org.apache.camel.ExtendedExchange;
+import org.apache.camel.Processor;
+import org.apache.camel.spi.Synchronization;
+import org.apache.camel.support.ScheduledBatchPollingConsumer;
+import org.apache.camel.util.CastUtils;
+import org.apache.camel.util.IOHelper;
+import org.apache.camel.util.ObjectHelper;
+import org.apache.camel.util.URISupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A Consumer of messages from the Minio Storage Service.
+ */
+public class MinioConsumer extends ScheduledBatchPollingConsumer {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioConsumer.class);
+
+    private String continuationToken;
+    private transient String minioConsumerToString;
+
+    public MinioConsumer(MinioEndpoint endpoint, Processor processor) {
+        super(endpoint, processor);
+    }
+
+    @Override
+    protected int poll() throws Exception {
+        // must reset for each poll
+        shutdownRunningTask = null;
+        pendingExchanges = 0;
+
+        String bucketName = getConfiguration().getBucketName();
+        String objectName = getConfiguration().getObjectName();
+        MinioClient minioClient = getMinioClient();
+        Queue<Exchange> exchanges;
+
+        if (objectName != null) {
+            LOG.trace("Getting object in bucket {} with object name {}...", bucketName, objectName);
+
+            InputStream minioObject = getObject(bucketName, minioClient, objectName);
+            exchanges = createExchanges(minioObject, objectName);
+
+        } else {
+
+            LOG.trace("Queueing objects in bucket [{}]...", bucketName);
+
+            ListObjectsArgs.Builder listObjectRequest = ListObjectsArgs.builder()
+                    .bucket(bucketName)
+                    .includeUserMetadata(getConfiguration().isIncludeUserMetadata())
+                    .includeVersions(getConfiguration().isIncludeVersions())
+                    .recursive(getConfiguration().isRecursive())
+                    .useApiVersion1(getConfiguration().isUseVersion1());
+
+            if (getConfiguration().getDelimiter() != null) {
+                listObjectRequest.delimiter(getConfiguration().getDelimiter());
+            }
+
+            if (maxMessagesPerPoll > 0) {
+                listObjectRequest.maxKeys(maxMessagesPerPoll);
+            }
+
+            if (getConfiguration().getPrefix() != null) {
+                listObjectRequest.prefix(getConfiguration().getPrefix());
+            }
+
+            if (getConfiguration().getStartAfter() != null) {
+                listObjectRequest.startAfter(getConfiguration().getStartAfter());
+            }
+
+            // if there was a marker from previous poll then use that to
+            // continue from where we left last time
+            if (continuationToken != null) {
+                LOG.trace("Resuming from marker: {}", continuationToken);
+                listObjectRequest.continuationToken(continuationToken);
+            }
+
+            // TODO: Check for validity of the statement
+            ListBucketResultV2 listObjects = (ListBucketResultV2) getMinioClient().listObjects(listObjectRequest.build());
+
+            if (listObjects.isTruncated()) {
+                LOG.trace("Returned list is truncated, so setting next marker: {}", continuationToken);
+                continuationToken = listObjects.nextContinuationToken();
+
+            } else {
+                // no more data so clear marker
+                continuationToken = null;
+            }
+            if (LOG.isTraceEnabled()) {
+                LOG.trace("Found {} objects in bucket [{}]...", listObjects.contents().size(), bucketName);
+            }
+
+            exchanges = createExchanges(listObjects.contents());
+        }
+        return processBatch(CastUtils.cast(exchanges));
+    }
+
+    protected Queue<Exchange> createExchanges(InputStream objectStream, String objectName) throws Exception {
+        Queue<Exchange> answer = new LinkedList<>();
+        Exchange exchange = getEndpoint().createExchange(objectStream, objectName);
+        answer.add(exchange);
+        IOHelper.close(objectStream);
+        return answer;
+    }
+
+    protected Queue<Exchange> createExchanges(List<Contents> minioObjectSummaries) throws Exception {
+        if (LOG.isTraceEnabled()) {
+            LOG.trace("Received {} messages in this poll", minioObjectSummaries.size());
+        }
+        String bucketName = getConfiguration().getBucketName();
+        Collection<InputStream> minioObjects = new ArrayList<>();
+        Queue<Exchange> answer = new LinkedList<>();
+        try {
+            if (getConfiguration().isIncludeFolders()) {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                    minioObjects.add(minioObject);
+                    Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                    answer.add(exchange);
+                }
+            } else {
+                for (Contents minioObjectSummary : minioObjectSummaries) {
+                    // ignore if directory
+                    if (!minioObjectSummary.isDir()) {
+                        InputStream minioObject = getObject(bucketName, getMinioClient(), minioObjectSummary.objectName());
+                        minioObjects.add(minioObject);
+                        Exchange exchange = getEndpoint().createExchange(minioObject, minioObjectSummary.objectName());
+                        answer.add(exchange);
+                    }
+                }
+            }
+
+        } catch (Throwable e) {
+            LOG.warn("Error getting MinioObject due: {}", e.getMessage());
+            throw e;
+
+        } finally {
+            // ensure all previous gathered minio objects are closed
+            // if there was an exception creating the exchanges in this batch
+            minioObjects.forEach(IOHelper::close);
+        }
+
+        return answer;
+    }
+
+    private InputStream getObject(String bucketName, MinioClient minioClient, String objectName) throws Exception {
+        GetObjectArgs.Builder getObjectRequest = GetObjectArgs.builder().bucket(bucketName).object(objectName);
+
+        if (getConfiguration().getServerSideEncryptionCustomerKey() != null) {
+            getObjectRequest.ssec(getConfiguration().getServerSideEncryptionCustomerKey());
+        }
+        if (getConfiguration().getOffset() != 0) {
+            getObjectRequest.offset(getConfiguration().getOffset());
+        }
+        if (getConfiguration().getLength() != 0) {
+            getObjectRequest.length(getConfiguration().getLength());
+        }
+        if (getConfiguration().getVersionId() != null) {
+            getObjectRequest.versionId(getConfiguration().getVersionId());
+        }
+        if (getConfiguration().getMatchETag() != null) {
+            getObjectRequest.matchETag(getConfiguration().getMatchETag());
+        }
+        if (getConfiguration().getNotMatchETag() != null) {
+            getObjectRequest.notMatchETag(getConfiguration().getNotMatchETag());
+        }
+        if (getConfiguration().getModifiedSince() != null) {
+            getObjectRequest.modifiedSince(getConfiguration().getModifiedSince());
+        }
+        if (getConfiguration().getUnModifiedSince() != null) {
+            getObjectRequest.unmodifiedSince(getConfiguration().getUnModifiedSince());
+        }
+
+        return minioClient.getObject(getObjectRequest.build());
+    }
+
+    @Override
+    public int processBatch(Queue<Object> exchanges) {
+        int total = exchanges.size();
+
+        for (int index = 0; index < total && isBatchAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            final Exchange exchange = ObjectHelper.cast(Exchange.class, exchanges.poll());
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // update pending number of exchanges
+            pendingExchanges = total - index - 1;
+
+            // add on completion to handle after work when the exchange is done
+            exchange.adapt(ExtendedExchange.class).addOnCompletion(new Synchronization() {
+                public void onComplete(Exchange exchange) {
+                    processCommit(exchange);
+                }
+
+                public void onFailure(Exchange exchange) {
+                    processRollback(exchange);
+                }
+
+                @Override
+                public String toString() {
+                    return "MinioConsumerOnCompletion";
+                }
+            });
+
+            LOG.trace("Processing exchange ...");
+            getAsyncProcessor().process(exchange, doneSync -> LOG.trace("Processing exchange done."));
+        }
+
+        return total;
+    }
+
+    /**
+     * Strategy to delete the message after being processed.
+     *
+     * @param exchange the exchange
+     */
+    protected void processCommit(Exchange exchange) {
+        try {
+            String srcBucketName = exchange.getIn().getHeader(MinioConstants.BUCKET_NAME, String.class);
+            String srcObjectName = exchange.getIn().getHeader(MinioConstants.OBJECT_NAME, String.class);
+
+            if (getConfiguration().isDeleteAfterRead() || getConfiguration().isMoveAfterRead()) {
+                if (getConfiguration().isMoveAfterRead()) {
+                    copyObject(srcBucketName, srcObjectName);
+                    LOG.trace("Copied object from bucket {} with objectName {} to bucket {}...",
+                            srcBucketName, srcObjectName, getConfiguration().getDestinationBucketName());
+                }
+
+                LOG.trace("Deleting object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+
+                RemoveObjectArgs.Builder removeObjectRequest = RemoveObjectArgs.builder()
+                        .bucket(srcBucketName)
+                        .object(srcObjectName)
+                        .bypassGovernanceMode(getConfiguration().isBypassGovernanceMode());
+
+                if (getConfiguration().getVersionId() != null) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                if (getConfiguration().isBypassGovernanceMode()) {
+                    removeObjectRequest.versionId(getConfiguration().getVersionId());
+                }
+                getMinioClient().removeObject(removeObjectRequest.build());
+
+                LOG.trace("Deleted object from bucket {} with objectName {}...", srcBucketName, srcObjectName);
+            }
+        } catch (MinioException e) {
+            getExceptionHandler().handleException("Error occurred during moving or deleting object. This exception is ignored.",
+                    exchange, e);
+        } catch (Exception e) {
+            LOG.trace("Error process commit...");

Review comment:
       Hi @omarsmak, thank you for your suggestion. I made some changes :)





[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r466216204



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/client/GetMinioClient.java
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.client;
+
+import io.minio.MinioClient;
+import org.apache.camel.component.minio.MinioConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Creates MinIO client object according to the
+ * given endpoint, port, access key, secret key, region and secure option.
+ */
+public class GetMinioClient implements MinioCamelInternalClient {
+    private static final Logger LOG = LoggerFactory.getLogger(GetMinioClient.class);
+    private final MinioConfiguration configuration;
+
+    /**
+     * Constructor that uses the config file.
+     */
+    public GetMinioClient(MinioConfiguration configuration) {
+        LOG.trace("Creating an Minio client.");
+        this.configuration = configuration;
+    }
+
+    /**
+     * Getting the minio client.
+     *
+     * @return Minio Client.
+     */
+    @Override
+    public MinioClient getMinioClient() {
+        if (configuration.getEndpoint() != null) {
+            MinioClient.Builder minioClientRequest = MinioClient.builder();
+
+            if (configuration.getProxyPort() != null) {
+                minioClientRequest.endpoint(configuration.getEndpoint(), configuration.getProxyPort(), configuration.isSecure());
+            } else {
+                minioClientRequest.endpoint(configuration.getEndpoint());
+            }
+            if (configuration.getAccessKey() != null && configuration.getSecretKey() != null) {

Review comment:
       @omarsmak, is it okay to mark this as resolved?





[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r466837828



##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIntegrationTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import io.minio.MinioClient;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class MinioConsumerIntegrationTest extends CamelTestSupport {
+
+    @BindToRegistry("minioClient")
+    MinioClient client = MinioClient.builder()
+            .endpoint("https://play.min.io")
+            .credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")

Review comment:
       @omarsmak, @DenisIstomin I have put a minio_key.properties file in the test/resources directory, alongside the log4j2.properties file:
   ```
   endpoint=https://play.min.io
   accessKey=Q3AM3UQ867SPQQA43P2F
   secretKey=zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG
   region=us-west-1
   ```
   but then it throws the error `java.lang.Exception: Make sure to supply minio endpoint and credentials`, as thrown on [this](https://github.com/apache/camel/blob/0370bc71394616c407048594bd840f93a11f23f5/components/camel-azure-storage-blob/src/test/java/org/apache/camel/component/azure/storage/blob/BlobTestUtils.java#L43) line. Any suggestions?
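
For reference, one way to read such a file in a test is to load it from the classpath and check the required keys before building the client. This is a minimal sketch, not the helper the PR uses: the class `MinioTestProperties` and its methods are hypothetical names, and it assumes `minio_key.properties` ends up on the test classpath (as files under `src/test/resources` normally do in a Maven build).

```java
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

// Hypothetical test helper; not part of the PR.
final class MinioTestProperties {

    private MinioTestProperties() {
    }

    /**
     * Loads a properties file from the classpath and fails with a clear
     * message when the resource cannot be found.
     */
    static Properties loadFromClasspath(String resourceName) throws IOException {
        Properties properties = new Properties();
        try (InputStream in = MinioTestProperties.class.getClassLoader().getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IllegalStateException("Resource not found on classpath: " + resourceName);
            }
            properties.load(in);
        }
        return properties;
    }

    /**
     * Verifies that every required key is present and not blank.
     */
    static void requireKeys(Properties properties, String... keys) {
        for (String key : keys) {
            String value = properties.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                throw new IllegalStateException("Missing required property: " + key);
            }
        }
    }
}
```

A test could then call `MinioTestProperties.loadFromClasspath("minio_key.properties")` followed by `requireKeys(props, "endpoint", "accessKey", "secretKey")`, which separates a missing file from a missing key when something goes wrong.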





[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r466840183



##########
File path: components/camel-minio/src/test/java/org/apache/camel/component/minio/integration/MinioConsumerIntegrationTest.java
##########
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio.integration;
+
+import io.minio.MinioClient;
+import org.apache.camel.BindToRegistry;
+import org.apache.camel.EndpointInject;
+import org.apache.camel.ProducerTemplate;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.minio.MinioConstants;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.junit5.CamelTestSupport;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+
+@Disabled("Must be manually tested. Provide your own accessKey and secretKey!")
+public class MinioConsumerIntegrationTest extends CamelTestSupport {
+
+    @BindToRegistry("minioClient")
+    MinioClient client = MinioClient.builder()
+            .endpoint("https://play.min.io")
+            .credentials("Q3AM3UQ867SPQQA43P2F", "zuf+tfteSlswRu7BJ86wekitnifILbZam1KYY3TG")

Review comment:
       Hi, this problem is solved.





[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r467483667



##########
File path: catalog/camel-catalog/src/generated/resources/org/apache/camel/catalog/docs/minio-component.adoc
##########
@@ -0,0 +1,162 @@
+[[minio-component]]
+= Minio Storage Service Component
+:docTitle: Minio Storage Service
+:artifactId: camel-minio
+:description: Store and retrie objects from Minio Storage Service using Minio SDK.
+:since: 3.5
+:supportLevel: Preview
+:component-header: Both producer and consumer are supported
+
+*Since Camel {since}*
+
+*{component-header}*
+
+
+// component options: START
+The Minio Storage Service component supports 45 options, which are listed below.
+
+
+
+[width="100%",cols="2,5,^1,2",options="header"]
+|===
+| Name | Description | Default | Type
+| *autoCreateBucket* (common) | Setting the autocreation of the bucket if bucket name not exist. | true | boolean
+| *bypassGovernanceMode* (common) | Set this flag if you want to bypassGovernanceMode when deleting a particular object. | false | boolean
+| *configuration* (common) | The component configuration |  | MinioConfiguration
+| *customHttpClient* (common) | Set custom HTTP client for authenticated access. |  | OkHttpClient
+| *endpoint* (common) | Endpoint can be an URL, domain name, IPv4 address or IPv6 address. |  | String
+| *minioClient* (common) | Reference to a Minio Client object in the registry. |  | MinioClient
+| *objectLock* (common) | Set when creating new bucket. | false | boolean
+| *pojoRequest* (common) | If we want to use a POJO request as body or not. | false | boolean

Review comment:
       Hi @dmvolod, are you suggesting that I group boolean flags that share the same default value into enum types?
   Also, regarding
   
   > Usually the minio API and Camel component parameters can be not the same types and names.
   
   could you give me an example of this?





[GitHub] [camel] Nayananga commented on a change in pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

Nayananga commented on a change in pull request #3897:
URL: https://github.com/apache/camel/pull/3897#discussion_r467484654



##########
File path: components/camel-minio/src/main/java/org/apache/camel/component/minio/MinioComponent.java
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.minio;
+
+import java.util.Map;
+import java.util.Set;
+
+import io.minio.MinioClient;
+import org.apache.camel.CamelContext;
+import org.apache.camel.spi.Metadata;
+import org.apache.camel.spi.annotations.Component;
+import org.apache.camel.support.DefaultComponent;
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Represents the component that manages {@link MinioEndpoint}.
+ */
+@Component("minio")
+public class MinioComponent extends DefaultComponent {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MinioComponent.class);
+
+    @Metadata
+    private MinioConfiguration configuration = new MinioConfiguration();
+
+    public MinioComponent() {
+        this(null);
+    }
+
+    public MinioComponent(CamelContext context) {
+        super(context);
+        registerExtension(new MinioComponentVerifierExtension());
+
+    }
+
+    @Override
+    protected MinioEndpoint createEndpoint(String uri, String remaining, Map<String, Object> parameters) throws Exception {
+        if (remaining == null || remaining.trim().length() == 0) {
+            throw new IllegalArgumentException("Bucket name must be specified.");
+        }
+
+        final MinioConfiguration configuration = this.configuration != null ? this.configuration.copy() : new MinioConfiguration();
+        configuration.setBucketName(remaining);
+        MinioEndpoint endpoint = new MinioEndpoint(uri, this, configuration);
+        setProperties(endpoint, parameters);
+        checkAndSetRegistryClient(configuration, endpoint);
+
+        return endpoint;
+    }
+
+    public MinioConfiguration getConfiguration() {
+        return configuration;
+    }
+
+    /**
+     * The component configuration
+     */
+    public void setConfiguration(MinioConfiguration configuration) {
+        this.configuration = configuration;
+    }
+
+    private void checkAndSetRegistryClient(MinioConfiguration configuration, MinioEndpoint endpoint) {
+        if (ObjectHelper.isEmpty(endpoint.getConfiguration().getMinioClient())) {
+            LOG.debug("Looking for an MinioClient instance in the registry");
+            Set<MinioClient> clients = getCamelContext().getRegistry().findByType(MinioClient.class);
+            if (clients.size() == 1) {

Review comment:
       Hi all,
   ```
   if (clients.size() > 1) {
       LOG.debug("Found more than one MinioClient instance in the registry, getting the first one...");
       configuration.setMinioClient(clients.stream().findFirst().get());
   } else if (clients.size() == 1) {
       LOG.debug("Found exactly one MinioClient instance in the registry");
       configuration.setMinioClient(clients.stream().findFirst().get());
   } else {
       LOG.debug("No MinioClient instance in the registry");
   }
   ```
   Is this approach okay?
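
Both non-empty branches in the snippet above end up calling the same `findFirst().get()`, so the selection can be sketched more compactly. The following is an illustrative sketch only: `RegistryClientSelection` and `pickClient` are hypothetical names, and a generic type stands in for `MinioClient` so the example runs without Camel or MinIO on the classpath. Note that a `Set` makes no general ordering promise, so which instance counts as "first" may be arbitrary when several are registered.

```java
import java.util.Optional;
import java.util.Set;

// Hypothetical helper; a generic T stands in for MinioClient.
final class RegistryClientSelection {

    private RegistryClientSelection() {
    }

    /**
     * Picks a client from the candidates found in the registry.
     * Returns an empty Optional when there are none, so callers never
     * call get() on an empty result.
     */
    static <T> Optional<T> pickClient(Set<T> clients) {
        if (clients.size() > 1) {
            // Stand-in for a logger call; the component itself would use SLF4J.
            System.out.println("Found " + clients.size()
                    + " instances in the registry, using the first one returned");
        }
        // findFirst() yields Optional.empty() for an empty set.
        return clients.stream().findFirst();
    }
}
```

With this shape, `pickClient(clients).ifPresent(configuration::setMinioClient);` could replace the three branches, assuming the `setMinioClient` setter shown in the snippet.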





[GitHub] [camel] davsclaus commented on pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

davsclaus commented on pull request #3897:
URL: https://github.com/apache/camel/pull/3897#issuecomment-671331237


   Yeah, let's get it merged and polish it a bit.



[GitHub] [camel] davsclaus merged pull request #3897: CAMEL-13934 camel-minio - Component to store/load files from blob store

GitBox
In reply to this post by GitBox

davsclaus merged pull request #3897:
URL: https://github.com/apache/camel/pull/3897


   

