Exposing interface for getting child shards (#81)

* Exposing interface for getting child shards

* javadocs

* more java docs

* Addressing comments

* Tuning random max range

Co-authored-by: Joshua Kim <kimjos@amazon.com>
This commit is contained in:
Joshua Kim 2020-07-17 18:32:50 -04:00 committed by GitHub
parent c68ab705bd
commit ff703459e1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 96 additions and 83 deletions

View file

@ -35,6 +35,11 @@ import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.KinesisException;
import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.services.kinesis.model.LimitExceededException;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
@ -43,6 +48,7 @@ import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.awssdk.services.kinesis.model.ShardFilter; import software.amazon.awssdk.services.kinesis.model.ShardFilter;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.FutureUtils; import software.amazon.kinesis.common.FutureUtils;
@ -262,4 +268,25 @@ public class KinesisShardDetector implements ShardDetector {
ExecutionException, TimeoutException, InterruptedException { ExecutionException, TimeoutException, InterruptedException {
return FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request), kinesisRequestTimeout); return FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request), kinesisRequestTimeout);
} }
@Override
public List<ChildShard> getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException {
final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST)
.shardId(shardId)
.build();
final GetShardIteratorResponse getShardIteratorResponse =
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout);
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(getShardIteratorResponse.shardIterator())
.build();
final GetRecordsResponse getRecordsResponse =
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), kinesisRequestTimeout);
return getRecordsResponse.childShards();
}
} }

View file

