Add getResponse override for ShardDetector (#19) (#21)

This commit is contained in:
Renju Radhakrishnan 2020-04-28 16:32:29 -07:00 committed by GitHub
parent 038524e0b1
commit f69e9cf3ba
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 40 additions and 1 deletions

View file

@ -197,7 +197,7 @@ public class KinesisShardDetector implements ShardDetector {
try {
try {
result = FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request.build()), kinesisRequestTimeout);
result = getListShardsResponse(request.build());
} catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause());
} catch (InterruptedException e) {
@ -248,4 +248,10 @@ public class KinesisShardDetector implements ShardDetector {
log.debug("{}. Age doesn't exceed limit of {} seconds.", message, listShardsCacheAllowedAgeInSeconds);
return false;
}
@Override
public ListShardsResponse getListShardsResponse(ListShardsRequest request) throws
ExecutionException, TimeoutException, InterruptedException {
return FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request), kinesisRequestTimeout);
}
}

View file

@ -16,6 +16,8 @@
package software.amazon.kinesis.leases;
import java.util.List;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.kinesis.common.StreamIdentifier;
@ -24,13 +26,44 @@ import software.amazon.kinesis.common.StreamIdentifier;
*
*/
public interface ShardDetector {
/**
* Gets shard based on shardId.
*
* @param shardId
* @return Shard
*/
Shard shard(String shardId);
/**
* List shards.
*
* @return Shards
*/
List<Shard> listShards();
/**
* List shards with shard filter.
*
* @param ShardFilter
* @return Shards
*/
List<Shard> listShardsWithFilter(ShardFilter shardFilter);
/**
* Gets stream identifier.
*
* @return StreamIdentifier
*/
default StreamIdentifier streamIdentifier() {
throw new UnsupportedOperationException("StreamName not available");
}
/**
* Gets a list shards response based on the request.
*
* @param request list shards request
* @return ListShardsResponse which contains list shards response
*/
ListShardsResponse getListShardsResponse(ListShardsRequest request) throws Exception;
}