diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index b60fc938..79d15e54 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -66,6 +66,8 @@ public class KinesisProxy implements IKinesisProxyExtended { private final String streamName; + private Set shardIDSet; + private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L; private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; private final long describeStreamBackoffTimeInMillis; @@ -213,7 +215,7 @@ public class KinesisProxy implements IKinesisProxyExtended { */ @Override public Shard getShard(String shardId) { - if (this.listOfShardsSinceLastGet.get() == null) { + if (this.listOfShardsSinceLastGet.get() == null || !shardIDSet.contains(shardId)) { //Update this.listOfShardsSinceLastGet as needed. this.getShardList(); } @@ -234,6 +236,7 @@ public class KinesisProxy implements IKinesisProxyExtended { @Override public synchronized List getShardList() { + shardIDSet = new HashSet<>(); DescribeStreamResult response; if (shardIterationState == null) { shardIterationState = new ShardIterationState(); @@ -255,6 +258,11 @@ public class KinesisProxy implements IKinesisProxyExtended { this.listOfShardsSinceLastGet.set(shardIterationState.getShards()); shardIterationState = new ShardIterationState(); + + for(Shard shard : listOfShardsSinceLastGet.get() ){ + shardIDSet.add(shard.getShardId()); + } + return listOfShardsSinceLastGet.get(); } @@ -269,7 +277,7 @@ public class KinesisProxy implements IKinesisProxyExtended { } else { Set shardIds = new HashSet(); - for (Shard shard : getShardList()) { + for (Shard shard : shards) { shardIds.add(shard.getShardId()); }