Changed getAllShardIds to use the 'shards' variable rather than calling getShardList() twice, added private set class variable 'shardIDSet' to hold shard IDs, logic to save shard IDs in getShardList(), and logic to check whether the shard ID exists in getShard(). This is a fix for issue 55

This commit is contained in:
Niu 2017-06-13 16:29:24 -04:00
parent e121691ac2
commit 3e2423ad67

View file

@ -66,6 +66,8 @@ public class KinesisProxy implements IKinesisProxyExtended {
private final String streamName; private final String streamName;
private Set<String> shardIDSet;
private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L; private static final long DEFAULT_DESCRIBE_STREAM_BACKOFF_MILLIS = 1000L;
private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50; private static final int DEFAULT_DESCRIBE_STREAM_RETRY_TIMES = 50;
private final long describeStreamBackoffTimeInMillis; private final long describeStreamBackoffTimeInMillis;
@ -213,7 +215,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/ */
@Override @Override
public Shard getShard(String shardId) { public Shard getShard(String shardId) {
if (this.listOfShardsSinceLastGet.get() == null) { if (this.listOfShardsSinceLastGet.get() == null || !shardIDSet.contains(shardId)) {
//Update this.listOfShardsSinceLastGet as needed. //Update this.listOfShardsSinceLastGet as needed.
this.getShardList(); this.getShardList();
} }
@ -234,6 +236,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
@Override @Override
public synchronized List<Shard> getShardList() { public synchronized List<Shard> getShardList() {
shardIDSet = new HashSet<>();
DescribeStreamResult response; DescribeStreamResult response;
if (shardIterationState == null) { if (shardIterationState == null) {
shardIterationState = new ShardIterationState(); shardIterationState = new ShardIterationState();
@ -255,6 +258,11 @@ public class KinesisProxy implements IKinesisProxyExtended {
this.listOfShardsSinceLastGet.set(shardIterationState.getShards()); this.listOfShardsSinceLastGet.set(shardIterationState.getShards());
shardIterationState = new ShardIterationState(); shardIterationState = new ShardIterationState();
for(Shard shard : listOfShardsSinceLastGet.get() ){
shardIDSet.add(shard.getShardId());
}
return listOfShardsSinceLastGet.get(); return listOfShardsSinceLastGet.get();
} }
@ -269,7 +277,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
} else { } else {
Set<String> shardIds = new HashSet<String>(); Set<String> shardIds = new HashSet<String>();
for (Shard shard : getShardList()) { for (Shard shard : shards) {
shardIds.add(shard.getShardId()); shardIds.add(shard.getShardId());
} }