@ -24,16 +24,8 @@ import lombok.RequiredArgsConstructor;
import lombok.Value; import lombok.Value;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.awssdk.utils.CollectionUtils;
import software.amazon.kinesis.common.FutureUtils;
import software.amazon.kinesis.common.KinesisRequestsBuilder;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
@ -43,8 +35,6 @@ import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.AWSExceptionManager;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.time.Duration;
import java.util.Collections;
import java.util.HashSet; import java.util.HashSet;
import java.util.Objects; import java.util.Objects;
import java.util.Optional; import java.util.Optional;
@ -69,12 +59,8 @@ public class LeaseCleanupManager {
@NonNull @NonNull
private final LeaseCoordinator leaseCoordinator; private final LeaseCoordinator leaseCoordinator;
@NonNull @NonNull
private final KinesisAsyncClient kinesisClient;
@NonNull
private final MetricsFactory metricsFactory; private final MetricsFactory metricsFactory;
@NonNull @NonNull
private final Duration maxFutureWait;
@NonNull
private final ScheduledExecutorService deletionThreadPool; private final ScheduledExecutorService deletionThreadPool;
private final boolean cleanupLeasesUponShardCompletion; private final boolean cleanupLeasesUponShardCompletion;
private final long leaseCleanupIntervalMillis; private final long leaseCleanupIntervalMillis;
@ -85,7 +71,6 @@ public class LeaseCleanupManager {
private final Queue<LeasePendingDeletion> deletionQueue = new ConcurrentLinkedQueue<>(); private final Queue<LeasePendingDeletion> deletionQueue = new ConcurrentLinkedQueue<>();
private static final int MAX_RECORDS = 1;
private static final long INITIAL_DELAY = 0L; private static final long INITIAL_DELAY = 0L;
@Getter @Getter
@ -170,7 +155,7 @@ public class LeaseCleanupManager {
Set<String> childShardKeys = leaseFromDDB.childShardIds(); Set<String> childShardKeys = leaseFromDDB.childShardIds();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
try { try {
childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); childShardKeys = leasePendingDeletion.getChildShardsFromService();
if (CollectionUtils.isNullOrEmpty(childShardKeys)) { if (CollectionUtils.isNullOrEmpty(childShardKeys)) {
log.error( log.error(
@ -203,7 +188,7 @@ public class LeaseCleanupManager {
if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) {
try { try {
wereChildShardsPresent = !CollectionUtils wereChildShardsPresent = !CollectionUtils
.isNullOrEmpty(getChildShardsFromService(shardInfo, streamIdentifier)); .isNullOrEmpty(leasePendingDeletion.getChildShardsFromService());
} catch (ExecutionException e) { } catch (ExecutionException e) {
throw exceptionManager.apply(e.getCause()); throw exceptionManager.apply(e.getCause());
} }
@ -217,29 +202,6 @@ public class LeaseCleanupManager {
wasResourceNotFound); wasResourceNotFound);
} }
private Set<String> getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier)
throws InterruptedException, ExecutionException, TimeoutException {
final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder()
.streamName(streamIdentifier.streamName())
.shardIteratorType(ShardIteratorType.LATEST)
.shardId(shardInfo.shardId())
.build();
final GetShardIteratorResponse getShardIteratorResponse =
FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), maxFutureWait);
final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder()
.shardIterator(getShardIteratorResponse.shardIterator())
.limit(MAX_RECORDS)
.build();
final GetRecordsResponse getRecordsResponse =
FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), maxFutureWait);
return getRecordsResponse.childShards().stream().map(c -> c.shardId()).collect(Collectors.toSet());
}
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the
// stream (known explicitly from ResourceNotFound being thrown when processing this shard), // stream (known explicitly from ResourceNotFound being thrown when processing this shard),
private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws DependencyException, ProvisionedThroughputException, InvalidStateException { private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws DependencyException, ProvisionedThroughputException, InvalidStateException {

View file

@ -16,6 +16,10 @@
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import java.util.List; import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
@ -48,7 +52,9 @@ public interface ShardDetector {
* @param ShardFilter * @param ShardFilter
* @return Shards * @return Shards
*/ */
List<Shard> listShardsWithFilter(ShardFilter shardFilter); default List<Shard> listShardsWithFilter(ShardFilter shardFilter) {
throw new UnsupportedOperationException("listShardsWithFilter not available.");
}
/** /**
* Gets stream identifier. * Gets stream identifier.
@ -65,5 +71,19 @@ public interface ShardDetector {
* @param request list shards request * @param request list shards request
* @return ListShardsResponse which contains list shards response * @return ListShardsResponse which contains list shards response
*/ */
ListShardsResponse getListShardsResponse(ListShardsRequest request) throws Exception; default ListShardsResponse getListShardsResponse(ListShardsRequest request) throws Exception {
throw new UnsupportedOperationException("getListShardsResponse not available.");
}
/**
* Gets the children shards of a shard.
* @param shardId
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
default List<ChildShard> getChildShards(String shardId) throws InterruptedException, ExecutionException, TimeoutException {
throw new UnsupportedOperationException("getChildShards not available.");
}
} }

View file

@ -551,8 +551,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
*/ */
@Override @Override
public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) { public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) {
return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory), kinesisClient, return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory),
metricsFactory, dynamoDbRequestTimeout, Executors.newSingleThreadScheduledExecutor(), metricsFactory, Executors.newSingleThreadScheduledExecutor(),
cleanupLeasesUponShardCompletion, leaseCleanupConfig.leaseCleanupIntervalMillis(), cleanupLeasesUponShardCompletion, leaseCleanupConfig.leaseCleanupIntervalMillis(),
leaseCleanupConfig.completedLeaseCleanupIntervalMillis(), leaseCleanupConfig.completedLeaseCleanupIntervalMillis(),
leaseCleanupConfig.garbageLeaseCleanupIntervalMillis()); leaseCleanupConfig.garbageLeaseCleanupIntervalMillis());

View file

@ -20,8 +20,14 @@ import lombok.Value;
import lombok.experimental.Accessors; import lombok.experimental.Accessors;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
/** /**
* Helper class for cleaning up leases. * Helper class for cleaning up leases.
*/ */
@ -32,4 +38,16 @@ public class LeasePendingDeletion {
private final StreamIdentifier streamIdentifier; private final StreamIdentifier streamIdentifier;
private final Lease lease; private final Lease lease;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final ShardDetector shardDetector;
/**
* Discovers the child shards for this lease.
* @return
* @throws InterruptedException
* @throws ExecutionException
* @throws TimeoutException
*/
public Set<String> getChildShardsFromService() throws InterruptedException, ExecutionException, TimeoutException {
return shardDetector.getChildShards(shardInfo.shardId()).stream().map(c -> c.shardId()).collect(Collectors.toSet());
}
} }

View file

