[GitHub] [camel-kafka-connector] ruchirvaninasdaq edited a comment on issue #97: S3 source connector delete files from source bucket

GitBox

ruchirvaninasdaq edited a comment on issue #97:
URL: https://github.com/apache/camel-kafka-connector/issues/97#issuecomment-673628462


   In the S3Endpoint.java class:
   
   ```
   
       public Exchange createExchange(ExchangePattern pattern, S3Object s3Object) {
           LOG.trace("Getting object with key [{}] from bucket [{}]...", s3Object.getKey(), s3Object.getBucketName());
   
           ObjectMetadata objectMetadata = s3Object.getObjectMetadata();
   
           LOG.trace("Got object [{}]", s3Object);
   
           Exchange exchange = super.createExchange(pattern);
           Message message = exchange.getIn();
   
           if (configuration.isIncludeBody()) {
               if(verifySchema()){
                   message.setBody(s3Object.getObjectContent());
                   message.setHeader(S3Constants.SUCCESSFUL_MESSAGE, "true");
               } else {
                   LOG.error("Schema verification failed for object with key [{}]", s3Object.getKey());
                   message.setHeader(S3Constants.SUCCESSFUL_MESSAGE, "false");
                   message.setBody(null);
               }
           } else {
               message.setBody(null);
           }
   
           message.setHeader(S3Constants.KEY, s3Object.getKey());
           message.setHeader(S3Constants.BUCKET_NAME, s3Object.getBucketName());
           message.setHeader(S3Constants.E_TAG, objectMetadata.getETag());
           message.setHeader(S3Constants.LAST_MODIFIED, objectMetadata.getLastModified());
           message.setHeader(S3Constants.VERSION_ID, objectMetadata.getVersionId());
           message.setHeader(S3Constants.CONTENT_TYPE, objectMetadata.getContentType());
           message.setHeader(S3Constants.CONTENT_MD5, objectMetadata.getContentMD5());
           message.setHeader(S3Constants.CONTENT_LENGTH, objectMetadata.getContentLength());
           message.setHeader(S3Constants.CONTENT_ENCODING, objectMetadata.getContentEncoding());
           message.setHeader(S3Constants.CONTENT_DISPOSITION, objectMetadata.getContentDisposition());
           message.setHeader(S3Constants.CACHE_CONTROL, objectMetadata.getCacheControl());
           message.setHeader(S3Constants.S3_HEADERS, objectMetadata.getRawMetadata());
           message.setHeader(S3Constants.SERVER_SIDE_ENCRYPTION, objectMetadata.getSSEAlgorithm());
           message.setHeader(S3Constants.USER_METADATA, objectMetadata.getUserMetadata());
           message.setHeader(S3Constants.EXPIRATION_TIME, objectMetadata.getExpirationTime());
           message.setHeader(S3Constants.REPLICATION_STATUS, objectMetadata.getReplicationStatus());
           message.setHeader(S3Constants.STORAGE_CLASS, objectMetadata.getStorageClass());
   
           /**
            * If includeBody != true, it is safe to close the object here. If
            * includeBody == true, the caller is responsible for closing the stream
            * and object once the body has been fully consumed. As of 2.17, the
            * consumer does not close the stream or object on commit.
            */
           if (!configuration.isIncludeBody()) {
               IOHelper.close(s3Object);
           } else {
               if (configuration.isAutocloseBody()) {
                   exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
                       @Override
                       public void onDone(Exchange exchange) {
                           IOHelper.close(s3Object);
                       }
                   });
               }
           }
   
           return exchange;
       }
   
   ```
   
   
   And within S3Consumer:
   
   ```
       protected void processCommit(Exchange exchange) {
           try {
               if (getConfiguration().isDeleteAfterRead()) {
                   String bucketName = exchange.getIn().getHeader(S3Constants.BUCKET_NAME, String.class);
                   String key = exchange.getIn().getHeader(S3Constants.KEY, String.class);
   
                   LOG.trace("Deleting object from bucket {} with key {}...", bucketName, key);
                   // Constant-first comparison avoids a NullPointerException when the header is absent
                   // (for example when includeBody is false); such objects take the failed branch.
                   if ("true".equals(exchange.getIn().getHeader(S3Constants.SUCCESSFUL_MESSAGE, String.class))) {
                       getAmazonS3Client().copyObject(bucketName, key, getConfiguration().getTargetBucketName(), generateSuccessKey(exchange));
                   } else {
                       getAmazonS3Client().copyObject(bucketName, key, getConfiguration().getTargetBucketName(), generatefailedKey(exchange));
                   }
                   getAmazonS3Client().deleteObject(bucketName, key);
   
                   LOG.trace("Deleted object from bucket {} with key {}...", bucketName, key);
               }
           } catch (AmazonClientException e) {
               getExceptionHandler().handleException("Error occurred during deleting object. This exception is ignored.", exchange, e);
           }
       }
   
   
   ```
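
The snippets above call `generateSuccessKey` and `generatefailedKey` without showing them. The following is only a minimal sketch of how such key generation might look, under assumptions not stated in the comment: the class name `ArchiveKeys`, the `success/` and `failed/` prefixes, and passing the source key and flag as plain strings (instead of the `Exchange`) are all hypothetical and chosen so the example runs without Camel or the AWS SDK.

```java
import java.util.Objects;

/**
 * Hypothetical helper, not part of camel-aws-s3: derives the key under which
 * a processed object could be archived in the target bucket.
 */
final class ArchiveKeys {
    // Assumed prefixes; the original comment does not say how the keys are built.
    static final String SUCCESS_PREFIX = "success/";
    static final String FAILED_PREFIX = "failed/";

    private ArchiveKeys() {
    }

    /**
     * @param sourceKey   key of the object in the source bucket
     * @param successFlag value of the SUCCESSFUL_MESSAGE header, may be null
     * @return the source key prefixed according to the processing outcome
     */
    static String archiveKey(String sourceKey, String successFlag) {
        Objects.requireNonNull(sourceKey, "sourceKey");
        // Constant-first comparison: a missing header (null) is treated as a failure.
        boolean success = "true".equals(successFlag);
        return (success ? SUCCESS_PREFIX : FAILED_PREFIX) + sourceKey;
    }
}
```

With these assumed prefixes, `ArchiveKeys.archiveKey("in/a.csv", "true")` yields `success/in/a.csv`, while a `"false"` or missing flag yields `failed/in/a.csv`. Keeping the original key as the suffix is one possible design choice that makes it easy to trace an archived object back to its source.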


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
