diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index a29c5ce4..9eb23c85 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -197,7 +197,7 @@ public class KinesisShardDetector implements ShardDetector { try { try { - result = FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request.build()), kinesisRequestTimeout); + result = getListShardsResponse(request.build()); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } catch (InterruptedException e) { @@ -248,4 +248,10 @@ public class KinesisShardDetector implements ShardDetector { log.debug("{}. Age doesn't exceed limit of {} seconds.", message, listShardsCacheAllowedAgeInSeconds); return false; } + + @Override + public ListShardsResponse getListShardsResponse(ListShardsRequest request) throws + ExecutionException, TimeoutException, InterruptedException { + return FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request), kinesisRequestTimeout); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 2967a9fb..9eb2d17b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -16,6 +16,8 @@ package software.amazon.kinesis.leases; import java.util.List; +import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; +import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.kinesis.common.StreamIdentifier; @@ -24,13 +26,44 @@ import software.amazon.kinesis.common.StreamIdentifier; * */ public interface ShardDetector { + + /** + * Gets shard based on shardId. + * + * @param shardId + * @return Shard + */ Shard shard(String shardId); + /** + * List shards. + * + * @return Shards + */ List listShards(); + /** + * List shards with shard filter. + * + * @param ShardFilter + * @return Shards + */ List listShardsWithFilter(ShardFilter shardFilter); + /** + * Gets stream identifier. + * + * @return StreamIdentifier + */ default StreamIdentifier streamIdentifier() { throw new UnsupportedOperationException("StreamName not available"); } + + /** + * Gets a list shards response based on the request. + * + * @param request list shards request + * @return ListShardsResponse which contains list shards response + */ + ListShardsResponse getListShardsResponse(ListShardsRequest request) throws Exception; }