Change Shard Iterator State Reset to Recreation

Changed the shard iterator reset to simply recreate the iterator.
This commit is contained in:
Pfifer, Justin 2017-03-27 07:58:28 -07:00 committed by Justin Pfifer
parent e45f59c73b
commit fb4270e98b

View file

@ -252,9 +252,10 @@ public class KinesisProxy implements IKinesisProxyExtended {
shardIterationState.update(response.getStreamDescription().getShards()); shardIterationState.update(response.getStreamDescription().getShards());
} }
} while (response.getStreamDescription().isHasMoreShards()); } while (response.getStreamDescription().isHasMoreShards());
this.listOfShardsSinceLastGet.set(shardIterationState.getCollected()); this.listOfShardsSinceLastGet.set(shardIterationState.getShards());
return shardIterationState.getAndReset(); shardIterationState = new ShardIterationState();
return listOfShardsSinceLastGet.get();
} }
/** /**
@ -359,30 +360,23 @@ public class KinesisProxy implements IKinesisProxyExtended {
@Data @Data
static class ShardIterationState { static class ShardIterationState {
private List<Shard> collected; private List<Shard> shards;
private String lastShardId; private String lastShardId;
public ShardIterationState() { public ShardIterationState() {
collected = new ArrayList<>(); shards = new ArrayList<>();
} }
public void update(List<Shard> shards) { public void update(List<Shard> shards) {
if (shards == null || shards.isEmpty()) { if (shards == null || shards.isEmpty()) {
return; return;
} }
collected.addAll(shards); this.shards.addAll(shards);
Shard lastShard = shards.get(shards.size() - 1); Shard lastShard = shards.get(shards.size() - 1);
if (lastShardId == null || lastShardId.compareTo(lastShard.getShardId()) < 0) { if (lastShardId == null || lastShardId.compareTo(lastShard.getShardId()) < 0) {
lastShardId = lastShard.getShardId(); lastShardId = lastShard.getShardId();
} }
} }
public List<Shard> getAndReset() {
List<Shard> result = collected;
collected = new ArrayList<>();
lastShardId = null;
return result;
}
} }
} }