From 572073f0c4b080344d81b71e42e707678381152d Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Thu, 29 Apr 2021 13:50:34 -0700 Subject: [PATCH] Removing shard cache --- .../clientlibrary/proxies/KinesisProxy.java | 44 +++++-------------- 1 file changed, 11 insertions(+), 33 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java index c1d7f10d..3c890ed4 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/proxies/KinesisProxy.java @@ -82,8 +82,6 @@ public class KinesisProxy implements IKinesisProxyExtended { private AmazonKinesis client; private AWSCredentialsProvider credentialsProvider; - private ShardIterationState shardIterationState = null; - @Setter(AccessLevel.PACKAGE) private volatile Map cachedShardMap = null; @Setter(AccessLevel.PACKAGE) @@ -442,9 +440,7 @@ public class KinesisProxy implements IKinesisProxyExtended { */ @Override public synchronized List getShardListWithFilter(ShardFilter shardFilter) { - if (shardIterationState == null) { - shardIterationState = new ShardIterationState(); - } + final List shards = new ArrayList<>(); if (isKinesisClient) { ListShardsResult result; @@ -460,16 +456,17 @@ public class KinesisProxy implements IKinesisProxyExtended { */ return null; } else { - shardIterationState.update(result.getShards()); + shards.addAll(result.getShards()); nextToken = result.getNextToken(); } } while (StringUtils.isNotEmpty(result.getNextToken())); } else { DescribeStreamResult response; + String lastShardId = null; do { - response = getStreamInfo(shardIterationState.getLastShardId()); + response = getStreamInfo(lastShardId); if (response == null) { /* @@ -478,15 +475,19 @@ public class KinesisProxy implements IKinesisProxyExtended { */ return null; } else { - shardIterationState.update(response.getStreamDescription().getShards()); + final List pageOfShards = response.getStreamDescription().getShards(); + shards.addAll(pageOfShards); + + final Shard lastShard = pageOfShards.get(pageOfShards.size() - 1); + if (lastShardId == null || lastShardId.compareTo(lastShard.getShardId()) < 0) { + lastShardId = lastShard.getShardId(); + } } } while (response.getStreamDescription().isHasMoreShards()); } - List shards = shardIterationState.getShards(); this.cachedShardMap = shards.stream().collect(Collectors.toMap(Shard::getShardId, Function.identity())); this.lastCacheUpdateTime = Instant.now(); - shardIterationState = new ShardIterationState(); return shards; } @@ -617,27 +618,4 @@ public class KinesisProxy implements IKinesisProxyExtended { final PutRecordResult response = client.putRecord(putRecordRequest); return response; } - - @Data - static class ShardIterationState { - - private List shards; - private String lastShardId; - - public ShardIterationState() { - shards = new ArrayList<>(); - } - - public void update(List shards) { - if (shards == null || shards.isEmpty()) { - return; - } - this.shards.addAll(shards); - Shard lastShard = shards.get(shards.size() - 1); - if (lastShardId == null || lastShardId.compareTo(lastShard.getShardId()) < 0) { - lastShardId = lastShard.getShardId(); - } - } - } - }