From 2f1838483c361a0e61b6e79fc86a8ac3a3d92158 Mon Sep 17 00:00:00 2001 From: Joshua Kim Date: Wed, 6 May 2020 08:51:06 -0400 Subject: [PATCH] Adding Lease cleanup in shutdown task. --- .../kinesis/common/LeaseCleanupConfig.java | 41 +++ .../amazon/kinesis/coordinator/Scheduler.java | 42 +-- .../kinesis/leases/LeaseCleanupManager.java | 338 +++++++++++++++++ .../kinesis/leases/LeaseManagementConfig.java | 23 +- .../leases/LeaseManagementFactory.java | 2 + .../DynamoDBLeaseManagementFactory.java | 44 ++- .../exceptions/LeasePendingDeletion.java | 35 ++ .../kinesis/lifecycle/ConsumerStates.java | 6 +- .../lifecycle/ShardConsumerArgument.java | 2 + .../kinesis/lifecycle/ShutdownTask.java | 46 ++- .../kinesis/coordinator/SchedulerTest.java | 32 +- .../leases/LeaseCleanupManagerTest.java | 347 ++++++++++++++++++ .../amazon/kinesis/leases/LeaseHelper.java | 44 +++ .../kinesis/lifecycle/ConsumerStatesTest.java | 7 +- .../kinesis/lifecycle/ShutdownTaskTest.java | 54 ++- 15 files changed, 969 insertions(+), 94 deletions(-) create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/LeaseCleanupConfig.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java create mode 100644 amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java create mode 100644 amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseHelper.java diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/LeaseCleanupConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/LeaseCleanupConfig.java new file mode 100644 index 00000000..b2582d45 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/LeaseCleanupConfig.java @@ -0,0 +1,41 @@ +/* + * Copyright 2020 Amazon.com, 
Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.common; + +import lombok.Builder; +import lombok.Getter; +import lombok.experimental.Accessors; + +/** + * Configuration for lease cleanup. + */ +@Builder +@Getter +@Accessors(fluent=true) +public class LeaseCleanupConfig { + /** + * Interval at which to run lease cleanup thread. + */ + private final long leaseCleanupIntervalMillis; + /** + * Interval at which to check if a lease is completed or not. + */ + private final long completedLeaseCleanupIntervalMillis; + /** + * Interval at which to check if a lease is garbage (i.e trimmed past the stream's retention period) or not. 
+ */ + private final long garbageLeaseCleanupIntervalMillis; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 38a8131a..3e74e23b 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -58,6 +58,7 @@ import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseRefresher; @@ -164,6 +165,7 @@ public class Scheduler implements Runnable { private final long schedulerInitializationBackoffTimeMillis; private final LeaderDecider leaderDecider; private final Map staleStreamDeletionMap = new HashMap<>(); + private final LeaseCleanupManager leaseCleanupManager; // Holds consumers for shards the worker is currently tracking. Key is shard // info, value is ShardConsumer. 
@@ -289,6 +291,8 @@ public class Scheduler implements Runnable { this.leaderElectedPeriodicShardSyncManager = new PeriodicShardSyncManager( leaseManagementConfig.workerIdentifier(), leaderDecider, leaseRefresher, currentStreamConfigMap, shardSyncTaskManagerProvider, isMultiStreamMode); + this.leaseCleanupManager = this.leaseManagementConfig.leaseManagementFactory(leaseSerializer, isMultiStreamMode) + .createLeaseCleanupManager(metricsFactory); } /** @@ -341,6 +345,13 @@ public class Scheduler implements Runnable { log.info("Skipping shard sync per configuration setting (and lease table is not empty)"); } + if (!leaseCleanupManager.isRunning()) { + log.info("Starting LeaseCleanupManager."); + leaseCleanupManager.start(); + } else { + log.info("LeaseCleanupManager is already running. No need to start it"); + } + // If we reach this point, then we either skipped the lease sync or did not have any exception // for any of the shard sync in the previous attempt. if (!leaseCoordinator.isRunning()) { @@ -397,29 +408,14 @@ public class Scheduler implements Runnable { void runProcessLoop() { try { Set assignedShards = new HashSet<>(); - final Set completedShards = new HashSet<>(); for (ShardInfo shardInfo : getShardInfoForAssignments()) { ShardConsumer shardConsumer = createOrGetShardConsumer(shardInfo, - processorConfig.shardRecordProcessorFactory()); + processorConfig.shardRecordProcessorFactory(), leaseCleanupManager); - if (shardConsumer.isShutdown() && shardConsumer.shutdownReason().equals(ShutdownReason.SHARD_END)) { - completedShards.add(shardInfo); - } else { - shardConsumer.executeLifecycle(); - } + shardConsumer.executeLifecycle(); assignedShards.add(shardInfo); } - for (ShardInfo completedShard : completedShards) { - final StreamIdentifier streamIdentifier = getStreamIdentifier(completedShard.streamIdentifierSerOpt()); - final StreamConfig streamConfig = currentStreamConfigMap - .getOrDefault(streamIdentifier, getDefaultStreamConfig(streamIdentifier)); - if 
(createOrGetShardSyncTaskManager(streamConfig).submitShardSyncTask()) { - log.info("{} : Found completed shard, initiated new ShardSyncTak for {} ", - streamIdentifier.serialize(), completedShard.toString()); - } - } - // clean up shard consumers for unassigned shards cleanupShardConsumers(assignedShards); @@ -868,7 +864,8 @@ public class Scheduler implements Runnable { * @return ShardConsumer for the shard */ ShardConsumer createOrGetShardConsumer(@NonNull final ShardInfo shardInfo, - @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { + @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory, + @NonNull final LeaseCleanupManager leaseCleanupManager) { ShardConsumer consumer = shardInfoShardConsumerMap.get(shardInfo); // Instantiate a new consumer if we don't have one, or the one we // had was from an earlier @@ -877,7 +874,7 @@ public class Scheduler implements Runnable { // completely processed (shutdown reason terminate). if ((consumer == null) || (consumer.isShutdown() && consumer.shutdownReason().equals(ShutdownReason.LEASE_LOST))) { - consumer = buildConsumer(shardInfo, shardRecordProcessorFactory); + consumer = buildConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager); shardInfoShardConsumerMap.put(shardInfo, consumer); slog.infoForce("Created new shardConsumer for : " + shardInfo); } @@ -889,12 +886,14 @@ public class Scheduler implements Runnable { } protected ShardConsumer buildConsumer(@NonNull final ShardInfo shardInfo, - @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory) { + @NonNull final ShardRecordProcessorFactory shardRecordProcessorFactory, + @NonNull final LeaseCleanupManager leaseCleanupManager) { ShardRecordProcessorCheckpointer checkpointer = coordinatorConfig.coordinatorFactory().createRecordProcessorCheckpointer(shardInfo, checkpoint); // The only case where streamName is not available will be when multistreamtracker not set. 
In this case, // get the default stream name for the single stream application. final StreamIdentifier streamIdentifier = getStreamIdentifier(shardInfo.streamIdentifierSerOpt()); + // Irrespective of single stream app or multi stream app, streamConfig should always be available. // If we have a shardInfo, that is not present in currentStreamConfigMap for whatever reason, then return default stream config // to gracefully complete the reading. @@ -922,7 +921,8 @@ public class Scheduler implements Runnable { shardDetectorProvider.apply(streamConfig), aggregatorUtil, hierarchicalShardSyncerProvider.apply(streamConfig), - metricsFactory); + metricsFactory, + leaseCleanupManager); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java new file mode 100644 index 00000000..ba108748 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseCleanupManager.java @@ -0,0 +1,338 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.kinesis.leases; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import lombok.experimental.Accessors; +import lombok.extern.slf4j.Slf4j; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; +import software.amazon.awssdk.utils.CollectionUtils; +import software.amazon.kinesis.common.FutureUtils; +import software.amazon.kinesis.common.KinesisRequestsBuilder; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.retrieval.AWSExceptionManager; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.time.Duration; +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import 
java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; + +/** + * Helper class to clean up any expired/closed shard leases. It will clean up leases periodically as defined by + * {@link LeaseManagementConfig#leaseCleanupConfig()} asynchronously. + */ +@Accessors(fluent=true) +@Slf4j +@RequiredArgsConstructor +@EqualsAndHashCode +public class LeaseCleanupManager { + @NonNull + private final LeaseCoordinator leaseCoordinator; + @NonNull + private final KinesisAsyncClient kinesisClient; + @NonNull + private final MetricsFactory metricsFactory; + @NonNull + private final Duration maxFutureWait; + @NonNull + private final ScheduledExecutorService deletionThreadPool; + private final boolean cleanupLeasesUponShardCompletion; + private final long leaseCleanupIntervalMillis; + private final long completedLeaseCleanupIntervalMillis; + private final long garbageLeaseCleanupIntervalMillis; + private final Stopwatch completedLeaseStopwatch = Stopwatch.createUnstarted(); + private final Stopwatch garbageLeaseStopwatch = Stopwatch.createUnstarted(); + + private final Queue<LeasePendingDeletion> deletionQueue = new ConcurrentLinkedQueue<>(); + + private static final int MAX_RECORDS = 1; + private static final long INITIAL_DELAY = 0L; + + @Getter + private volatile boolean isRunning = false; + + /** + * Starts the lease cleanup thread, which is scheduled periodically as specified by + * {@link LeaseCleanupManager#leaseCleanupIntervalMillis}. Does nothing if the thread has already been started. + */ + public void start() { + if (isRunning) { + log.debug("Lease cleanup thread is already running, not starting it again."); + return; + } + + log.debug("Starting lease cleanup thread."); + // reset() first: Stopwatch.start() throws if the stopwatch is still running from an earlier attempt. + completedLeaseStopwatch.reset().start(); + garbageLeaseStopwatch.reset().start(); + + deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, + TimeUnit.MILLISECONDS); + // Record the started state so that isRunning() reports it and the task is not scheduled twice. + isRunning = true; + } + + /** + * Enqueues a lease for deletion. 
+ * @param leasePendingDeletion the lease to delete, together with its shard info and stream identifier + */ + public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { + final Lease lease = leasePendingDeletion.lease(); + if (lease == null) { + log.warn("Cannot enqueue lease for {} for deferred deletion - instance doesn't hold the lease for that shard.", + leasePendingDeletion.shardInfo()); + } else { + //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597. + if (!deletionQueue.contains(leasePendingDeletion)) { + log.debug("Enqueuing lease {} for deferred deletion.", lease.leaseKey()); + deletionQueue.add(leasePendingDeletion); + } else { + log.warn("Lease {} is already pending deletion, not enqueueing for deletion.", lease.leaseKey()); + } + } + } + + /** + * Returns how many leases are currently waiting in the queue pending deletion. + * @return number of leases pending deletion. + */ + public int leasesPendingDeletion() { + return deletionQueue.size(); + } + + private boolean timeToCheckForCompletedShard() { + return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis; + } + + private boolean timeToCheckForGarbageShard() { + return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis; + } + + private LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion) throws TimeoutException, + InterruptedException, DependencyException, ProvisionedThroughputException, InvalidStateException { + final Lease lease = leasePendingDeletion.lease(); + final ShardInfo shardInfo = leasePendingDeletion.shardInfo(); + final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier(); + + final AWSExceptionManager exceptionManager = createExceptionManager(); + + boolean cleanedUpCompletedLease = false; + boolean cleanedUpGarbageLease = false; + boolean alreadyCheckedForGarbageCollection = false; + + try { + if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard()) { + Set childShardKeys = 
leaseCoordinator.leaseRefresher().getLease(lease.leaseKey()).childShardIds(); + if (CollectionUtils.isNullOrEmpty(childShardKeys)) { + try { + childShardKeys = getChildShardsFromService(shardInfo, streamIdentifier); + updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); + } catch (ExecutionException e) { + throw exceptionManager.apply(e.getCause()); + } finally { + alreadyCheckedForGarbageCollection = true; + } + } + cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, shardInfo, childShardKeys); + } + + if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard()) { + try { + getChildShardsFromService(shardInfo, streamIdentifier); + } catch (ExecutionException e) { + throw exceptionManager.apply(e.getCause()); + } + } + } catch (ResourceNotFoundException e) { + cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); + } + + return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease); + } + + private Set getChildShardsFromService(ShardInfo shardInfo, StreamIdentifier streamIdentifier) + throws InterruptedException, ExecutionException, TimeoutException { + final GetShardIteratorRequest getShardIteratorRequest = KinesisRequestsBuilder.getShardIteratorRequestBuilder() + .streamName(streamIdentifier.streamName()) + .shardIteratorType(ShardIteratorType.LATEST) + .shardId(shardInfo.shardId()) + .build(); + + final GetShardIteratorResponse getShardIteratorResponse = + FutureUtils.resolveOrCancelFuture(kinesisClient.getShardIterator(getShardIteratorRequest), maxFutureWait); + + final GetRecordsRequest getRecordsRequest = KinesisRequestsBuilder.getRecordsRequestBuilder() + .shardIterator(getShardIteratorResponse.shardIterator()) + .limit(MAX_RECORDS) + .build(); + + final GetRecordsResponse getRecordsResponse = + FutureUtils.resolveOrCancelFuture(kinesisClient.getRecords(getRecordsRequest), maxFutureWait); + + return getRecordsResponse.childShards().stream().map(c -> c.shardId()).collect(Collectors.toSet()); + } + + + 
// A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the + // stream (known explicitly from ResourceNotFound being thrown when processing this shard). + private boolean cleanupLeaseForGarbageShard(Lease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException { + log.info("Deleting lease {} as it is not present in the stream.", lease); + leaseCoordinator.leaseRefresher().deleteLease(lease); + return true; + } + + // Returns true only when none of this lease's parent shard leases is still present in the lease table. + private boolean allParentShardLeasesDeleted(Lease lease, ShardInfo shardInfo) throws DependencyException, ProvisionedThroughputException, InvalidStateException { + for (String parentShard : lease.parentShardIds()) { + final Lease parentLease = leaseCoordinator.leaseRefresher().getLease(ShardInfo.getLeaseKey(shardInfo, parentShard)); + + if (parentLease != null) { + log.warn("Lease {} has a parent lease {} which is still present in the lease table, skipping deletion " + + "for this lease.", lease, parentLease); + return false; + } + } + return true; + } + + // We should only be deleting the current shard's lease if + // 1. All of its children are currently being processed, i.e. their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP. + // 2. Its parent shard lease(s) have already been deleted. 
+ private boolean cleanupLeaseForCompletedShard(Lease lease, ShardInfo shardInfo, Set childShardKeys) + throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { + final Set processedChildShardLeases = new HashSet<>(); + + for (String childShardKey : childShardKeys) { + final Lease childShardLease = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(childShardKey)).orElseThrow( + () -> new IllegalStateException("Child lease " + childShardKey + " for completed shard not found in " + + "lease table - not cleaning up lease " + lease)); + + if (!childShardLease.checkpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) + && !childShardLease.checkpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) { + processedChildShardLeases.add(childShardLease.leaseKey()); + } + } + + if (!allParentShardLeasesDeleted(lease, shardInfo) || !Objects.equals(childShardKeys, processedChildShardLeases)) { + return false; + } + + log.info("Deleting lease {} as it has been completely processed and processing of child shard(s) has begun.", + lease); + leaseCoordinator.leaseRefresher().deleteLease(lease); + + return true; + } + + private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set childShardKeys) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final Lease updatedLease = leasePendingDeletion.lease(); + updatedLease.childShardIds(childShardKeys); + + leaseCoordinator.leaseRefresher().updateLeaseWithMetaInfo(updatedLease, UpdateField.CHILD_SHARDS); + } + + private AWSExceptionManager createExceptionManager() { + final AWSExceptionManager exceptionManager = new AWSExceptionManager(); + exceptionManager.add(ResourceNotFoundException.class, t -> t); + + return exceptionManager; + } + + @VisibleForTesting + void cleanupLeases() { + if (deletionQueue.isEmpty()) { + log.debug("No leases pending deletion."); + } else if (timeToCheckForCompletedShard() | 
timeToCheckForGarbageShard()) { + final Queue failedDeletions = new ConcurrentLinkedQueue<>(); + boolean completedLeaseCleanedUp = false; + boolean garbageLeaseCleanedUp = false; + + log.debug("Attempting to clean up {} lease(s).", deletionQueue.size()); + + while (!deletionQueue.isEmpty()) { + final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll(); + final String leaseKey = leasePendingDeletion.lease().leaseKey(); + final StreamIdentifier streamIdentifier = leasePendingDeletion.streamIdentifier(); + boolean deletionFailed = true; + try { + final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion); + completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease(); + garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease(); + + if (leaseCleanupResult.leaseCleanedUp()) { + log.debug("Successfully cleaned up lease {} for {}", leaseKey, streamIdentifier); + deletionFailed = false; + } + } catch (Exception e) { + log.error("Failed to cleanup lease {} for {}. Will re-enqueue for deletion and retry on next " + + "scheduled execution.", leaseKey, streamIdentifier, e); + } + + if (deletionFailed) { + log.debug("Did not cleanup lease {} for {}. 
Re-enqueueing for deletion.", leaseKey, streamIdentifier); + failedDeletions.add(leasePendingDeletion); + } + } + + if (completedLeaseCleanedUp) { + log.debug("At least one completed lease was cleaned up - restarting interval"); + completedLeaseStopwatch.reset().start(); + } + + if (garbageLeaseCleanedUp) { + log.debug("At least one garbage lease was cleaned up - restarting interval"); + garbageLeaseStopwatch.reset().start(); + } + + deletionQueue.addAll(failedDeletions); + } + } + + private class LeaseCleanupThread implements Runnable { + @Override + public void run() { + cleanupLeases(); + } + } + + @Value + private class LeaseCleanupResult { + boolean cleanedUpCompletedLease; + boolean cleanedUpGarbageLease; + + public boolean leaseCleanedUp() { + return cleanedUpCompletedLease | cleanedUpGarbageLease; + } + } +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 789a3008..82c02060 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -33,6 +33,7 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.LeaseCleanupConfig; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.leases.dynamodb.DynamoDBLeaseManagementFactory; import software.amazon.kinesis.leases.dynamodb.TableCreatorCallback; @@ -48,6 +49,16 @@ public class LeaseManagementConfig { public static final Duration DEFAULT_REQUEST_TIMEOUT = Duration.ofMinutes(1); + public static final long 
DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(1).toMillis(); + public static final long DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(5).toMillis(); + public static final long DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofMinutes(30).toMillis(); + + public static final LeaseCleanupConfig DEFAULT_LEASE_CLEANUP_CONFIG = LeaseCleanupConfig.builder() + .leaseCleanupIntervalMillis(DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS) + .completedLeaseCleanupIntervalMillis(DEFAULT_COMPLETED_LEASE_CLEANUP_INTERVAL_MILLIS) + .garbageLeaseCleanupIntervalMillis(DEFAULT_GARBAGE_LEASE_CLEANUP_INTERVAL_MILLIS) + .build(); + /** * Name of the table to use in DynamoDB * @@ -108,6 +119,15 @@ public class LeaseManagementConfig { */ private boolean cleanupLeasesUponShardCompletion = true; + /** + * Configuration for lease cleanup in {@link LeaseCleanupManager}. + * + *

Default lease cleanup interval value: 1 minute.

+ *

Default completed lease cleanup interval: 5 minutes.

+ *

Default garbage lease cleanup interval: 30 minutes.

+ */ + private final LeaseCleanupConfig leaseCleanupConfig = DEFAULT_LEASE_CLEANUP_CONFIG; + /** * The max number of leases (shards) this worker should process. * This can be useful to avoid overloading (and thrashing) a worker when a host has resource constraints @@ -344,7 +364,8 @@ public class LeaseManagementConfig { billingMode(), leaseSerializer, customShardDetectorProvider(), - isMultiStreamingMode); + isMultiStreamingMode, + leaseCleanupConfig()); } return leaseManagementFactory; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java index 37f66258..ecf9b390 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementFactory.java @@ -39,4 +39,6 @@ public interface LeaseManagementFactory { throw new UnsupportedOperationException(); } + LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory); + } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index c1b250a4..7d374de5 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -17,6 +17,7 @@ package software.amazon.kinesis.leases.dynamodb; import java.time.Duration; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.Function; import lombok.Data; import lombok.NonNull; @@ -25,10 +26,12 @@ import software.amazon.awssdk.services.dynamodb.model.BillingMode; import 
software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.LeaseCleanupConfig; import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.KinesisShardDetector; +import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; import software.amazon.kinesis.leases.LeaseManagementFactory; @@ -83,6 +86,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final Duration dynamoDbRequestTimeout; private final BillingMode billingMode; private final boolean isMultiStreamMode; + private final LeaseCleanupConfig leaseCleanupConfig; /** * Constructor. 
@@ -208,7 +212,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param deprecatedHierarchicalShardSyncer + * @param hierarchicalShardSyncer * @param tableCreatorCallback */ @Deprecated @@ -222,14 +226,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - deprecatedHierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); + hierarchicalShardSyncer, tableCreatorCallback, LeaseManagementConfig.DEFAULT_REQUEST_TIMEOUT); } /** @@ -258,7 +262,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param deprecatedHierarchicalShardSyncer + * @param hierarchicalShardSyncer * @param tableCreatorCallback * @param 
dynamoDbRequestTimeout */ @@ -273,7 +277,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, @@ -281,7 +285,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, BillingMode.PROVISIONED); } /** @@ -310,7 +314,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param cacheMissWarningModulus * @param initialLeaseTableReadCapacity * @param initialLeaseTableWriteCapacity - * @param deprecatedHierarchicalShardSyncer + * @param hierarchicalShardSyncer * @param tableCreatorCallback * @param dynamoDbRequestTimeout * @param billingMode @@ -326,7 +330,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final 
int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, - final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, + final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode) { this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName, @@ -335,7 +339,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, - deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); + hierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, new DynamoDBLeaseSerializer()); } /** @@ -386,7 +390,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, cacheMissWarningModulus, initialLeaseTableReadCapacity, initialLeaseTableWriteCapacity, deprecatedHierarchicalShardSyncer, tableCreatorCallback, dynamoDbRequestTimeout, billingMode, leaseSerializer, - null, false); + null, false, LeaseManagementConfig.DEFAULT_LEASE_CLEANUP_CONFIG); this.streamConfig = streamConfig; } @@ -420,6 +424,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { * @param leaseSerializer * @param customShardDetectorProvider * @param isMultiStreamMode + * @param leaseCleanupConfig */ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final 
DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, @@ -432,7 +437,8 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer, - Function customShardDetectorProvider, boolean isMultiStreamMode) { + Function customShardDetectorProvider, boolean isMultiStreamMode, + LeaseCleanupConfig leaseCleanupConfig) { this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.tableName = tableName; @@ -461,6 +467,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.leaseSerializer = leaseSerializer; this.customShardDetectorProvider = customShardDetectorProvider; this.isMultiStreamMode = isMultiStreamMode; + this.leaseCleanupConfig = leaseCleanupConfig; } @Override @@ -535,4 +542,19 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { maxListShardsRetryAttempts, listShardsCacheAllowedAgeInSeconds, maxCacheMissesBeforeReload, cacheMissWarningModulus, dynamoDbRequestTimeout); } + + /** + * LeaseCleanupManager cleans up leases in the lease table for shards which have either expired past the + * stream's retention period or have been completely processed. 
+ * @param metricsFactory + * @return LeaseCleanupManager + */ + @Override + public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) { + return new LeaseCleanupManager(createLeaseCoordinator(metricsFactory), kinesisClient, + metricsFactory, dynamoDbRequestTimeout, Executors.newSingleThreadScheduledExecutor(), + cleanupLeasesUponShardCompletion, leaseCleanupConfig.leaseCleanupIntervalMillis(), + leaseCleanupConfig.completedLeaseCleanupIntervalMillis(), + leaseCleanupConfig.garbageLeaseCleanupIntervalMillis()); + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java new file mode 100644 index 00000000..b840eb09 --- /dev/null +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/exceptions/LeasePendingDeletion.java @@ -0,0 +1,35 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.leases.exceptions; + +import lombok.EqualsAndHashCode; +import lombok.Value; +import lombok.experimental.Accessors; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.ShardInfo; + +/** + * Holder for a lease that is pending deletion, together with its stream identifier and shard info.
+ */ +@Accessors(fluent = true) +@Value +@EqualsAndHashCode(exclude = {"queueEntryTime"}) +public class LeasePendingDeletion { + private final StreamIdentifier streamIdentifier; + private final Lease lease; + private final ShardInfo shardInfo; +} diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index 58e31985..4d894d94 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -146,7 +146,7 @@ class ConsumerStates { @Override public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - return ShardConsumerState.SHUTDOWN_COMPLETE.consumerState(); + return ShardConsumerState.SHUTTING_DOWN.consumerState(); } @Override @@ -497,7 +497,9 @@ class ConsumerStates { argument.recordsPublisher(), argument.hierarchicalShardSyncer(), argument.metricsFactory(), - input == null ? null : input.childShards()); + input == null ? 
null : input.childShards(), + argument.streamIdentifier(), + argument.leaseCleanupManager()); } @Override diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 03ddc6ee..0f18891c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -22,6 +22,7 @@ import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -71,4 +72,5 @@ public class ShardConsumerArgument { private final HierarchicalShardSyncer hierarchicalShardSyncer; @NonNull private final MetricsFactory metricsFactory; + private final LeaseCleanupManager leaseCleanupManager; } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 5bd0664f..5f1dcd25 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -15,6 +15,10 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; + +import java.util.List; +import java.util.Optional; + import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; @@ -23,13 +27,16 @@ import 
software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; @@ -42,7 +49,6 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import java.util.List; import java.util.Set; import java.util.UUID; import java.util.function.Function; @@ -85,6 +91,10 @@ public class ShutdownTask implements ConsumerTask { private final TaskType taskType = TaskType.SHUTDOWN; private final List childShards; + @NonNull + private final StreamIdentifier streamIdentifier; + @NonNull + private final LeaseCleanupManager leaseCleanupManager; private static final Function leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo); @@ -113,19 +123,26 @@ public class ShutdownTask implements ConsumerTask { // This would happen when KinesisDataFetcher(for polling mode) or FanOutRecordsPublisher(for StoS mode) catches ResourceNotFound exception. 
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // This scenario could happen when customer deletes the stream while leaving the KCL application running. + final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); + if (!CollectionUtils.isNullOrEmpty(childShards)) { createLeasesForChildShardsIfNotExist(); - updateLeaseWithChildShards(); - } else { - log.warn("Shard {} no longer exists. Shutting down consumer with SHARD_END reason without creating leases for child shards.", leaseKeyProvider.apply(shardInfo)); + updateLeaseWithChildShards(currentShardLease); } - recordProcessorCheckpointer - .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); - recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); - // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. - // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. - throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); + final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) + .orElseThrow(() -> new IllegalStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); + if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { + recordProcessorCheckpointer + .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); + recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); + // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. + // The shardEnded is implemented by customer. 
We should validate if the SHARD_END checkpointing is successful after calling shardEnded. + throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); + } + + final LeasePendingDeletion garbageLease = new LeasePendingDeletion(streamIdentifier, currentShardLease, shardInfo); + leaseCleanupManager.enqueueForDeletion(garbageLease); } else { throwOnApplicationException(() -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()), scope, startTime); } @@ -162,8 +179,8 @@ public class ShutdownTask implements ConsumerTask { if (lastCheckpointValue == null || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + - "See ShardRecordProcessor.shardEnded javadocs for more information."); + + leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. 
" + + "See ShardRecordProcessor.shardEnded javadocs for more information."); } } @@ -189,9 +206,8 @@ public class ShutdownTask implements ConsumerTask { } } - private void updateLeaseWithChildShards() + private void updateLeaseWithChildShards(Lease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { - final Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); Set childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); final Lease updatedLease = currentLease.copy(); @@ -206,7 +222,7 @@ public class ShutdownTask implements ConsumerTask { /* * (non-Javadoc) - * + * * @see com.amazonaws.services.kinesis.clientlibrary.lib.worker.ConsumerTask#taskType() */ @Override diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java index a1601eda..af58d3ab 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/coordinator/SchedulerTest.java @@ -73,6 +73,7 @@ import software.amazon.kinesis.common.StreamConfig; import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.KinesisClientLibException; import software.amazon.kinesis.exceptions.KinesisClientLibNonRetryableException; +import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseManagementConfig; @@ -153,6 +154,8 @@ public class SchedulerTest { private WorkerStateChangeListener workerStateChangeListener; @Mock private MultiStreamTracker multiStreamTracker; + @Mock + private LeaseCleanupManager leaseCleanupManager; private Map shardSyncTaskManagerMap; private 
Map shardDetectorMap; @@ -219,9 +222,9 @@ public class SchedulerTest { final String shardId = "shardId-000000000000"; final String concurrencyToken = "concurrencyToken"; final ShardInfo shardInfo = new ShardInfo(shardId, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory); + final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager); assertNotNull(shardConsumer1); - final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory); + final ShardConsumer shardConsumer2 = scheduler.createOrGetShardConsumer(shardInfo, shardRecordProcessorFactory, leaseCleanupManager); assertNotNull(shardConsumer2); assertSame(shardConsumer1, shardConsumer2); @@ -229,7 +232,7 @@ public class SchedulerTest { final String anotherConcurrencyToken = "anotherConcurrencyToken"; final ShardInfo shardInfo2 = new ShardInfo(shardId, anotherConcurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory); + final ShardConsumer shardConsumer3 = scheduler.createOrGetShardConsumer(shardInfo2, shardRecordProcessorFactory, leaseCleanupManager); assertNotNull(shardConsumer3); assertNotSame(shardConsumer1, shardConsumer3); @@ -261,9 +264,9 @@ public class SchedulerTest { schedulerSpy.runProcessLoop(); schedulerSpy.runProcessLoop(); - verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory)); - verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory)); - verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory)); + verify(schedulerSpy).buildConsumer(same(initialShardInfo.get(0)), eq(shardRecordProcessorFactory), 
eq(leaseCleanupManager)); + verify(schedulerSpy, never()).buildConsumer(same(firstShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)); + verify(schedulerSpy, never()).buildConsumer(same(secondShardInfo.get(0)), eq(shardRecordProcessorFactory), eq(leaseCleanupManager)); verify(checkpoint).getCheckpointObject(eq(shardId)); } @@ -279,10 +282,10 @@ public class SchedulerTest { ExtendedSequenceNumber.TRIM_HORIZON); final ShardInfo shardInfo1 = new ShardInfo(shard1, concurrencyToken, null, ExtendedSequenceNumber.TRIM_HORIZON); - final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory); + final ShardConsumer shardConsumer0 = scheduler.createOrGetShardConsumer(shardInfo0, shardRecordProcessorFactory, leaseCleanupManager); final ShardConsumer shardConsumer0WithAnotherConcurrencyToken = - scheduler.createOrGetShardConsumer(shardInfo0WithAnotherConcurrencyToken, shardRecordProcessorFactory); - final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo1, shardRecordProcessorFactory); + scheduler.createOrGetShardConsumer(shardInfo0WithAnotherConcurrencyToken, shardRecordProcessorFactory, leaseCleanupManager); + final ShardConsumer shardConsumer1 = scheduler.createOrGetShardConsumer(shardInfo1, shardRecordProcessorFactory, leaseCleanupManager); Set shards = new HashSet<>(); shards.add(shardInfo0); @@ -397,11 +400,11 @@ public class SchedulerTest { schedulerSpy.runProcessLoop(); initialShardInfo.stream().forEach( - shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory))); + shardInfo -> verify(schedulerSpy).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), same(leaseCleanupManager))); firstShardInfo.stream().forEach( - shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory))); + shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), 
eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); secondShardInfo.stream().forEach( - shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory))); + shardInfo -> verify(schedulerSpy, never()).buildConsumer(same(shardInfo), eq(shardRecordProcessorFactory), eq(leaseCleanupManager))); } @@ -1071,6 +1074,11 @@ public class SchedulerTest { public ShardDetector createShardDetector(StreamConfig streamConfig) { return shardDetectorMap.get(streamConfig.streamIdentifier()); } + + @Override + public LeaseCleanupManager createLeaseCleanupManager(MetricsFactory metricsFactory) { + return leaseCleanupManager; + } } private class TestKinesisCheckpointFactory implements CheckpointFactory { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java new file mode 100644 index 00000000..eb06a4a0 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCleanupManagerTest.java @@ -0,0 +1,347 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package software.amazon.kinesis.leases; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; +import software.amazon.awssdk.services.kinesis.model.ChildShard; +import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest; +import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest; +import software.amazon.awssdk.services.kinesis.model.GetShardIteratorResponse; +import software.amazon.awssdk.services.kinesis.model.ResourceNotFoundException; +import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; +import software.amazon.kinesis.metrics.MetricsFactory; +import software.amazon.kinesis.metrics.NullMetricsFactory; +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class LeaseCleanupManagerTest { + + private ShardInfo shardInfo; + private StreamIdentifier streamIdentifier; + private String concurrencyToken = "1234"; + + private String shardId = "shardId"; + private String splitParent = "splitParent"; + private String mergeParent1 = "mergeParent-1"; + private String mergeParent2 = "mergeParent-2"; + + private Duration maxFutureWait = Duration.ofSeconds(1); + private long leaseCleanupIntervalMillis 
= Duration.ofSeconds(1).toMillis(); + private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); + private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); + private boolean cleanupLeasesOfCompletedShards = true; + private LeaseCleanupManager leaseCleanupManager; + private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); + + @Mock + private LeaseRefresher leaseRefresher; + @Mock + private LeaseCoordinator leaseCoordinator; + @Mock + private KinesisAsyncClient kinesis; + @Mock + private ScheduledExecutorService deletionThreadPool; + + @Before + public void setUp() throws Exception { + shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + streamIdentifier = StreamIdentifier.singleStreamInstance("streamName"); + leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, + NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, + completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); + + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.updateLease(any(Lease.class), any(UUID.class), any(String.class), any(String.class))).thenReturn(true); + } + + /** + * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed + * shard case. + */ + @Test + public final void testParentShardLeaseDeletedSplitCase() throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1); + } + + /** + * Tests that when the single child shard lease of a merge is present, we are able to delete the parent shard + * lease for the completed shard case.
+ */ + @Test + public final void testParentShardLeaseDeletedMergeCase() throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1); + } + + /** + * Tests that if cleanupLeasesOfCompletedShards is not enabled by the customer, then no leases are cleaned up for + * the completed shard case. + */ + @Test + public final void testNoLeasesDeletedWhenNotEnabled() throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + cleanupLeasesOfCompletedShards = false; + + leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait, + deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, + garbageLeaseCleanupIntervalMillis); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0); + } + + /** + * Tests that if some of the child shard leases are missing, we fail fast and don't delete the parent shard lease + * for the completed shard case. + */ + @Test + public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception { + List childShards = childShardsForSplit(); + + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0); + } + + /** + * Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint TRIM_HORIZON), + * we don't delete the parent shard lease for the completed shard case.
+ */ + @Test + public final void testParentShardLeaseNotDeletedWhenChildIsAtTrim() throws Exception { + testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.TRIM_HORIZON); + } + + /** + * Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint AT_TIMESTAMP), + * we don't delete the parent shard lease for the completed shard case. + */ + @Test + public final void testParentShardLeaseNotDeletedWhenChildIsAtTimestamp() throws Exception { + testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP); + } + + private final void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber) + throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0); + } + + /** + * Tests that if a lease's parents are still present, we do not delete the lease. + */ + @Test + public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.singleton("parent"), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0); + } + + /** + * Tests the ResourceNotFound case: when the shard no longer exists (e.g. it has expired), we delete its lease. + */ + @Test + public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); + + testLeaseDeletedWhenShardDoesNotExist(heldLease); + } + + /** + * Tests ResourceNotFound case when completed lease cleanup is disabled.
+ * @throws Exception + */ + @Test + public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); + + cleanupLeasesOfCompletedShards = false; + + leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, NULL_METRICS_FACTORY, maxFutureWait, + deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, + garbageLeaseCleanupIntervalMillis); + + testLeaseDeletedWhenShardDoesNotExist(heldLease); + } + + public void testLeaseDeletedWhenShardDoesNotExist(Lease heldLease) throws Exception { + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); + when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(ResourceNotFoundException.class); + when(leaseRefresher.getLease(heldLease.leaseKey())).thenReturn(heldLease); + + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); + leaseCleanupManager.cleanupLeases(); + + verify(leaseRefresher, times(1)).deleteLease(heldLease); + } + + /** + * Tests that if a lease deletion fails, it's re-enqueued for deletion. 
+ */ + @Test + public final void testFailedDeletionsReEnqueued() throws Exception { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); + + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); + when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenThrow(Exception.class); + + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); + Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); + } + + /** + * Tests duplicate leases are not enqueued for deletion. + */ + @Test + public final void testNoDuplicateLeasesEnqueued() { + // Disable lease cleanup so that the queue isn't drained while the test is running. + cleanupLeasesOfCompletedShards = false; + leaseCleanupManager = new LeaseCleanupManager(leaseCoordinator, kinesis, + NULL_METRICS_FACTORY, maxFutureWait, deletionThreadPool, cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, + completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis); + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + final Lease heldLease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", Collections.singleton("parentShardId")); + + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId())).thenReturn(heldLease); + + // Enqueue the same lease twice. 
+ leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); + Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, heldLease, shardInfo)); + Assert.assertEquals(1, leaseCleanupManager.leasesPendingDeletion()); + } + + private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, + ExtendedSequenceNumber extendedSequenceNumber, + int expectedDeletedLeases) throws Exception { + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases); + } + + private final void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, + ExtendedSequenceNumber extendedSequenceNumber, + boolean childShardLeasesPresent, + int expectedDeletedLeases) throws Exception { + + final Lease lease = LeaseHelper.createLease(shardInfo.shardId(), "leaseOwner", shardInfo.parentShardIds(), + childShards.stream().map(c -> c.shardId()).collect(Collectors.toSet())); + final List childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease( + ShardInfo.getLeaseKey(shardInfo, c.shardId()), "leaseOwner", Collections.singleton(shardInfo.shardId()), + Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList()); + + final List parentShardLeases = lease.parentShardIds().stream().map(p -> + LeaseHelper.createLease(ShardInfo.getLeaseKey(shardInfo, p), "leaseOwner", Collections.emptyList(), + Collections.singleton(shardInfo.shardId()), extendedSequenceNumber)).collect(Collectors.toList()); + + when(leaseRefresher.getLease(lease.leaseKey())).thenReturn(lease); + for (Lease parentShardLease : parentShardLeases) { + when(leaseRefresher.getLease(parentShardLease.leaseKey())).thenReturn(parentShardLease); + } + if (childShardLeasesPresent) { + for (Lease childShardLease : childShardLeases) { + 
when(leaseRefresher.getLease(childShardLease.leaseKey())).thenReturn(childShardLease); + } + } + + GetShardIteratorResponse getShardIteratorResponse = GetShardIteratorResponse.builder() + .shardIterator("123") + .build(); + when(kinesis.getShardIterator(any(GetShardIteratorRequest.class))).thenReturn(CompletableFuture.completedFuture(getShardIteratorResponse)); + + GetRecordsResponse getRecordsResponse = GetRecordsResponse.builder() + .records(Collections.emptyList()) + .childShards(childShards) + .build(); + when(kinesis.getRecords(any(GetRecordsRequest.class))).thenReturn(CompletableFuture.completedFuture(getRecordsResponse)); + + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(streamIdentifier, lease, shardInfo)); + leaseCleanupManager.cleanupLeases(); + + verify(leaseRefresher, times(expectedDeletedLeases)).deleteLease(any(Lease.class)); + } + + private List childShardsForSplit() { + List parentShards = Arrays.asList(splitParent); + + ChildShard leftChild = ChildShard.builder() + .shardId("leftChild") + .parentShards(parentShards) + .hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49")) + .build(); + ChildShard rightChild = ChildShard.builder() + .shardId("rightChild") + .parentShards(parentShards) + .hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99")) + .build(); + + return Arrays.asList(leftChild, rightChild); + } + + private List childShardsForMerge() { + List parentShards = Arrays.asList(mergeParent1, mergeParent2); + + ChildShard child = ChildShard.builder() + .shardId("onlyChild") + .parentShards(parentShards) + .hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99")) + .build(); + + return Collections.singletonList(child); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseHelper.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseHelper.java new file mode 100644 index 00000000..0e10bc48 --- /dev/null +++ 
b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseHelper.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package software.amazon.kinesis.leases; + +import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; + +import java.util.Collection; +import java.util.Collections; + +public class LeaseHelper { + + public static Lease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { + return createLease(leaseKey, leaseOwner, parentShardIds, Collections.emptySet(), ExtendedSequenceNumber.LATEST); + } + + public static Lease createLease(String leaseKey, String leaseOwner, Collection parentShardIds, Collection childShardIds) { + return createLease(leaseKey, leaseOwner, parentShardIds, childShardIds, ExtendedSequenceNumber.LATEST); + } + + public static Lease createLease(String leaseKey, String leaseOwner, Collection parentShardIds, + Collection childShardIds, ExtendedSequenceNumber extendedSequenceNumber) { + Lease lease = new Lease(); + lease.leaseKey(leaseKey); + lease.leaseOwner(leaseOwner); + lease.parentShardIds(parentShardIds); + lease.childShardIds(childShardIds); + lease.checkpoint(extendedSequenceNumber); + + return lease; + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java 
b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index 06e5afc7..235937d0 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -46,6 +46,7 @@ import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.StreamIdentifier; +import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; @@ -102,6 +103,8 @@ public class ConsumerStatesTest { private ProcessRecordsInput processRecordsInput; @Mock private TaskExecutionListener taskExecutionListener; + @Mock + private LeaseCleanupManager leaseCleanupManager; private long parentShardPollIntervalMillis = 0xCAFE; private boolean cleanupLeasesOfCompletedShards = true; @@ -122,7 +125,7 @@ public class ConsumerStatesTest { taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, - new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory); + new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager); when(shardInfo.shardId()).thenReturn("shardId-000000000000"); when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize())); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, @@ -148,7 +151,7 @@ 
public class ConsumerStatesTest { assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.consumerState())); for (ShutdownReason shutdownReason : ShutdownReason.values()) { assertThat(state.shutdownTransition(shutdownReason), - equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.consumerState())); + equalTo(ShardConsumerState.SHUTTING_DOWN.consumerState())); } assertThat(state.state(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index f65655db..5920646c 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -19,7 +19,6 @@ import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; @@ -27,8 +26,6 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.UUID; @@ -41,8 +38,6 @@ import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; import software.amazon.awssdk.services.kinesis.model.ChildShard; -import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; -import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import 
software.amazon.kinesis.common.InitialPositionInStreamExtended; @@ -50,13 +45,16 @@ import software.amazon.kinesis.common.StreamIdentifier; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseHelper; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardObjectHelper; import software.amazon.kinesis.leases.exceptions.CustomerApplicationException; import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.LeasePendingDeletion; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; @@ -86,6 +84,7 @@ public class ShutdownTaskTest { private boolean ignoreUnexpectedChildShards = false; private ShardInfo shardInfo; private ShutdownTask task; + private StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance("streamName"); @Mock private RecordsPublisher recordsPublisher; @@ -103,6 +102,8 @@ public class ShutdownTaskTest { private HierarchicalShardSyncer hierarchicalShardSyncer; @Mock private ShardRecordProcessor shardRecordProcessor; + @Mock + private LeaseCleanupManager leaseCleanupManager; @Before public void setUp() throws Exception { @@ -119,7 +120,7 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, 
leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards()); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager); } /** @@ -129,8 +130,9 @@ public class ShutdownTaskTest { @Test public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); - Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); + Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"), Collections.emptyList(), ExtendedSequenceNumber.LATEST); when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); + when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); @@ -169,13 +171,14 @@ public class ShutdownTaskTest { task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards()); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - Lease heldLease = createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); + Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); 
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); + when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); final TaskResult result = task.call(); assertNull(result.getException()); @@ -185,6 +188,7 @@ public class ShutdownTaskTest { verify(leaseCoordinator).updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); } /** @@ -192,23 +196,24 @@ public class ShutdownTaskTest { * This test is for the scenario that a ShutdownTask is created for detecting a false Shard End. 
*/ @Test - public final void testCallWhenShardNotFound() throws DependencyException, InvalidStateException, ProvisionedThroughputException { + public final void testCallWhenShardNotFound() throws Exception { + final Lease heldLease = LeaseHelper.createLease("shardId-4", "leaseOwner", Collections.emptyList()); shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>()); + hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseRefresher.getLease("shardId-4")).thenReturn(heldLease); + when(leaseCoordinator.getCurrentlyHeldLease("shardId-4")).thenReturn(heldLease); final TaskResult result = task.call(); assertNull(result.getException()); verify(recordsPublisher).shutdown(); - verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseCoordinator, never()).getCurrentlyHeldLease(shardInfo.shardId()); verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); } @@ -220,11 +225,11 @@ public class ShutdownTaskTest { @Test public final void testCallWhenLeaseLost() throws DependencyException, InvalidStateException, ProvisionedThroughputException { shardInfo = new ShardInfo("shardId-4", concurrencyToken, 
Collections.emptySet(), - ExtendedSequenceNumber.LATEST); + ExtendedSequenceNumber.LATEST); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>()); + LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager); final TaskResult result = task.call(); assertNull(result.getException()); @@ -232,10 +237,9 @@ public class ShutdownTaskTest { verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(leaseCoordinator, never()).getAssignments(); - verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); + verify(leaseCoordinator, never()).dropLease(any(Lease.class)); } - /** * Test method for {@link ShutdownTask#taskType()}. */ @@ -262,14 +266,4 @@ public class ShutdownTaskTest { childShards.add(rightChild); return childShards; } - - private Lease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { - Lease lease = new Lease(); - lease.leaseKey(leaseKey); - lease.leaseOwner(leaseOwner); - lease.parentShardIds(parentShardIds); - - return lease; - } - }