@ -67,7 +67,7 @@ public class ShutdownTask implements ConsumerTask {
private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
@VisibleForTesting @VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 10; static final int RETRY_RANDOM_MAX_RANGE = 30;
@NonNull @NonNull
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
@ -185,7 +185,7 @@ public class ShutdownTask implements ConsumerTask {
updateLeaseWithChildShards(currentShardLease); updateLeaseWithChildShards(currentShardLease);
} }
final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease,
shardInfo); shardInfo, shardDetector);
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false; boolean isSuccess = false;
try { try {

View file

@ -15,14 +15,6 @@
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule; import org.junit.Rule;
@ -38,6 +30,16 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.Shard;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.isA;
import static org.hamcrest.CoreMatchers.nullValue; import static org.hamcrest.CoreMatchers.nullValue;

View file

@ -15,18 +15,12 @@
package software.amazon.kinesis.leases; package software.amazon.kinesis.leases;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ChildShard;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse;
import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException;
import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion;
@ -39,7 +33,6 @@ import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -73,7 +66,7 @@ public class LeaseCleanupManagerTest {
@Mock @Mock
private LeaseCoordinator leaseCoordinator; private LeaseCoordinator leaseCoordinator;
@Mock @Mock
private KinesisAsyncClient kinesis; private ShardDetector shardDetector;
@Mock @Mock
private ScheduledExecutorService deletionThreadPool; private ScheduledExecutorService deletionThreadPool;
@ -82,9 +75,9 @@ public class LeaseCleanupManagerTest {
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
streamIdentifier = StreamIdentifier.singleStreamInstance("streamName"); streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); garbageLeaseCleanupIntervalMillis);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(any(Lease.class), any(UUID.class), any(String.class), any(String.class))).thenReturn(true); when(leaseCoordinator.updateLease(any(Lease.class), any(UUID.class), any(String.class), any(String.class))).thenReturn(true);
@ -124,8 +117,8 @@ public class LeaseCleanupManagerTest {
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
cleanupLeasesOfCompletedShards = false; cleanupLeasesOfCompletedShards = false;
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait, leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
garbageLeaseCleanupIntervalMillis); garbageLeaseCleanupIntervalMillis);
verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0); verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0);
@ -206,8 +199,8 @@ public class LeaseCleanupManagerTest {
cleanupLeasesOfCompletedShards = false; cleanupLeasesOfCompletedShards = false;
leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait, leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool,
deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis,
garbageLeaseCleanupIntervalMillis); garbageLeaseCleanupIntervalMillis);
testLeaseDeletedWhenShardDoesNotExist(heldLease); testLeaseDeletedWhenShardDoesNotExist(heldLease);
@ -216,12 +209,13 @@ public class LeaseCleanupManagerTest {
public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception { public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception {
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease);
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(ResourceNotFoundException.class); when(shardDetector.getChildShards(any(String.class))).thenThrow(ResourceNotFoundException.class);
when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease); when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease);
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo, shardDetector));
leaseCleanupManager.cleanupLeases(); leaseCleanupManager.cleanupLeases();
verify(shardDetector, times(1)).getChildShards(shardInfo.shardId());
verify(leaseRefresher, times(1)).deleteLease(heldLease); verify(leaseRefresher, times(1)).deleteLease(heldLease);
} }
@ -256,20 +250,10 @@ public class LeaseCleanupManagerTest {
} }
} }
GetShardIteratorResponse getShardIteratorResponse = GetShardIteratorResponse.builder() leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo, shardDetector));
.shardIterator("123")
.build();
when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(CompletableFuture.completedFuture(getShardIteratorResponse));
GetRecordsResponse getRecordsResponse = GetRecordsResponse.builder()
.records(Collections.emptyList())
.childShards(childShards)
.build();
when(kinesis.getRecords(any(GetRecordsRequest.class))).thenReturn(CompletableFuture.completedFuture(getRecordsResponse));
leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo));
leaseCleanupManager.cleanupLeases(); leaseCleanupManager.cleanupLeases();
verify(shardDetector, times(1)).getChildShards(shardInfo.shardId());
verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class)); verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class));
} }