Removing shard cache

This commit is contained in:
Joshua Kim 2021-04-29 13:50:34 -07:00
parent c5bb0d6bcd
commit 572073f0c4

View file

@ -82,8 +82,6 @@ public class KinesisProxy implements IKinesisProxyExtended {
private AmazonKinesis client; private AmazonKinesis client;
private AWSCredentialsProvider credentialsProvider; private AWSCredentialsProvider credentialsProvider;
private ShardIterationState shardIterationState = null;
@Setter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE)
private volatile Map<String, Shard> cachedShardMap = null; private volatile Map<String, Shard> cachedShardMap = null;
@Setter(AccessLevel.PACKAGE) @Setter(AccessLevel.PACKAGE)
@ -442,9 +440,7 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/ */
@Override @Override
public synchronized List<Shard> getShardListWithFilter(ShardFilter shardFilter) { public synchronized List<Shard> getShardListWithFilter(ShardFilter shardFilter) {
if (shardIterationState == null) { final List<Shard> shards = new ArrayList<>();
shardIterationState = new ShardIterationState();
}
if (isKinesisClient) { if (isKinesisClient) {
ListShardsResult result; ListShardsResult result;
@ -460,16 +456,17 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/ */
return null; return null;
} else { } else {
shardIterationState.update(result.getShards()); shards.addAll(result.getShards());
nextToken = result.getNextToken(); nextToken = result.getNextToken();
} }
} while (StringUtils.isNotEmpty(result.getNextToken())); } while (StringUtils.isNotEmpty(result.getNextToken()));
} else { } else {
DescribeStreamResult response; DescribeStreamResult response;
String lastShardId = null;
do { do {
response = getStreamInfo(shardIterationState.getLastShardId()); response = getStreamInfo(lastShardId);
if (response == null) { if (response == null) {
/* /*
@ -478,15 +475,19 @@ public class KinesisProxy implements IKinesisProxyExtended {
*/ */
return null; return null;
} else { } else {
shardIterationState.update(response.getStreamDescription().getShards()); final List<Shard> pageOfShards = response.getStreamDescription().getShards();
shards.addAll(pageOfShards);
final Shard lastShard = pageOfShards.get(pageOfShards.size() - 1);
if (lastShardId == null || lastShardId.compareTo(lastShard.getShardId()) < 0) {
lastShardId = lastShard.getShardId();
}
} }
} while (response.getStreamDescription().isHasMoreShards()); } while (response.getStreamDescription().isHasMoreShards());
} }
List<Shard> shards = shardIterationState.getShards();
this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity())); this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity()));
this.lastCacheUpdateTime = Instant.now(); this.lastCacheUpdateTime = Instant.now();
shardIterationState = new ShardIterationState();
return shards; return shards;
} }
@ -617,27 +618,4 @@ public class KinesisProxy implements IKinesisProxyExtended {
final PutRecordResult response = client.putRecord(putRecordRequest); final PutRecordResult response = client.putRecord(putRecordRequest);
return response; return response;
} }
@Data
static class ShardIterationState {
private List<Shard> shards;
private String lastShardId;
public ShardIterationState() {
shards = new ArrayList<>();
}
public void update(List<Shard> shards) {
if (shards == null || shards.isEmpty()) {
return;
}
this.shards.addAll(shards);
Shard lastShard = shards.get(shards.size() - 1);
if (lastShardId == null || lastShardId.compareTo(lastShard.getShardId()) < 0) {
lastShardId = lastShard.getShardId();
}
}
}
} }