diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java index f5ef482e..96a0de6a 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/KinesisShardDetector.java @@ -35,6 +35,11 @@ import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; import org.apache.commons.lang3.StringUtils; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.LimitExceededException; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; @@ -43,6 +48,7 @@ import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.awssdk.services.kinesis.model.ShardFilter; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.FutureUtils; @@ -262,4 +268,25 @@ public class KinesisShardDetector implements ShardDetector { ExecutionException, TimeoutException, InterruptedException { return FutureUtils.resolveOrCancelFuture(kinesisClient.listShards(request), kinesisRequestTimeout); } + + @Override + public List getChildShards(final String shardId) throws InterruptedException, ExecutionException, TimeoutException { + final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder() + .streamName(streamIdentifier.streamName()) + .shardIteratorType(ShardIteratorType.LATEST) + .shardId(shardId) + .build(); + + final GetShardIteratorResponse getShardIteratorResponse = + FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), kinesisRequestTimeout); + + final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder() + .shardIterator(getShardIteratorResponse.shardIterator()) + .build(); + + final GetRecordsResponse getRecordsResponse = + FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), kinesisRequestTimeout); + + return getRecordsResponse.childShards(); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java index 4b11b627..de734646 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -24,16 +24,8 @@ import lombok.RequiredArgsConstructor; import lombok.Value; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; -import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; -import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.utils.CollectionUtils; -import software.amazon.kinesis.common.FutureUtils; -import software.amazon.kinesis.common.KinesisRequestsBuilder; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; @@ -43,8 +35,6 @@ import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.retrieval.AWSExceptionManager; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.time.Duration; -import java.util.Collections; import java.util.HashSet; import java.util.Objects; import java.util.Optional; @@ -69,12 +59,8 @@ public class LeaseCleanupManager { @NonNull private final LeaseCoordinator leaseCoordinator; @NonNull - private final KinesisAsyncClient kinesisClient; - @NonNull private final MetricsFactory metricsFactory; @NonNull - private final Duration maxFutureWait; - @NonNull private final ScheduledExecutorService deletionThreadPool; private final boolean cleanupLeasesUponShardCompletion; private final long leaseCleanupIntervalMillis; @@ -85,7 +71,6 @@ public class LeaseCleanupManager { private final Queue deletionQueue = new ConcurrentLinkedQueue<>(); - private static final int MAX_RECORDS = 1; private static final long INITIAL_DELAY = 0L; @Getter @@ -170,7 +155,7 @@ public class LeaseCleanupManager { Set childShardKeys = leaseFromDDB.childShardIds(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { try { - childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); + childShardKeys = leasePendingDeletion.getChildShardsFromService(); if (CollectionUtils.isNullOrEmpty(childShardKeys)) { log.error( @@ -203,7 +188,7 @@ public class LeaseCleanupManager { if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { try { wereChildShardsPresent = !CollectionUtils - .isNullOrEmpty(getChildShardsFromService(shardInfo, streamIdentifier)); + .isNullOrEmpty(leasePendingDeletion.getChildShardsFromService()); } catch (ExecutionException e) { throw exceptionManager.apply(e.getCause()); } @@ -217,29 +202,6 @@ public class LeaseCleanupManager { wasResourceNotFound); } - private Set getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier) - throws InterruptedException, ExecutionException, TimeoutException { - final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder() - .streamName(streamIdentifier.streamName()) - .shardIteratorType(ShardIteratorType.LATEST) - .shardId(shardInfo.shardId()) - .build(); - - final GetShardIteratorResponse getShardIteratorResponse = - FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), maxFutureWait); - - final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder() - .shardIterator(getShardIteratorResponse.shardIterator()) - .limit(MAX_RECORDS) - .build(); - - final GetRecordsResponse getRecordsResponse = - FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), maxFutureWait); - - return getRecordsResponse.childShards().stream().map(c -> c.shardId()).collect(Collectors.toSet()); - } - - // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the // stream (known explicitly from ResourceNotFound being thrown when processing this shard), private boolean cleanupLeaseForGarbageShard(Lease lease, Throwable e) throws DependencyException, ProvisionedThroughputException, InvalidStateException { diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java index 9eb2d17b..62b93855 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/ShardDetector.java @@ -16,6 +16,10 @@ package software.amazon.kinesis.leases; import java.util.List; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import software.amazon.awssdk.services.kinesis.model.ChildShard; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.Shard; @@ -48,7 +52,9 @@ public interface ShardDetector { * @param ShardFilter * @return Shards */ - List listShardsWithFilter(ShardFilter shardFilter); + default List listShardsWithFilter(ShardFilter shardFilter) { + throw new UnsupportedOperationException("listShardsWithFilter not available."); + } /** * Gets stream identifier. @@ -65,5 +71,19 @@ public interface ShardDetector { * @param request list shards request * @return ListShardsResponse which contains list shards response */ - ListShardsResponse getListShardsResponse(ListShardsRequest request) throws Exception; + default ListShardsResponse getListShardsResponse(ListShardsRequest request) throws Exception { + throw new UnsupportedOperationException("getListShardsResponse not available."); + } + + /** + * Gets the children shards of a shard. + * @param shardId + * @return + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + default List getChildShards(String shardId) throws InterruptedException, ExecutionException, TimeoutException { + throw new UnsupportedOperationException("getChildShards not available."); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index 7d374de5..5102bc5e 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -551,8 +551,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { */ @Override public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) { - return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory), kinesisClient, - metricsFactory, dynamoDbRequestTimeout, Executors.newSingleThreadScheduledExecutor(), + return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory), + metricsFactory, Executors.newSingleThreadScheduledExecutor(), cleanupLeasesUponShardCompletion, leaseCleanupConfig.leaseCleanupIntervalMillis(), leaseCleanupConfig.completedLeaseCleanupIntervalMillis(), leaseCleanupConfig.garbageLeaseCleanupIntervalMillis()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java index b840eb09..2d3d0c2f 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java @@ -20,8 +20,14 @@ import lombok.Value; import lombok.experimental.Accessors; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; +import java.util.Set; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + /** * Helper class for cleaning up leases. */ @@ -32,4 +38,16 @@ public class LeasePendingDeletion { private final StreamIdentifier streamIdentifier; private final Lease lease; private final ShardInfo shardInfo; + private final ShardDetector shardDetector; + + /** + * Discovers the child shards for this lease. + * @return + * @throws InterruptedException + * @throws ExecutionException + * @throws TimeoutException + */ + public Set getChildShardsFromService() throws InterruptedException, ExecutionException, TimeoutException { + return shardDetector.getChildShards(shardInfo.shardId()).stream().map(c -> c.shardId()).collect(Collectors.toSet()); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 6b4d1839..c2c5c790 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -67,7 +67,7 @@ public class ShutdownTask implements ConsumerTask { private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; @VisibleForTesting - static final int RETRY_RANDOM_MAX_RANGE = 10; + static final int RETRY_RANDOM_MAX_RANGE = 30; @NonNull private final ShardInfo shardInfo; @@ -185,7 +185,7 @@ public class ShutdownTask implements ConsumerTask { updateLeaseWithChildShards(currentShardLease); } final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(streamIdentifier, currentShardLease, - shardInfo); + shardInfo, shardDetector); if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { boolean isSuccess = false; try { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java index 68bb7d97..d0870d51 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/KinesisShardDetectorTest.java @@ -15,14 +15,6 @@ package software.amazon.kinesis.leases; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.stream.Collectors; -import java.util.stream.IntStream; import org.junit.Assert; import org.junit.Before; import org.junit.Rule; @@ -38,6 +30,16 @@ import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import software.amazon.awssdk.services.kinesis.model.ResourceInUseException; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.awssdk.services.kinesis.model.Shard; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.isA; import static org.hamcrest.CoreMatchers.nullValue; diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java index d02ced04..e9d237f9 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -15,18 +15,12 @@ package software.amazon.kinesis.leases; -import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ChildShard; -import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; -import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; -import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; @@ -39,7 +33,6 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.UUID; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ScheduledExecutorService; import java.util.stream.Collectors; @@ -73,7 +66,7 @@ public class LeaseCleanupManagerTest { @Mock private LeaseCoordinator leaseCoordinator; @Mock - private KinesisAsyncClient kinesis; + private ShardDetector shardDetector; @Mock private ScheduledExecutorService deletionThreadPool; @@ -82,9 +75,9 @@ public class LeaseCleanupManagerTest { shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST); streamIdentifier = StreamIdentifier.singleStreamInstance("streamName"); - leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, - NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, - completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); + leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool, + cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, + garbageLeaseCleanupIntervalMillis); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.updateLease(any(Lease.class), any(UUID.class), any(String.class), any(String.class))).thenReturn(true); @@ -124,8 +117,8 @@ public class LeaseCleanupManagerTest { ExtendedSequenceNumber.LATEST); cleanupLeasesOfCompletedShards = false; - leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait, - deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, + leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool, + cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0); @@ -206,8 +199,8 @@ public class LeaseCleanupManagerTest { cleanupLeasesOfCompletedShards = false; - leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait, - deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, + leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, NULL_METRICS_FACTORY, deletionThreadPool, + cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); testLeaseDeletedWhenShardDoesNotExist(heldLease); @@ -216,12 +209,13 @@ public class LeaseCleanupManagerTest { public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception { when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); - when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(ResourceNotFoundException.class); + when(shardDetector.getChildShards(any(String.class))).thenThrow(ResourceNotFoundException.class); when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease); - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo, shardDetector)); leaseCleanupManager.cleanupLeases(); + verify(shardDetector, times(1)).getChildShards(shardInfo.shardId()); verify(leaseRefresher, times(1)).deleteLease(heldLease); } @@ -256,20 +250,10 @@ public class LeaseCleanupManagerTest { } } - GetShardIteratorResponse getShardIteratorResponse = GetShardIteratorResponse.builder() - .shardIterator("123") - .build(); - when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(CompletableFuture.completedFuture(getShardIteratorResponse)); - - GetRecordsResponse getRecordsResponse = GetRecordsResponse.builder() - .records(Collections.emptyList()) - .childShards(childShards) - .build(); - when(kinesis.getRecords(any(GetRecordsRequest.class))).thenReturn(CompletableFuture.completedFuture(getRecordsResponse)); - - leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo)); + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo, shardDetector)); leaseCleanupManager.cleanupLeases(); + verify(shardDetector, times(1)).getChildShards(shardInfo.shardId()); verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class)); }