[camel] branch master updated: CAMEL-15358: avoid an IndexOutOfBoundsException if there are no shards (#4065)

Classic | List | Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

[camel] branch master updated: CAMEL-15358: avoid an IndexOutOfBoundsException if there are no shards (#4065)

acosentino
This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 7ca80f2  CAMEL-15358: avoid an IndexOutOfBoundsException if there are no shards (#4065)
7ca80f2 is described below

commit 7ca80f2ce3ce85df53ca5e3e325318e02df653fc
Author: Otavio Rodolfo Piske <[hidden email]>
AuthorDate: Sat Aug 1 16:27:28 2020 +0200

    CAMEL-15358: avoid an IndexOutOfBoundsException if there are no shards (#4065)
---
 .../component/aws/kinesis/KinesisConsumer.java     | 34 ++++++++++++++++++----
 1 file changed, 29 insertions(+), 5 deletions(-)

diff --git a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
index adbf237..7c30c34 100644
--- a/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
+++ b/components/camel-aws-kinesis/src/main/java/org/apache/camel/component/aws/kinesis/KinesisConsumer.java
@@ -52,7 +52,20 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
 
     @Override
     protected int poll() throws Exception {
-        GetRecordsRequest req = new GetRecordsRequest().withShardIterator(getShardItertor()).withLimit(getEndpoint().getConfiguration().getMaxResultsPerRequest());
+        String shardIterator = getShardIterator();
+
+        if (shardIterator == null) {
+            // probably closed. Returning 0 as nothing was processed
+
+            return 0;
+        }
+
+        GetRecordsRequest req = new GetRecordsRequest()
+                .withShardIterator(shardIterator)
+                .withLimit(getEndpoint()
+                        .getConfiguration()
+                        .getMaxResultsPerRequest());
+
         GetRecordsResult result = getClient().getRecords(req);
 
         Queue<Exchange> exchanges = createExchanges(result.getRecords());
@@ -61,7 +74,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
         // May cache the last successful sequence number, and pass it to the
         // getRecords request. That way, on the next poll, we start from where
         // we left off, however, I don't know what happens to subsequent
-        // exchanges when an earlier echangee fails.
+        // exchanges when an earlier exchange fails.
 
         currentShardIterator = result.getNextShardIterator();
         if (isShardClosed) {
@@ -109,7 +122,7 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
         return (KinesisEndpoint)super.getEndpoint();
     }
 
-    private String getShardItertor() {
+    private String getShardIterator() {
         // either return a cached one or get a new one via a GetShardIterator
         // request.
         if (currentShardIterator == null) {
@@ -129,11 +142,22 @@ public class KinesisConsumer extends ScheduledBatchPollingConsumer {
             } else {
                 DescribeStreamRequest req1 = new DescribeStreamRequest().withStreamName(getEndpoint().getConfiguration().getStreamName());
                 DescribeStreamResult res1 = getClient().describeStream(req1);
-                shardId = res1.getStreamDescription().getShards().get(0).getShardId();
-                isShardClosed = res1.getStreamDescription().getShards().get(0).getSequenceNumberRange().getEndingSequenceNumber() != null;
+
+                List<Shard> shards = res1.getStreamDescription().getShards();
+
+                if (shards.size() == 0) {
+                    LOG.warn("There are no shards in the stream");
+                    // NOTE: Should we also set isShardClosed to true?
+                    return null;
+                }
+
+                shardId = shards.get(0).getShardId();
+                isShardClosed = shards.get(0).getSequenceNumberRange().getEndingSequenceNumber() != null;
             }
+
             LOG.debug("ShardId is: {}", shardId);
 
+
             GetShardIteratorRequest req = new GetShardIteratorRequest().withStreamName(getEndpoint().getConfiguration().getStreamName()).withShardId(shardId)
                     .withShardIteratorType(getEndpoint().getConfiguration().getIteratorType());