From 3e2423ad67c1bafe54ce3d2aafd6cf858508b1c4 Mon Sep 17 00:00:00 2001 From: Niu Date: Tue, 13 Jun 2017 16:29:24 -0400 Subject: [PATCH] Changed getAllShardIds to use the 'shards' variable rather than calling getShardList() twice, added private set class variable 'shardIDSet' to hold shard IDs, logic to save shard IDs in getShardList(), and logic to check whether the shard ID exists in getShard(). This is a fix for issue 55 --- .../kinesis/clientlibrary/proxies/KinesisProxy.java | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index b60fc938..79d15e54 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -66,6 +66,8 @@ public class KinesisProxy implements IKinesisProxyExtended { private final String streamName; + private Set shardIDSet; + private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L; private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; private final long describeStreamBackoffTimeInMillis; @@ -213,7 +215,7 @@ public class KinesisProxy implements IKinesisProxyExtended { */ @Override public Shard getShard(String shardId) { - if (this.listOfShardsSinceLastGet.get() == null) { + if (this.listOfShardsSinceLastGet.get() == null || !shardIDSet.contains(shardId)) { //Update this.listOfShardsSinceLastGet as needed. this.getShardList(); } @@ -234,6 +236,7 @@ public class KinesisProxy implements IKinesisProxyExtended { @Override public synchronized List getShardList() { + shardIDSet = new HashSet<>(); DescribeStreamResult response; if (shardIterationState == null) { shardIterationState = new ShardIterationState(); @@ -255,6 +258,11 @@ public class KinesisProxy implements IKinesisProxyExtended { this.listOfShardsSinceLastGet.set(shardIterationState.getShards()); shardIterationState = new ShardIterationState(); + + for(Shard shard : listOfShardsSinceLastGet.get() ){ + shardIDSet.add(shard.getShardId()); + } + return listOfShardsSinceLastGet.get(); } @@ -269,7 +277,7 @@ public class KinesisProxy implements IKinesisProxyExtended { } else { Set shardIds = new HashSet(); - for (Shard shard : getShardList()) { + for (Shard shard : shards) { shardIds.add(shard.getShardId()); }