From 6f16b168a4fbd559a4c76956421bfbd9e4c99293 Mon Sep 17 00:00:00 2001 From: Joshua Kim <20001595+joshua-kim@users.noreply.github.com> Date: Mon, 27 Jul 2020 16:08:52 -0400 Subject: [PATCH] Lease cleanup v1.x (#73) * Moving lease cleanup to ShutdownTask. * Introducing LeaseCleanupManager with relevant configs. --- .../lib/worker/ConsumerStates.java | 5 +- .../worker/KinesisClientLibConfiguration.java | 230 ++++++++--- .../lib/worker/KinesisDataFetcher.java | 2 - .../worker/KinesisLeaseCleanupValidator.java | 1 + .../lib/worker/KinesisShardSyncer.java | 181 +-------- .../lib/worker/LeaseCleanupValidator.java | 1 + .../lib/worker/ShardConsumer.java | 60 +++ .../clientlibrary/lib/worker/ShardInfo.java | 2 +- .../lib/worker/ShutdownTask.java | 27 +- .../clientlibrary/lib/worker/Worker.java | 29 +- .../kinesis/leases/LeasePendingDeletion.java | 31 ++ .../leases/impl/LeaseCleanupManager.java | 369 ++++++++++++++++++ .../lib/worker/ConsumerStatesTest.java | 2 +- .../lib/worker/ShardConsumerTest.java | 62 +-- .../lib/worker/ShardObjectHelper.java | 18 +- .../lib/worker/ShardSyncerTest.java | 103 +---- .../lib/worker/ShutdownTaskTest.java | 20 +- .../leases/impl/LeaseCleanupManagerTest.java | 289 ++++++++++++++ .../kinesis/leases/impl/LeaseHelper.java | 44 +++ 19 files changed, 1104 insertions(+), 372 deletions(-) create mode 100644 src/main/java/com/amazonaws/services/kinesis/leases/LeasePendingDeletion.java create mode 100644 src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java create mode 100644 src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseHelper.java diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java index fc3400e8..5cf55dbf 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStates.java @@ -199,7 +199,7 @@ class ConsumerStates { @Override public ConsumerState shutdownTransition(ShutdownReason shutdownReason) { - return ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState(); + return ShardConsumerState.SHUTTING_DOWN.getConsumerState(); } @Override @@ -531,7 +531,8 @@ class ConsumerStates { consumer.getLeaseCoordinator(), consumer.getTaskBackoffTimeMillis(), consumer.getGetRecordsCache(), consumer.getShardSyncer(), - consumer.getShardSyncStrategy(), consumer.getChildShards()); + consumer.getShardSyncStrategy(), consumer.getChildShards(), + consumer.getLeaseCleanupManager()); } @Override diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java index 86e7a496..52e73b3e 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java @@ -14,6 +14,7 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import java.time.Duration; import java.util.Date; import java.util.Optional; import java.util.Set; @@ -89,6 +90,23 @@ public class KinesisClientLibConfiguration { */ public static final boolean DEFAULT_CLEANUP_LEASES_UPON_SHARDS_COMPLETION = true; + /** + * Interval to run lease cleanup thread in {@link LeaseCleanupManager}. + */ + private static final long DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS = Duration.ofHours(1).toMillis(); + + /** + * Threshold in millis at which to check if there are any completed leases (leases for shards which have been + * closed as a result of a resharding operation) that need to be cleaned up. + */ + private static final long DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(5).toMillis(); + + /** + * Threshold in millis at which to check if there are any garbage leases (leases for shards which no longer exist + * in the stream) that need to be cleaned up. + */ + private static final long DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS = Duration.ofMinutes(30).toMillis(); + /** * Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). */ @@ -246,6 +264,9 @@ public class KinesisClientLibConfiguration { private ShardPrioritization shardPrioritization; private long shutdownGraceMillis; private ShardSyncStrategyType shardSyncStrategyType; + private long leaseCleanupIntervalMillis; + private long completedLeaseCleanupThresholdMillis; + private long garbageLeaseCleanupThresholdMillis; @Getter private Optional timeoutInSeconds = Optional.empty(); @@ -284,6 +305,7 @@ public class KinesisClientLibConfiguration { * @param credentialsProvider Provides credentials used to sign AWS requests * @param workerId Used to distinguish different workers/processes of a Kinesis application */ + @Deprecated public KinesisClientLibConfiguration(String applicationName, String streamName, AWSCredentialsProvider credentialsProvider, @@ -303,6 +325,7 @@ public class KinesisClientLibConfiguration { * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch * @param workerId Used to distinguish different workers/processes of a Kinesis application */ + @Deprecated public KinesisClientLibConfiguration(String applicationName, String streamName, AWSCredentialsProvider kinesisCredentialsProvider, @@ -373,6 +396,7 @@ public class KinesisClientLibConfiguration { */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + @Deprecated public KinesisClientLibConfiguration(String applicationName, String streamName, String kinesisEndpoint, @@ -444,6 +468,7 @@ public class KinesisClientLibConfiguration { */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + @Deprecated public KinesisClientLibConfiguration(String applicationName, String streamName, String kinesisEndpoint, @@ -470,54 +495,14 @@ public class KinesisClientLibConfiguration { String regionName, long shutdownGraceMillis, BillingMode billingMode) { - // Check following values are greater than zero - checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); - checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); - checkIsValuePositive("ParentShardPollIntervalMillis", parentShardPollIntervalMillis); - checkIsValuePositive("ShardSyncIntervalMillis", shardSyncIntervalMillis); - checkIsValuePositive("MaxRecords", (long) maxRecords); - checkIsValuePositive("TaskBackoffTimeMillis", taskBackoffTimeMillis); - checkIsValuePositive("MetricsBufferTimeMills", metricsBufferTimeMillis); - checkIsValuePositive("MetricsMaxQueueSize", (long) metricsMaxQueueSize); - checkIsValuePositive("ShutdownGraceMillis", shutdownGraceMillis); - this.applicationName = applicationName; - this.tableName = applicationName; - this.streamName = streamName; - this.kinesisEndpoint = kinesisEndpoint; - this.dynamoDBEndpoint = dynamoDBEndpoint; - this.initialPositionInStream = initialPositionInStream; - this.kinesisCredentialsProvider = kinesisCredentialsProvider; - this.dynamoDBCredentialsProvider = dynamoDBCredentialsProvider; - this.cloudWatchCredentialsProvider = cloudWatchCredentialsProvider; - this.failoverTimeMillis = failoverTimeMillis; - this.maxRecords = maxRecords; - this.idleTimeBetweenReadsInMillis = idleTimeBetweenReadsInMillis; - this.callProcessRecordsEvenForEmptyRecordList = callProcessRecordsEvenForEmptyRecordList; - this.parentShardPollIntervalMillis = parentShardPollIntervalMillis; - this.shardSyncIntervalMillis = shardSyncIntervalMillis; - this.cleanupLeasesUponShardCompletion = cleanupTerminatedShardsBeforeExpiry; - this.workerIdentifier = workerId; - this.kinesisClientConfig = checkAndAppendKinesisClientLibUserAgent(kinesisClientConfig); - this.dynamoDBClientConfig = checkAndAppendKinesisClientLibUserAgent(dynamoDBClientConfig); - this.cloudWatchClientConfig = checkAndAppendKinesisClientLibUserAgent(cloudWatchClientConfig); - this.taskBackoffTimeMillis = taskBackoffTimeMillis; - this.metricsBufferTimeMillis = metricsBufferTimeMillis; - this.metricsMaxQueueSize = metricsMaxQueueSize; - this.metricsLevel = DEFAULT_METRICS_LEVEL; - this.metricsEnabledDimensions = DEFAULT_METRICS_ENABLED_DIMENSIONS; - this.validateSequenceNumberBeforeCheckpointing = validateSequenceNumberBeforeCheckpointing; - this.regionName = regionName; - this.maxLeasesForWorker = DEFAULT_MAX_LEASES_FOR_WORKER; - this.maxLeasesToStealAtOneTime = DEFAULT_MAX_LEASES_TO_STEAL_AT_ONE_TIME; - this.initialLeaseTableReadCapacity = DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY; - this.initialLeaseTableWriteCapacity = DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY; - this.initialPositionInStreamExtended = - InitialPositionInStreamExtended.newInitialPosition(initialPositionInStream); - this.skipShardSyncAtWorkerInitializationIfLeasesExist = DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST; - this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; - this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; - this.recordsFetcherFactory = new SimpleRecordsFetcherFactory(); - this.billingMode = billingMode; + + this(applicationName, streamName, kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider, + dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, + callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, + kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, + metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, shutdownGraceMillis, billingMode, + new SimpleRecordsFetcherFactory(), DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS, DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS, + DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS); } /** @@ -556,6 +541,7 @@ public class KinesisClientLibConfiguration { */ // CHECKSTYLE:IGNORE HiddenFieldCheck FOR NEXT 26 LINES // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 26 LINES + @Deprecated public KinesisClientLibConfiguration(String applicationName, String streamName, String kinesisEndpoint, @@ -581,6 +567,91 @@ public class KinesisClientLibConfiguration { boolean validateSequenceNumberBeforeCheckpointing, String regionName, RecordsFetcherFactory recordsFetcherFactory) { + + + this(applicationName, streamName, kinesisEndpoint, dynamoDBEndpoint, initialPositionInStream, kinesisCredentialsProvider, + dynamoDBCredentialsProvider, cloudWatchCredentialsProvider, failoverTimeMillis, workerId, maxRecords, idleTimeBetweenReadsInMillis, + callProcessRecordsEvenForEmptyRecordList, parentShardPollIntervalMillis, shardSyncIntervalMillis, cleanupTerminatedShardsBeforeExpiry, + kinesisClientConfig, dynamoDBClientConfig, cloudWatchClientConfig, taskBackoffTimeMillis, metricsBufferTimeMillis, + metricsMaxQueueSize, validateSequenceNumberBeforeCheckpointing, regionName, 0, DEFAULT_DDB_BILLING_MODE, + recordsFetcherFactory, DEFAULT_LEASE_CLEANUP_INTERVAL_MILLIS, DEFAULT_COMPLETED_LEASE_CLEANUP_THRESHOLD_MILLIS, + DEFAULT_GARBAGE_LEASE_CLEANUP_THRESHOLD_MILLIS); + } + + /** + * @param applicationName Name of the Kinesis application + * By default the application name is included in the user agent string used to make AWS requests. This + * can assist with troubleshooting (e.g. distinguish requests made by separate applications). + * @param streamName Name of the Kinesis stream + * @param kinesisEndpoint Kinesis endpoint + * @param dynamoDBEndpoint DynamoDB endpoint + * @param initialPositionInStream One of LATEST or TRIM_HORIZON. The KinesisClientLibrary will start fetching + * records from that location in the stream when an application starts up for the first time and there + * are no checkpoints. If there are checkpoints, then we start from the checkpoint position. + * @param kinesisCredentialsProvider Provides credentials used to access Kinesis + * @param dynamoDBCredentialsProvider Provides credentials used to access DynamoDB + * @param cloudWatchCredentialsProvider Provides credentials used to access CloudWatch + * @param failoverTimeMillis Lease duration (leases not renewed within this period will be claimed by others) + * @param workerId Used to distinguish different workers/processes of a Kinesis application + * @param maxRecords Max records to read per Kinesis getRecords() call + * @param idleTimeBetweenReadsInMillis Idle time between calls to fetch data from Kinesis + * @param callProcessRecordsEvenForEmptyRecordList Call the IRecordProcessor::processRecords() API even if + * GetRecords returned an empty record list. + * @param parentShardPollIntervalMillis Wait for this long between polls to check if parent shards are done + * @param shardSyncIntervalMillis Time between tasks to sync leases and Kinesis shards + * @param cleanupTerminatedShardsBeforeExpiry Clean up shards we've finished processing (don't wait for expiration + * in Kinesis) + * @param kinesisClientConfig Client Configuration used by Kinesis client + * @param dynamoDBClientConfig Client Configuration used by DynamoDB client + * @param cloudWatchClientConfig Client Configuration used by CloudWatch client + * @param taskBackoffTimeMillis Backoff period when tasks encounter an exception + * @param metricsBufferTimeMillis Metrics are buffered for at most this long before publishing to CloudWatch + * @param metricsMaxQueueSize Max number of metrics to buffer before publishing to CloudWatch + * @param validateSequenceNumberBeforeCheckpointing whether KCL should validate client provided sequence numbers + * with a call to Amazon Kinesis before checkpointing for calls to + * {@link RecordProcessorCheckpointer#checkpoint(String)} + * @param regionName The region name for the service + * @param shutdownGraceMillis Time before gracefully shutdown forcefully terminates + * @param billingMode The DDB Billing mode to set for lease table creation. + * @param recordsFetcherFactory Factory to create the records fetcher to retrieve data from Kinesis for a given shard. + * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in + * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * @param completedLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any completed leases + * (leases for shards which have been closed as a result of a resharding operation) that need to be cleaned up. + * @param garbageLeaseCleanupThresholdMillis Threshold in millis at which to check if there are any garbage leases + * (leases for shards which no longer exist in the stream) that need to be cleaned up. + */ + public KinesisClientLibConfiguration(String applicationName, + String streamName, + String kinesisEndpoint, + String dynamoDBEndpoint, + InitialPositionInStream initialPositionInStream, + AWSCredentialsProvider kinesisCredentialsProvider, + AWSCredentialsProvider dynamoDBCredentialsProvider, + AWSCredentialsProvider cloudWatchCredentialsProvider, + long failoverTimeMillis, + String workerId, + int maxRecords, + long idleTimeBetweenReadsInMillis, + boolean callProcessRecordsEvenForEmptyRecordList, + long parentShardPollIntervalMillis, + long shardSyncIntervalMillis, + boolean cleanupTerminatedShardsBeforeExpiry, + ClientConfiguration kinesisClientConfig, + ClientConfiguration dynamoDBClientConfig, + ClientConfiguration cloudWatchClientConfig, + long taskBackoffTimeMillis, + long metricsBufferTimeMillis, + int metricsMaxQueueSize, + boolean validateSequenceNumberBeforeCheckpointing, + String regionName, + long shutdownGraceMillis, + BillingMode billingMode, + RecordsFetcherFactory recordsFetcherFactory, + long leaseCleanupIntervalMillis, + long completedLeaseCleanupThresholdMillis, + long garbageLeaseCleanupThresholdMillis) { + // Check following values are greater than zero checkIsValuePositive("FailoverTimeMillis", failoverTimeMillis); checkIsValuePositive("IdleTimeBetweenReadsInMillis", idleTimeBetweenReadsInMillis); @@ -627,7 +698,12 @@ public class KinesisClientLibConfiguration { this.shardSyncStrategyType = DEFAULT_SHARD_SYNC_STRATEGY_TYPE; this.shardPrioritization = DEFAULT_SHARD_PRIORITIZATION; this.recordsFetcherFactory = recordsFetcherFactory; + this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis; + this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis; + this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis; this.shutdownGraceMillis = shutdownGraceMillis; + this.billingMode = billingMode; + } // Check if value is positive, otherwise throw an exception @@ -836,6 +912,29 @@ public class KinesisClientLibConfiguration { return cleanupLeasesUponShardCompletion; } + /** + * @return Interval in millis at which to run lease cleanup thread in {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + */ + public long leaseCleanupIntervalMillis() { + return leaseCleanupIntervalMillis; + } + + /** + * @return Interval in millis at which to check if there are any completed leases (leases for shards which have been + * closed as a result of a resharding operation) that need to be cleaned up. + */ + public long completedLeaseCleanupThresholdMillis() { + return completedLeaseCleanupThresholdMillis; + } + + /** + * @return Interval in millis at which to check if there are any garbage leases (leases for shards which no longer + * exist in the stream) that need to be cleaned up. + */ + public long garbageLeaseCleanupThresholdMillis() { + return garbageLeaseCleanupThresholdMillis; + } + /** * @return true if we should ignore child shards which have open parents */ @@ -1476,4 +1575,39 @@ public class KinesisClientLibConfiguration { this.maxInitializationAttempts = maxInitializationAttempts; return this; } + + /** + * @param leaseCleanupIntervalMillis Rate at which to run lease cleanup thread in + * {@link com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager} + * @return + */ + public KinesisClientLibConfiguration withLeaseCleanupIntervalMillis(long leaseCleanupIntervalMillis) { + checkIsValuePositive("leaseCleanupIntervalMillis", leaseCleanupIntervalMillis); + this.leaseCleanupIntervalMillis = leaseCleanupIntervalMillis; + return this; + } + + /** + * Threshold in millis at which to check if there are any completed leases (leases for shards which have been + * closed as a result of a resharding operation) that need to be cleaned up. + * @param completedLeaseCleanupThresholdMillis + * @return + */ + public KinesisClientLibConfiguration withCompletedLeaseCleanupThresholdMillis(long completedLeaseCleanupThresholdMillis) { + checkIsValuePositive("completedLeaseCleanupThresholdMillis", completedLeaseCleanupThresholdMillis); + this.completedLeaseCleanupThresholdMillis = completedLeaseCleanupThresholdMillis; + return this; + } + + /** + * Threshold in millis at which to check if there are any garbage leases (leases for shards which no longer exist + * in the stream) that need to be cleaned up. + * @param garbageLeaseCleanupThresholdMillis + * @return + */ + public KinesisClientLibConfiguration withGarbageLeaseCleanupThresholdMillis(long garbageLeaseCleanupThresholdMillis) { + checkIsValuePositive("garbageLeaseCleanupThresholdMillis", garbageLeaseCleanupThresholdMillis); + this.garbageLeaseCleanupThresholdMillis = garbageLeaseCleanupThresholdMillis; + return this; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java index c716afa1..ae4e321d 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisDataFetcher.java @@ -17,10 +17,8 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.Collections; import java.util.Date; import java.util.List; -import java.util.Set; import com.amazonaws.SdkClientException; -import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.model.ChildShard; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisLeaseCleanupValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisLeaseCleanupValidator.java index 31d8d998..514cfb8c 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisLeaseCleanupValidator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisLeaseCleanupValidator.java @@ -10,6 +10,7 @@ import java.util.Set; /** * Represents the class that decides if a lease is eligible for cleanup. */ +@Deprecated class KinesisLeaseCleanupValidator implements LeaseCleanupValidator { private static final Log LOG = LogFactory.getLog(KinesisLeaseCleanupValidator.class); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java index 08543230..1ce1175a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisShardSyncer.java @@ -17,8 +17,6 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.io.Serializable; import java.math.BigInteger; import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; @@ -26,8 +24,6 @@ import java.util.List; import java.util.Map; import java.util.Set; -import com.amazonaws.services.kinesis.leases.impl.Lease; -import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.ShardFilter; import com.amazonaws.services.kinesis.model.ShardFilterType; @@ -49,8 +45,6 @@ import com.amazonaws.services.kinesis.metrics.impl.MetricsHelper; import com.amazonaws.services.kinesis.metrics.interfaces.MetricsLevel; import com.amazonaws.services.kinesis.model.Shard; -import javax.annotation.Nullable; - /** * Helper class to sync leases with shards of the Kinesis stream. * It will create new leases/activities when it discovers new Kinesis shards (bootstrap/resharding). @@ -67,11 +61,10 @@ class KinesisShardSyncer implements ShardSyncer { } synchronized void bootstrapShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, - InitialPositionInStreamExtended initialPositionInStream, boolean cleanupLeasesOfCompletedShards, - boolean ignoreUnexpectedChildShards) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, - KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, + InitialPositionInStreamExtended initialPositionInStream, + boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, ignoreUnexpectedChildShards); } @@ -94,7 +87,7 @@ class KinesisShardSyncer implements ShardSyncer { boolean ignoreUnexpectedChildShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards); + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, ignoreUnexpectedChildShards); } /** @@ -117,7 +110,7 @@ class KinesisShardSyncer implements ShardSyncer { boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, cleanupLeasesOfCompletedShards, + syncShardLeases(kinesisProxy, leaseManager, initialPositionInStream, ignoreUnexpectedChildShards, latestShards, leaseManager.isLeaseTableEmpty()); } @@ -127,7 +120,6 @@ class KinesisShardSyncer implements ShardSyncer { * @param kinesisProxy * @param leaseManager * @param initialPosition - * @param cleanupLeasesOfCompletedShards * @param ignoreUnexpectedChildShards * @throws DependencyException * @throws InvalidStateException @@ -135,10 +127,10 @@ class KinesisShardSyncer implements ShardSyncer { * @throws KinesisClientLibIOException */ private synchronized void syncShardLeases(IKinesisProxy kinesisProxy, - ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, - boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, - KinesisClientLibIOException { + ILeaseManager leaseManager, + InitialPositionInStreamExtended initialPosition, + boolean ignoreUnexpectedChildShards) + throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { // In the case where the lease table is empty, we want to synchronize the minimal amount of shards possible // based on the given initial position. @@ -148,7 +140,7 @@ class KinesisShardSyncer implements ShardSyncer { ? getShardListAtInitialPosition(kinesisProxy, initialPosition) : getCompleteShardList(kinesisProxy); - syncShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, + syncShardLeases(kinesisProxy, leaseManager, initialPosition, ignoreUnexpectedChildShards, latestShards, isLeaseTableEmpty); } @@ -158,7 +150,6 @@ class KinesisShardSyncer implements ShardSyncer { * @param kinesisProxy * @param leaseManager * @param initialPosition - * @param cleanupLeasesOfCompletedShards * @param ignoreUnexpectedChildShards * @param latestShards latest snapshot of shards to reuse * @throws DependencyException @@ -170,7 +161,6 @@ class KinesisShardSyncer implements ShardSyncer { private synchronized void syncShardLeases(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, InitialPositionInStreamExtended initialPosition, - boolean cleanupLeasesOfCompletedShards, boolean ignoreUnexpectedChildShards, List latestShards, boolean isLeaseTableEmpty) @@ -218,11 +208,6 @@ class KinesisShardSyncer implements ShardSyncer { trackedLeases.addAll(currentLeases); } trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shards, trackedLeases, kinesisProxy, leaseManager); - if (cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, - leaseManager); - } } // CHECKSTYLE:ON CyclomaticComplexity @@ -613,150 +598,6 @@ class KinesisShardSyncer implements ShardSyncer { return parentShardIds; } - /** - * Delete leases corresponding to shards that no longer exist in the stream. - * Current scheme: Delete a lease if: - * * the corresponding shard is not present in the list of Kinesis shards, AND - * * the parentShardIds listed in the lease are also not present in the list of Kinesis shards. - * @param shards List of all Kinesis shards (assumed to be a consistent snapshot - when stream is in Active state). - * @param trackedLeases List of - * @param kinesisProxy Kinesis proxy (used to get shard list) - * @param leaseManager - * @throws KinesisClientLibIOException Thrown if we couldn't get a fresh shard list from Kinesis. - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - private void cleanupGarbageLeases(List shards, List trackedLeases, - IKinesisProxy kinesisProxy, ILeaseManager leaseManager) - throws KinesisClientLibIOException, DependencyException, InvalidStateException, - ProvisionedThroughputException { - Set kinesisShards = new HashSet<>(); - for (Shard shard : shards) { - kinesisShards.add(shard.getShardId()); - } - - // Check if there are leases for non-existent shards - List garbageLeases = new ArrayList<>(); - for (KinesisClientLease lease : trackedLeases) { - if (leaseCleanupValidator.isCandidateForCleanup(lease, kinesisShards)) { - garbageLeases.add(lease); - } - } - - if (!garbageLeases.isEmpty()) { - LOG.info("Found " + garbageLeases.size() + " candidate leases for cleanup. Refreshing list of" - + " Kinesis shards to pick up recent/latest shards"); - List currentShardList = getCompleteShardList(kinesisProxy); - Set currentKinesisShardIds = new HashSet<>(); - for (Shard shard : currentShardList) { - currentKinesisShardIds.add(shard.getShardId()); - } - - for (KinesisClientLease lease : garbageLeases) { - if (leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)) { - LOG.info("Deleting lease for shard " + lease.getLeaseKey() - + " as it is not present in Kinesis stream."); - leaseManager.deleteLease(lease); - } - } - } - - } - - /** - * Private helper method. - * Clean up leases for shards that meet the following criteria: - * a/ the shard has been fully processed (checkpoint is set to SHARD_END) - * b/ we've begun processing all the child shards: we have leases for all child shards and their checkpoint is not - * TRIM_HORIZON. - * - * @param currentLeases List of leases we evaluate for clean up - * @param shardIdToShardMap Map of shardId->Shard (assumed to include all Kinesis shards) - * @param shardIdToChildShardIdsMap Map of shardId->childShardIds (assumed to include all Kinesis shards) - * @param trackedLeases List of all leases we are tracking. - * @param leaseManager Lease manager (will be used to delete leases) - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - * @throws KinesisClientLibIOException - */ - private synchronized void cleanupLeasesOfFinishedShards(Collection currentLeases, - Map shardIdToShardMap, Map> shardIdToChildShardIdsMap, - List trackedLeases, ILeaseManager leaseManager) - throws DependencyException, InvalidStateException, ProvisionedThroughputException, - KinesisClientLibIOException { - Set shardIdsOfClosedShards = new HashSet<>(); - List leasesOfClosedShards = new ArrayList<>(); - for (KinesisClientLease lease : currentLeases) { - if (lease.getCheckpoint().equals(ExtendedSequenceNumber.SHARD_END)) { - shardIdsOfClosedShards.add(lease.getLeaseKey()); - leasesOfClosedShards.add(lease); - } - } - - if (!leasesOfClosedShards.isEmpty()) { - assertClosedShardsAreCoveredOrAbsent(shardIdToShardMap, shardIdToChildShardIdsMap, shardIdsOfClosedShards); - Comparator startingSequenceNumberComparator = new StartingSequenceNumberAndShardIdBasedComparator( - shardIdToShardMap); - Collections.sort(leasesOfClosedShards, startingSequenceNumberComparator); - Map trackedLeaseMap = constructShardIdToKCLLeaseMap(trackedLeases); - - for (KinesisClientLease leaseOfClosedShard : leasesOfClosedShards) { - String closedShardId = leaseOfClosedShard.getLeaseKey(); - Set childShardIds = shardIdToChildShardIdsMap.get(closedShardId); - if ((closedShardId != null) && (childShardIds != null) && (!childShardIds.isEmpty())) { - cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - } - } - } - } - - /** - * Delete lease for the closed shard. Rules for deletion are: - * a/ the checkpoint for the closed shard is SHARD_END, - * b/ there are leases for all the childShardIds and their checkpoint is NOT TRIM_HORIZON - * Note: This method has package level access solely for testing purposes. - * - * @param closedShardId Identifies the closed shard - * @param childShardIds ShardIds of children of the closed shard - * @param trackedLeases shardId->KinesisClientLease map with all leases we are tracking (should not be null) - * @param leaseManager - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - */ - synchronized void cleanupLeaseForClosedShard(String closedShardId, Set childShardIds, - Map trackedLeases, ILeaseManager leaseManager) - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - KinesisClientLease leaseForClosedShard = trackedLeases.get(closedShardId); - List childShardLeases = new ArrayList<>(); - - for (String childShardId : childShardIds) { - KinesisClientLease childLease = trackedLeases.get(childShardId); - if (childLease != null) { - childShardLeases.add(childLease); - } - } - - if ((leaseForClosedShard != null) && (leaseForClosedShard.getCheckpoint() - .equals(ExtendedSequenceNumber.SHARD_END)) && (childShardLeases.size() == childShardIds.size())) { - boolean okayToDelete = true; - for (KinesisClientLease lease : childShardLeases) { - if (lease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON)) { - okayToDelete = false; - break; - } - } - - if (okayToDelete) { - LOG.info("Deleting lease for shard " + leaseForClosedShard.getLeaseKey() - + " as it has been completely processed and processing of child shards has begun."); - leaseManager.deleteLease(leaseForClosedShard); - } - } - } - /** * Helper method to create a new KinesisClientLease POJO for a shard. * Note: Package level access only for testing purposes diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseCleanupValidator.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseCleanupValidator.java index afb37112..7e07587a 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseCleanupValidator.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/LeaseCleanupValidator.java @@ -8,6 +8,7 @@ import java.util.Set; /** * Represents the class that decides if a lease is eligible for cleanup. */ +@Deprecated public interface LeaseCleanupValidator { /** diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java index f5513d3e..71f8a6bc 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumer.java @@ -18,9 +18,11 @@ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; import java.util.List; import java.util.Optional; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -55,6 +57,7 @@ class ShardConsumer { private final IMetricsFactory metricsFactory; private final KinesisClientLibLeaseCoordinator leaseCoordinator; private ICheckpoint checkpoint; + private LeaseCleanupManager leaseCleanupManager; // Backoff time when polling to check if application has finished processing parent shards private final long parentShardPollIntervalMillis; private final boolean cleanupLeasesOfCompletedShards; @@ -112,6 +115,7 @@ class ShardConsumer { * @param shardSyncer shardSyncer instance used to check and create new leases */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + @Deprecated ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, @@ -124,6 +128,7 @@ class ShardConsumer { long backoffTimeMillis, boolean skipShardSyncAtWorkerInitializationIfLeasesExist, KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + this(shardInfo, streamConfig, checkpoint, @@ -156,6 +161,7 @@ class ShardConsumer { * @param shardSyncer shardSyncer instance used to check and create new leases */ // CHECKSTYLE:IGNORE ParameterNumber FOR NEXT 10 LINES + @Deprecated ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, @@ -215,7 +221,9 @@ class ShardConsumer { * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool * @param config Kinesis library configuration * @param shardSyncer shardSyncer instance used to check and create new leases + * @param leaseCleanupManager used to clean up leases in lease table. */ + @Deprecated ShardConsumer(ShardInfo shardInfo, StreamConfig streamConfig, ICheckpoint checkpoint, @@ -232,6 +240,53 @@ class ShardConsumer { Optional retryGetRecordsInSeconds, Optional maxGetRecordsThreadPool, KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy) { + + this(shardInfo, streamConfig, checkpoint, recordProcessor, recordProcessorCheckpointer, leaseCoordinator, + parentShardPollIntervalMillis, cleanupLeasesOfCompletedShards, executorService, metricsFactory, + backoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, kinesisDataFetcher, retryGetRecordsInSeconds, + maxGetRecordsThreadPool, config, shardSyncer, shardSyncStrategy, LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + Executors.newSingleThreadScheduledExecutor(), metricsFactory, config.shouldCleanupLeasesUponShardCompletion(), + config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), + config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords())); + } + + /** + * @param shardInfo Shard information + * @param streamConfig Stream Config to use + * @param checkpoint Checkpoint tracker + * @param recordProcessor Record processor used to process the data records for the shard + * @param recordProcessorCheckpointer RecordProcessorCheckpointer to use to checkpoint progress + * @param leaseCoordinator Used to manage leases for current worker + * @param parentShardPollIntervalMillis Wait for this long if parent shards are not done (or we get an exception) + * @param cleanupLeasesOfCompletedShards clean up the leases of completed shards + * @param executorService ExecutorService used to execute process tasks for this shard + * @param metricsFactory IMetricsFactory used to construct IMetricsScopes for this shard + * @param backoffTimeMillis backoff interval when we encounter exceptions + * @param skipShardSyncAtWorkerInitializationIfLeasesExist Skip sync at init if lease exists + * @param kinesisDataFetcher KinesisDataFetcher to fetch data from Kinesis streams. + * @param retryGetRecordsInSeconds time in seconds to wait before the worker retries to get a record + * @param maxGetRecordsThreadPool max number of threads in the getRecords thread pool + * @param config Kinesis library configuration + * @param shardSyncer shardSyncer instance used to check and create new leases + * @param leaseCleanupManager used to clean up leases in lease table. + */ + ShardConsumer(ShardInfo shardInfo, + StreamConfig streamConfig, + ICheckpoint checkpoint, + IRecordProcessor recordProcessor, + RecordProcessorCheckpointer recordProcessorCheckpointer, + KinesisClientLibLeaseCoordinator leaseCoordinator, + long parentShardPollIntervalMillis, + boolean cleanupLeasesOfCompletedShards, + ExecutorService executorService, + IMetricsFactory metricsFactory, + long backoffTimeMillis, + boolean skipShardSyncAtWorkerInitializationIfLeasesExist, + KinesisDataFetcher kinesisDataFetcher, + Optional retryGetRecordsInSeconds, + Optional maxGetRecordsThreadPool, + KinesisClientLibConfiguration config, ShardSyncer shardSyncer, ShardSyncStrategy shardSyncStrategy, + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.streamConfig = streamConfig; this.checkpoint = checkpoint; @@ -251,6 +306,7 @@ class ShardConsumer { this.getShardInfo().getShardId(), this.metricsFactory, this.config.getMaxRecords()); this.shardSyncer = shardSyncer; this.shardSyncStrategy = shardSyncStrategy; + this.leaseCleanupManager = leaseCleanupManager; } /** @@ -529,4 +585,8 @@ class ShardConsumer { ShardSyncStrategy getShardSyncStrategy() { return shardSyncStrategy; } + + LeaseCleanupManager getLeaseCleanupManager() { + return leaseCleanupManager; + } } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java index 24b30e69..3b04b791 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardInfo.java @@ -86,7 +86,7 @@ public class ShardInfo { * * @return a list of shardId's that are parents of this shard, or empty if the shard has no parents. */ - protected List getParentShardIds() { + public List getParentShardIds() { return new LinkedList(parentShardIds); } diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index 71cf3b9d..300f1ae3 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -14,9 +14,11 @@ */ package com.amazonaws.services.kinesis.clientlibrary.lib.worker; +import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.util.CollectionUtils; import org.apache.commons.logging.Log; @@ -60,6 +62,7 @@ class ShutdownTask implements ITask { private final ShardSyncer shardSyncer; private final ShardSyncStrategy shardSyncStrategy; private final List childShards; + private final LeaseCleanupManager leaseCleanupManager; /** * Constructor. @@ -76,7 +79,8 @@ class ShutdownTask implements ITask { KinesisClientLibLeaseCoordinator leaseCoordinator, long backoffTimeMillis, GetRecordsCache getRecordsCache, ShardSyncer shardSyncer, - ShardSyncStrategy shardSyncStrategy, List childShards) { + ShardSyncStrategy shardSyncStrategy, List childShards, + LeaseCleanupManager leaseCleanupManager) { this.shardInfo = shardInfo; this.recordProcessor = recordProcessor; this.recordProcessorCheckpointer = recordProcessorCheckpointer; @@ -91,6 +95,7 @@ class ShutdownTask implements ITask { this.shardSyncer = shardSyncer; this.shardSyncStrategy = shardSyncStrategy; this.childShards = childShards; + this.leaseCleanupManager = leaseCleanupManager; } /* @@ -153,13 +158,29 @@ class ShutdownTask implements ITask { recordProcessor.shutdown(shutdownInput); ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.getLastCheckpointValue(); + final boolean successfullyCheckpointedShardEnd = lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END); + if (localReason == ShutdownReason.TERMINATE) { - if ((lastCheckpointValue == null) - || (!lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END))) { + if ((lastCheckpointValue == null) || (!successfullyCheckpointedShardEnd)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " + shardInfo.getShardId() + ". Application must checkpoint upon shutdown. " + "See IRecordProcessor.shutdown javadocs for more information."); } + + // Check if either the shard end ddb persist is successful or + // if childshards is empty. When child shards is empty then either it is due to + // completed shard being reprocessed or we got RNF from service. + // For these cases enqueue the lease for deletion. + if (successfullyCheckpointedShardEnd || CollectionUtils.isNullOrEmpty(childShards)) { + final KinesisClientLease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId()); + final LeasePendingDeletion leasePendingDeletion = new LeasePendingDeletion(currentLease, shardInfo); + + if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { + leaseCleanupManager.enqueueForDeletion(leasePendingDeletion); + } + + //TODO: Add shard end checkpointing here. + } } LOG.debug("Shutting down retrieval strategy."); getRecordsCache.shutdown(); diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java index f99d24d0..5b4f31e8 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker.java @@ -39,6 +39,7 @@ import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; import com.amazonaws.services.kinesis.leases.impl.GenericLeaseSelector; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.leases.impl.LeaseCoordinator; import com.amazonaws.services.kinesis.leases.impl.LeaseRenewer; import com.amazonaws.services.kinesis.leases.impl.LeaseTaker; @@ -157,6 +158,8 @@ public class Worker implements Runnable { private ShardSyncStrategy shardSyncStrategy; private PeriodicShardSyncManager leaderElectedPeriodicShardSyncManager; + private final LeaseCleanupManager leaseCleanupManager; + /** * Constructor. * @@ -573,6 +576,10 @@ public class Worker implements Runnable { this.workerStateChangeListener = workerStateChangeListener; workerStateChangeListener.onWorkerStateChange(WorkerStateChangeListener.WorkerState.CREATED); createShardSyncStrategy(config.getShardSyncStrategyType(), leaderDecider, periodicShardSyncManager); + this.leaseCleanupManager = LeaseCleanupManager.createOrGetInstance(streamConfig.getStreamProxy(), leaseCoordinator.getLeaseManager(), + Executors.newSingleThreadScheduledExecutor(), metricsFactory, cleanupLeasesUponShardCompletion, + config.leaseCleanupIntervalMillis(), config.completedLeaseCleanupThresholdMillis(), + config.garbageLeaseCleanupThresholdMillis(), config.getMaxRecords()); } /** @@ -726,6 +733,13 @@ public class Worker implements Runnable { } } + if (!leaseCleanupManager.isRunning()) { + LOG.info("Starting LeaseCleanupManager."); + leaseCleanupManager.start(); + } else { + LOG.info("LeaseCleanupManager is already running. No need to start it."); + } + // If we reach this point, then we either skipped the lease sync or did not have any exception for the // shard sync in the previous attempt. if (!leaseCoordinator.isRunning()) { @@ -1111,12 +1125,21 @@ public class Worker implements Runnable { } protected ShardConsumer buildConsumer(ShardInfo shardInfo, IRecordProcessorFactory processorFactory) { - IRecordProcessor recordProcessor = processorFactory.createProcessor(); + final IRecordProcessor recordProcessor = processorFactory.createProcessor(); + final RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( + shardInfo, + checkpointTracker, + new SequenceNumberValidator( + streamConfig.getStreamProxy(), + shardInfo.getShardId(), + streamConfig.shouldValidateSequenceNumberBeforeCheckpointing()), + metricsFactory); return new ShardConsumer(shardInfo, streamConfig, checkpointTracker, recordProcessor, + recordProcessorCheckpointer, leaseCoordinator, parentShardPollIntervalMillis, cleanupLeasesUponShardCompletion, @@ -1124,9 +1147,11 @@ public class Worker implements Runnable { metricsFactory, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, + new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo), retryGetRecordsInSeconds, maxGetRecordsThreadPool, - config, shardSyncer, shardSyncStrategy); + config, shardSyncer, shardSyncStrategy, + leaseCleanupManager); } /** diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/LeasePendingDeletion.java b/src/main/java/com/amazonaws/services/kinesis/leases/LeasePendingDeletion.java new file mode 100644 index 00000000..4f78db3a --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/leases/LeasePendingDeletion.java @@ -0,0 +1,31 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.amazonaws.services.kinesis.leases; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.leases.impl.KinesisClientLease; +import lombok.Value; +import lombok.experimental.Accessors; + +/** + * Helper class for cleaning up leases. + */ +@Accessors(fluent=true) +@Value +public class LeasePendingDeletion { + private final KinesisClientLease lease; + private final ShardInfo shardInfo; +} + diff --git a/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java new file mode 100644 index 00000000..af02f588 --- /dev/null +++ b/src/main/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManager.java @@ -0,0 +1,369 @@ +package com.amazonaws.services.kinesis.leases.impl; + +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; +import com.amazonaws.services.kinesis.leases.exceptions.DependencyException; +import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.exceptions.ProvisionedThroughputException; +import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import com.amazonaws.services.kinesis.model.ShardIteratorType; +import com.amazonaws.util.CollectionUtils; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Stopwatch; +import lombok.AccessLevel; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NonNull; +import lombok.RequiredArgsConstructor; +import lombok.Value; +import lombok.experimental.Accessors; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import java.util.HashSet; +import java.util.Objects; +import java.util.Optional; +import java.util.Queue; +import java.util.Set; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +/** + * Helper class to cleanup of any expired/closed shard leases. It will cleanup leases periodically as defined by + * {@link KinesisClientLibConfiguration#leaseCleanupIntervalMillis()} upon worker shutdown, following a re-shard event or + * a shard expiring from the service. + */ +@RequiredArgsConstructor(access= AccessLevel.PACKAGE) +@EqualsAndHashCode +public class LeaseCleanupManager { + @NonNull + private IKinesisProxy kinesisProxy; + @NonNull + private final ILeaseManager leaseManager; + @NonNull + private final ScheduledExecutorService deletionThreadPool; + @NonNull + private final IMetricsFactory metricsFactory; + private final boolean cleanupLeasesUponShardCompletion; + private final long leaseCleanupIntervalMillis; + private final long completedLeaseCleanupIntervalMillis; + private final long garbageLeaseCleanupIntervalMillis; + private final int maxRecords; + + private final Stopwatch completedLeaseStopwatch = Stopwatch.createUnstarted(); + private final Stopwatch garbageLeaseStopwatch = Stopwatch.createUnstarted(); + private final Queue deletionQueue = new ConcurrentLinkedQueue<>(); + + private static final long INITIAL_DELAY = 0L; + private static final Log LOG = LogFactory.getLog(LeaseCleanupManager.class); + + @Getter + private volatile boolean isRunning = false; + + private static LeaseCleanupManager instance; + + /** + * Factory method to return a singleton instance of {@link LeaseCleanupManager}. + * @param kinesisProxy + * @param leaseManager + * @param deletionThreadPool + * @param metricsFactory + * @param cleanupLeasesUponShardCompletion + * @param leaseCleanupIntervalMillis + * @param completedLeaseCleanupIntervalMillis + * @param garbageLeaseCleanupIntervalMillis + * @param maxRecords + * @return + */ + public static LeaseCleanupManager createOrGetInstance(IKinesisProxy kinesisProxy, ILeaseManager leaseManager, + ScheduledExecutorService deletionThreadPool, IMetricsFactory metricsFactory, + boolean cleanupLeasesUponShardCompletion, long leaseCleanupIntervalMillis, + long completedLeaseCleanupIntervalMillis, long garbageLeaseCleanupIntervalMillis, + int maxRecords) { + if (instance == null) { + instance = new LeaseCleanupManager(kinesisProxy, leaseManager, deletionThreadPool, metricsFactory, cleanupLeasesUponShardCompletion, + leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords); + } + + return instance; + } + + /** + * Starts the lease cleanup thread, which is scheduled periodically as specified by + * {@link LeaseCleanupManager#leaseCleanupIntervalMillis} + */ + public void start() { + LOG.debug("Starting lease cleanup thread."); + isRunning = true; + completedLeaseStopwatch.start(); + garbageLeaseStopwatch.start(); + + deletionThreadPool.scheduleAtFixedRate(new LeaseCleanupThread(), INITIAL_DELAY, leaseCleanupIntervalMillis, + TimeUnit.MILLISECONDS); + } + + /** + * Enqueues a lease for deletion without check for duplicate entry. Use {@link #isEnqueuedForDeletion} + * for checking the duplicate entries. + * @param leasePendingDeletion + */ + public void enqueueForDeletion(LeasePendingDeletion leasePendingDeletion) { + final KinesisClientLease lease = leasePendingDeletion.lease(); + if (lease == null) { + LOG.warn("Cannot enqueue lease " + lease.getLeaseKey() + " for deferred deletion - instance doesn't hold " + + "the lease for that shard."); + } else { + LOG.debug("Enqueuing lease " + lease.getLeaseKey() + " for deferred deletion."); + if (!deletionQueue.add(leasePendingDeletion)) { + LOG.warn("Unable to enqueue lease " + lease.getLeaseKey() + " for deletion."); + } + } + } + + /** + * Check if lease was already enqueued for deletion. + * //TODO: Optimize verifying duplicate entries https://sim.amazon.com/issues/KinesisLTR-597. + * @param leasePendingDeletion + * @return true if enqueued for deletion; false otherwise. + */ + public boolean isEnqueuedForDeletion(LeasePendingDeletion leasePendingDeletion) { + return deletionQueue.contains(leasePendingDeletion); + } + + /** + * Returns how many leases are currently waiting in the queue pending deletion. + * @return number of leases pending deletion. + */ + private int leasesPendingDeletion() { + return deletionQueue.size(); + } + + private boolean timeToCheckForCompletedShard() { + return completedLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= completedLeaseCleanupIntervalMillis; + } + + private boolean timeToCheckForGarbageShard() { + return garbageLeaseStopwatch.elapsed(TimeUnit.MILLISECONDS) >= garbageLeaseCleanupIntervalMillis; + } + + public LeaseCleanupResult cleanupLease(LeasePendingDeletion leasePendingDeletion, + boolean timeToCheckForCompletedShard, boolean timeToCheckForGarbageShard) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final KinesisClientLease lease = leasePendingDeletion.lease(); + final ShardInfo shardInfo = leasePendingDeletion.shardInfo(); + + boolean cleanedUpCompletedLease = false; + boolean cleanedUpGarbageLease = false; + boolean alreadyCheckedForGarbageCollection = false; + boolean wereChildShardsPresent = false; + boolean wasResourceNotFound = false; + + try { + if (cleanupLeasesUponShardCompletion && timeToCheckForCompletedShard) { + final KinesisClientLease leaseFromDDB = leaseManager.getLease(shardInfo.getShardId()); + if(leaseFromDDB != null) { + Set childShardKeys = leaseFromDDB.getChildShardIds(); + if (CollectionUtils.isNullOrEmpty(childShardKeys)) { + try { + childShardKeys = getChildShardsFromService(shardInfo); + + if (CollectionUtils.isNullOrEmpty(childShardKeys)) { + LOG.error("No child shards returned from service for shard " + shardInfo.getShardId()); + } else { + wereChildShardsPresent = true; + updateLeaseWithChildShards(leasePendingDeletion, childShardKeys); + } + } catch (ResourceNotFoundException e) { + throw e; + } finally { + alreadyCheckedForGarbageCollection = true; + } + } else { + wereChildShardsPresent = true; + } + try { + cleanedUpCompletedLease = cleanupLeaseForCompletedShard(lease, childShardKeys); + } catch (Exception e) { + // Suppressing the exception here, so that we can attempt for garbage cleanup. + LOG.warn("Unable to cleanup lease for shard " + shardInfo.getShardId()); + } + } else { + LOG.info("Lease not present in lease table while cleaning the shard " + shardInfo.getShardId()); + cleanedUpCompletedLease = true; + } + } + + if (!alreadyCheckedForGarbageCollection && timeToCheckForGarbageShard) { + try { + wereChildShardsPresent = !CollectionUtils + .isNullOrEmpty(getChildShardsFromService(shardInfo)); + } catch (ResourceNotFoundException e) { + throw e; + } + } + } catch (ResourceNotFoundException e) { + wasResourceNotFound = true; + cleanedUpGarbageLease = cleanupLeaseForGarbageShard(lease); + } + + return new LeaseCleanupResult(cleanedUpCompletedLease, cleanedUpGarbageLease, wereChildShardsPresent, + wasResourceNotFound); + } + + private Set getChildShardsFromService(ShardInfo shardInfo) { + final String iterator = kinesisProxy.getIterator(shardInfo.getShardId(), ShardIteratorType.LATEST.toString()); + return kinesisProxy.get(iterator, maxRecords).getChildShards().stream().map(c -> c.getShardId()).collect(Collectors.toSet()); + } + + + // A lease that ended with SHARD_END from ResourceNotFoundException is safe to delete if it no longer exists in the + // stream (known explicitly from ResourceNotFound being thrown when processing this shard), + private boolean cleanupLeaseForGarbageShard(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException { + LOG.info("Deleting lease " + lease.getLeaseKey() + " as it is not present in the stream."); + leaseManager.deleteLease(lease); + return true; + } + + private boolean allParentShardLeasesDeleted(KinesisClientLease lease) throws DependencyException, ProvisionedThroughputException, InvalidStateException { + for (String parentShard : lease.getParentShardIds()) { + final KinesisClientLease parentLease = leaseManager.getLease(parentShard); + + if (parentLease != null) { + LOG.warn("Lease " + lease.getLeaseKey() + " has a parent lease " + parentLease.getLeaseKey() + + " which is still present in the lease table, skipping deletion for this lease."); + return false; + } + } + return true; + } + + // We should only be deleting the current shard's lease if + // 1. All of its children are currently being processed, i.e their checkpoint is not TRIM_HORIZON or AT_TIMESTAMP. + // 2. Its parent shard lease(s) have already been deleted. + private boolean cleanupLeaseForCompletedShard(KinesisClientLease lease, Set childShardLeaseKeys) + throws DependencyException, ProvisionedThroughputException, InvalidStateException, IllegalStateException { + final Set processedChildShardLeaseKeys = new HashSet<>(); + + for (String childShardLeaseKey : childShardLeaseKeys) { + final KinesisClientLease childShardLease = Optional.ofNullable( + leaseManager.getLease(childShardLeaseKey)) + .orElseThrow(() -> new IllegalStateException( + "Child lease " + childShardLeaseKey + " for completed shard not found in " + + "lease table - not cleaning up lease " + lease)); + + if (!childShardLease.getCheckpoint().equals(ExtendedSequenceNumber.TRIM_HORIZON) && !childShardLease + .getCheckpoint().equals(ExtendedSequenceNumber.AT_TIMESTAMP)) { + processedChildShardLeaseKeys.add(childShardLease.getLeaseKey()); + } + } + + if (!allParentShardLeasesDeleted(lease) || !Objects.equals(childShardLeaseKeys, processedChildShardLeaseKeys)) { + return false; + } + + LOG.info("Deleting lease " + lease.getLeaseKey() + " as it has been completely processed and processing of child shard(s) has begun."); + leaseManager.deleteLease(lease); + + return true; + } + + private void updateLeaseWithChildShards(LeasePendingDeletion leasePendingDeletion, Set childShardKeys) + throws DependencyException, ProvisionedThroughputException, InvalidStateException { + final KinesisClientLease updatedLease = leasePendingDeletion.lease(); + updatedLease.setChildShardIds(childShardKeys); + + leaseManager.updateLease(updatedLease); + } + + @VisibleForTesting + void cleanupLeases() { + LOG.info("Number of pending leases to clean before the scan : " + leasesPendingDeletion()); + if (deletionQueue.isEmpty()) { + LOG.debug("No leases pending deletion."); + } else if (timeToCheckForCompletedShard() | timeToCheckForGarbageShard()) { + final Queue failedDeletions = new ConcurrentLinkedQueue<>(); + boolean completedLeaseCleanedUp = false; + boolean garbageLeaseCleanedUp = false; + + LOG.debug("Attempting to clean up " + deletionQueue.size() + " lease(s)."); + + while (!deletionQueue.isEmpty()) { + final LeasePendingDeletion leasePendingDeletion = deletionQueue.poll(); + final String leaseKey = leasePendingDeletion.lease().getLeaseKey(); + boolean deletionSucceeded = false; + try { + final LeaseCleanupResult leaseCleanupResult = cleanupLease(leasePendingDeletion, + timeToCheckForCompletedShard(), timeToCheckForGarbageShard()); + completedLeaseCleanedUp |= leaseCleanupResult.cleanedUpCompletedLease(); + garbageLeaseCleanedUp |= leaseCleanupResult.cleanedUpGarbageLease(); + + if (leaseCleanupResult.leaseCleanedUp()) { + LOG.debug("Successfully cleaned up lease " + leaseKey); + deletionSucceeded = true; + } else { + LOG.warn("Unable to clean up lease " + leaseKey + " due to " + leaseCleanupResult); + } + } catch (Exception e) { + LOG.error("Failed to cleanup lease " + leaseKey + ". Will re-enqueue for deletion and retry on next " + + "scheduled execution.", e); + } + if (!deletionSucceeded) { + LOG.debug("Did not cleanup lease " + leaseKey + ". Re-enqueueing for deletion."); + failedDeletions.add(leasePendingDeletion); + } + } + if (completedLeaseCleanedUp) { + LOG.debug("At least one completed lease was cleaned up - restarting interval"); + completedLeaseStopwatch.reset().start(); + } + if (garbageLeaseCleanedUp) { + LOG.debug("At least one garbage lease was cleaned up - restarting interval"); + garbageLeaseStopwatch.reset().start(); + } + deletionQueue.addAll(failedDeletions); + + LOG.info("Number of pending leases to clean after the scan : " + leasesPendingDeletion()); + } + } + + private class LeaseCleanupThread implements Runnable { + @Override + public void run() { + cleanupLeases(); + } + } + + @Value + @Accessors(fluent=true) + public static class LeaseCleanupResult { + boolean cleanedUpCompletedLease; + boolean cleanedUpGarbageLease; + boolean wereChildShardsPresent; + boolean wasResourceNotFound; + + public boolean leaseCleanedUp() { + return cleanedUpCompletedLease | cleanedUpGarbageLease; + } + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java index d9160f0f..6a5e76b9 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ConsumerStatesTest.java @@ -124,7 +124,7 @@ public class ConsumerStatesTest { assertThat(state.successTransition(), equalTo(ShardConsumerState.INITIALIZING.getConsumerState())); for (ShutdownReason shutdownReason : ShutdownReason.values()) { assertThat(state.shutdownTransition(shutdownReason), - equalTo(ShardConsumerState.SHUTDOWN_COMPLETE.getConsumerState())); + equalTo(ShardConsumerState.SHUTTING_DOWN.getConsumerState())); } assertThat(state.getState(), equalTo(ShardConsumerState.WAITING_ON_PARENT_SHARDS)); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java index f040c6a6..67bf3697 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardConsumerTest.java @@ -54,6 +54,8 @@ import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; +import com.amazonaws.services.kinesis.leases.impl.LeaseManager; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.hamcrest.Description; @@ -138,6 +140,7 @@ public class ShardConsumerTest { recordsFetcherFactory = spy(new SimpleRecordsFetcherFactory()); when(config.getRecordsFetcherFactory()).thenReturn(recordsFetcherFactory); when(config.getLogWarningForTaskAfterMillis()).thenReturn(Optional.empty()); + when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); } /** @@ -475,6 +478,8 @@ public class ShardConsumerTest { when(leaseCoordinator.getLeaseManager()).thenReturn(leaseManager); when(leaseManager.getLease(eq(parentShardId))).thenReturn(parentLease); when(parentLease.getCheckpoint()).thenReturn(ExtendedSequenceNumber.TRIM_HORIZON); + when(recordProcessorCheckpointer.getLastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + when(streamConfig.getStreamProxy()).thenReturn(streamProxy); final ShardConsumer consumer = new ShardConsumer(shardInfo, @@ -507,6 +512,9 @@ public class ShardConsumerTest { assertThat(consumer.getShutdownReason(), equalTo(ShutdownReason.REQUESTED)); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); consumer.consumeShard(); + assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTTING_DOWN))); + Thread.sleep(50L); + consumer.beginShutdown(); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.SHUTDOWN_COMPLETE))); assertThat(consumer.isShutdown(), is(true)); verify(shutdownNotification, times(1)).shutdownComplete(); @@ -704,19 +712,19 @@ public class ShardConsumerTest { StreamConfig streamConfig = new StreamConfig(fileBasedProxy, - maxRecords, - idleTimeMS, - callProcessRecordsForEmptyRecordList, - skipCheckpointValidationValue, INITIAL_POSITION_LATEST); + maxRecords, + idleTimeMS, + callProcessRecordsForEmptyRecordList, + skipCheckpointValidationValue, INITIAL_POSITION_LATEST); ShardInfo shardInfo = new ShardInfo(streamShardId, testConcurrencyToken, null, null); dataFetcher = new KinesisDataFetcher(streamConfig.getStreamProxy(), shardInfo); getRecordsCache = spy(new BlockingGetRecordsCache(maxRecords, - new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); + new SynchronousGetRecordsRetrievalStrategy(dataFetcher))); when(recordsFetcherFactory.createRecordsFetcher(any(GetRecordsRetrievalStrategy.class), anyString(), - any(IMetricsFactory.class), anyInt())) + any(IMetricsFactory.class), anyInt())) .thenReturn(getRecordsCache); RecordProcessorCheckpointer recordProcessorCheckpointer = new RecordProcessorCheckpointer( @@ -732,29 +740,29 @@ public class ShardConsumerTest { ShardConsumer consumer = new ShardConsumer(shardInfo, - streamConfig, - checkpoint, - processor, - recordProcessorCheckpointer, - leaseCoordinator, - parentShardPollIntervalMillis, - cleanupLeasesOfCompletedShards, - executorService, - metricsFactory, - taskBackoffTimeMillis, - KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, - dataFetcher, - Optional.empty(), - Optional.empty(), - config, - shardSyncer, - shardSyncStrategy); + streamConfig, + checkpoint, + processor, + recordProcessorCheckpointer, + leaseCoordinator, + parentShardPollIntervalMillis, + cleanupLeasesOfCompletedShards, + executorService, + metricsFactory, + taskBackoffTimeMillis, + KinesisClientLibConfiguration.DEFAULT_SKIP_SHARD_SYNC_AT_STARTUP_IF_LEASES_EXIST, + dataFetcher, + Optional.empty(), + Optional.empty(), + config, + shardSyncer, + shardSyncStrategy); List parentShardIds = new ArrayList<>(); parentShardIds.add(shardInfo.getShardId()); when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(createLease(shardInfo.getShardId(), - "leaseOwner", - parentShardIds)); + "leaseOwner", + parentShardIds)); when(leaseCoordinator.updateLease(any(KinesisClientLease.class), any(UUID.class))).thenReturn(true); assertThat(consumer.getCurrentState(), is(equalTo(ConsumerStates.ShardConsumerState.WAITING_ON_PARENT_SHARDS))); @@ -1111,7 +1119,7 @@ public class ShardConsumerTest { //@formatter:off (gets the formatting wrong) private void verifyConsumedRecords(List expectedRecords, - List actualRecords) { + List actualRecords) { //@formatter:on assertThat(actualRecords.size(), is(equalTo(expectedRecords.size()))); ListIterator expectedIter = expectedRecords.listIterator(); @@ -1141,7 +1149,7 @@ public class ShardConsumerTest { } Matcher initializationInputMatcher(final ExtendedSequenceNumber checkpoint, - final ExtendedSequenceNumber pendingCheckpoint) { + final ExtendedSequenceNumber pendingCheckpoint) { return new TypeSafeMatcher() { @Override protected boolean matchesSafely(InitializationInput item) { diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardObjectHelper.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardObjectHelper.java index a155f5c4..b80064af 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardObjectHelper.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardObjectHelper.java @@ -25,24 +25,24 @@ import com.amazonaws.services.kinesis.model.Shard; /** * Helper class to create Shard, SequenceRange and related objects. */ -class ShardObjectHelper { +public class ShardObjectHelper { private static final int EXPONENT = 128; /** * Max value of a sequence number (2^128 -1). Useful for defining sequence number range for a shard. */ - static final String MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString(); + public static final String MAX_SEQUENCE_NUMBER = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString(); /** * Min value of a sequence number (0). Useful for defining sequence number range for a shard. */ - static final String MIN_SEQUENCE_NUMBER = BigInteger.ZERO.toString(); + public static final String MIN_SEQUENCE_NUMBER = BigInteger.ZERO.toString(); /** * Max value of a hash key (2^128 -1). Useful for defining hash key range for a shard. */ - static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString(); + public static final String MAX_HASH_KEY = new BigInteger("2").pow(EXPONENT).subtract(BigInteger.ONE).toString(); /** * Min value of a hash key (0). Useful for defining sequence number range for a shard. @@ -63,7 +63,7 @@ class ShardObjectHelper { * @param sequenceNumberRange * @return */ - static Shard newShard(String shardId, + public static Shard newShard(String shardId, String parentShardId, String adjacentParentShardId, SequenceNumberRange sequenceNumberRange) { @@ -78,7 +78,7 @@ class ShardObjectHelper { * @param hashKeyRange * @return */ - static Shard newShard(String shardId, + public static Shard newShard(String shardId, String parentShardId, String adjacentParentShardId, SequenceNumberRange sequenceNumberRange, @@ -98,7 +98,7 @@ class ShardObjectHelper { * @param endingSequenceNumber * @return */ - static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) { + public static SequenceNumberRange newSequenceNumberRange(String startingSequenceNumber, String endingSequenceNumber) { SequenceNumberRange range = new SequenceNumberRange(); range.setStartingSequenceNumber(startingSequenceNumber); range.setEndingSequenceNumber(endingSequenceNumber); @@ -110,14 +110,14 @@ class ShardObjectHelper { * @param endingHashKey * @return */ - static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) { + public static HashKeyRange newHashKeyRange(String startingHashKey, String endingHashKey) { HashKeyRange range = new HashKeyRange(); range.setStartingHashKey(startingHashKey); range.setEndingHashKey(endingHashKey); return range; } - static List getParentShardIds(Shard shard) { + public static List getParentShardIds(Shard shard) { List parentShardIds = new ArrayList<>(2); if (shard.getAdjacentParentShardId() != null) { parentShardIds.add(shard.getAdjacentParentShardId()); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java index 208d6448..e2cc578c 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShardSyncerTest.java @@ -420,6 +420,7 @@ public class ShardSyncerTest { cleanupLeasesOfCompletedShards, true, shards); List newLeases = leaseManager.listLeases(); Set expectedLeaseShardIds = new HashSet(); + expectedLeaseShardIds.add("shardId-1000"); // dummy lease will still be in the table. expectedLeaseShardIds.add("shardId-4"); expectedLeaseShardIds.add("shardId-5"); expectedLeaseShardIds.add("shardId-8"); @@ -691,31 +692,6 @@ public class ShardSyncerTest { dataFile.delete(); } - /** - * Test bootstrapShardLeases() - cleanup garbage leases. - * - * @throws ProvisionedThroughputException - * @throws InvalidStateException - * @throws DependencyException - * @throws IOException - * @throws KinesisClientLibIOException - */ - @Test - public final void testBootstrapShardLeasesCleanupGarbage() - throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException, - KinesisClientLibIOException { - String garbageShardId = "shardId-garbage-001"; - KinesisClientLease garbageLease = shardSyncer.newKCLLease(ShardObjectHelper.newShard(garbageShardId, - null, - null, - ShardObjectHelper.newSequenceNumberRange("101", null))); - garbageLease.setCheckpoint(new ExtendedSequenceNumber("999")); - leaseManager.createLeaseIfNotExists(garbageLease); - Assert.assertEquals(garbageShardId, leaseManager.getLease(garbageShardId).getLeaseKey()); - testBootstrapShardLeasesAtStartingPosition(INITIAL_POSITION_LATEST); - Assert.assertNull(leaseManager.getLease(garbageShardId)); - } - private void testBootstrapShardLeasesAtStartingPosition(InitialPositionInStreamExtended initialPosition) throws DependencyException, InvalidStateException, ProvisionedThroughputException, IOException, KinesisClientLibIOException { @@ -730,7 +706,7 @@ public class ShardSyncerTest { dataFile.deleteOnExit(); IKinesisProxy kinesisProxy = new KinesisLocalFileProxy(dataFile.getAbsolutePath()); - shardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, cleanupLeasesOfCompletedShards, + shardSyncer.bootstrapShardLeases(kinesisProxy, leaseManager, initialPosition, false); List newLeases = leaseManager.listLeases(); Assert.assertEquals(2, newLeases.size()); @@ -2267,81 +2243,6 @@ public class ShardSyncerTest { Assert.assertFalse(leaseCleanupValidator.isCandidateForCleanup(lease, currentKinesisShardIds)); } - /** - * Test cleanup of lease for a shard that has been fully processed (and processing of child shards has begun). - * - * @throws DependencyException - * @throws InvalidStateException - * @throws ProvisionedThroughputException - */ - @Test - public final void testCleanupLeaseForClosedShard() - throws DependencyException, InvalidStateException, ProvisionedThroughputException { - String closedShardId = "shardId-2"; - KinesisClientLease leaseForClosedShard = newLease(closedShardId); - leaseForClosedShard.setCheckpoint(new ExtendedSequenceNumber("1234")); - leaseManager.createLeaseIfNotExists(leaseForClosedShard); - - Set childShardIds = new HashSet<>(); - List trackedLeases = new ArrayList<>(); - Set parentShardIds = new HashSet<>(); - parentShardIds.add(closedShardId); - String childShardId1 = "shardId-5"; - KinesisClientLease childLease1 = newLease(childShardId1); - childLease1.setParentShardIds(parentShardIds); - childLease1.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); - String childShardId2 = "shardId-7"; - KinesisClientLease childLease2 = newLease(childShardId2); - childLease2.setParentShardIds(parentShardIds); - childLease2.setCheckpoint(ExtendedSequenceNumber.TRIM_HORIZON); - Map trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); - - // empty list of leases - shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - Assert.assertNotNull(leaseManager.getLease(closedShardId)); - - // closed shard has not been fully processed yet (checkpoint != SHARD_END) - trackedLeases.add(leaseForClosedShard); - trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); - shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - Assert.assertNotNull(leaseManager.getLease(closedShardId)); - - // closed shard has been fully processed yet (checkpoint == SHARD_END) - leaseForClosedShard.setCheckpoint(ExtendedSequenceNumber.SHARD_END); - leaseManager.updateLease(leaseForClosedShard); - shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - Assert.assertNull(leaseManager.getLease(closedShardId)); - - // lease for only one child exists - childShardIds.add(childShardId1); - childShardIds.add(childShardId2); - leaseManager.createLeaseIfNotExists(leaseForClosedShard); - leaseManager.createLeaseIfNotExists(childLease1); - trackedLeases.add(childLease1); - trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); - shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - Assert.assertNotNull(leaseManager.getLease(closedShardId)); - - // leases for both children exists, but they are both at TRIM_HORIZON - leaseManager.createLeaseIfNotExists(childLease2); - trackedLeases.add(childLease2); - trackedLeaseMap = shardSyncer.constructShardIdToKCLLeaseMap(trackedLeases); - shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - Assert.assertNotNull(leaseManager.getLease(closedShardId)); - - // leases for both children exists, one is at TRIM_HORIZON - childLease1.setCheckpoint(new ExtendedSequenceNumber("34890")); - leaseManager.updateLease(childLease1); - shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - Assert.assertNotNull(leaseManager.getLease(closedShardId)); - - // leases for both children exists, NONE of them are at TRIM_HORIZON - childLease2.setCheckpoint(new ExtendedSequenceNumber("43789")); - leaseManager.updateLease(childLease2); - shardSyncer.cleanupLeaseForClosedShard(closedShardId, childShardIds, trackedLeaseMap, leaseManager); - Assert.assertNull(leaseManager.getLease(closedShardId)); - } - /** * Test we can handle trimmed Kinesis shards (absent from the shard list), and valid closed shards. * diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index cbfdf54a..b47bf70d 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -34,6 +34,7 @@ import java.util.UUID; import com.amazonaws.services.kinesis.clientlibrary.proxies.ShardListWrappingShardClosureVerificationResponse; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; import com.amazonaws.services.kinesis.leases.exceptions.InvalidStateException; +import com.amazonaws.services.kinesis.leases.impl.LeaseCleanupManager; import com.amazonaws.services.kinesis.model.ChildShard; import com.amazonaws.services.kinesis.model.HashKeyRange; import com.amazonaws.services.kinesis.model.SequenceNumberRange; @@ -86,6 +87,8 @@ public class ShutdownTaskTest { private ILeaseManager leaseManager; @Mock private KinesisClientLibLeaseCoordinator leaseCoordinator; + @Mock + private LeaseCleanupManager leaseCleanupManager; /** * @throws java.lang.Exception @@ -143,7 +146,8 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructChildShards(), + leaseCleanupManager); TaskResult result = task.call(); Assert.assertNotNull(result.getException()); Assert.assertTrue(result.getException() instanceof IllegalArgumentException); @@ -178,7 +182,8 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructChildShards(), + leaseCleanupManager); TaskResult result = task.call(); verify(getRecordsCache).shutdown(); verify(leaseCoordinator).dropLease(any(KinesisClientLease.class)); @@ -205,7 +210,8 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - constructChildShards()); + constructChildShards(), + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, times(2)).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseCoordinator).updateLease(any(KinesisClientLease.class), any(UUID.class)); @@ -238,7 +244,8 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - Collections.emptyList()); + Collections.emptyList(), + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); @@ -267,7 +274,8 @@ public class ShutdownTaskTest { getRecordsCache, shardSyncer, shardSyncStrategy, - Collections.emptyList()); + Collections.emptyList(), + leaseCleanupManager); TaskResult result = task.call(); verify(leaseManager, never()).createLeaseIfNotExists(any(KinesisClientLease.class)); verify(leaseCoordinator, never()).updateLease(any(KinesisClientLease.class), any(UUID.class)); @@ -284,7 +292,7 @@ public class ShutdownTaskTest { ShutdownTask task = new ShutdownTask(null, null, null, null, null, null, false, false, leaseCoordinator, 0, - getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList()); + getRecordsCache, shardSyncer, shardSyncStrategy, Collections.emptyList(), leaseCleanupManager); Assert.assertEquals(TaskType.SHUTDOWN, task.getTaskType()); } diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java new file mode 100644 index 00000000..367c0ab0 --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseCleanupManagerTest.java @@ -0,0 +1,289 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazonaws.services.kinesis.leases.impl; + +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardInfo; +import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShardObjectHelper; +import com.amazonaws.services.kinesis.clientlibrary.proxies.IKinesisProxy; +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; +import com.amazonaws.services.kinesis.leases.LeasePendingDeletion; +import com.amazonaws.services.kinesis.metrics.impl.NullMetricsFactory; +import com.amazonaws.services.kinesis.metrics.interfaces.IMetricsFactory; +import com.amazonaws.services.kinesis.model.ChildShard; +import com.amazonaws.services.kinesis.model.GetRecordsResult; +import com.amazonaws.services.kinesis.model.ResourceNotFoundException; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Collectors; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class LeaseCleanupManagerTest { + + private ShardInfo shardInfo; + private String concurrencyToken = "1234"; + private int maxRecords = 1; + + private String getShardId = "getShardId"; + private String splitParent = "splitParent"; + private String mergeParent1 = "mergeParent-1"; + private String mergeParent2 = "mergeParent-2"; + + private long leaseCleanupIntervalMillis = Duration.ofSeconds(1).toMillis(); + private long completedLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); + private long garbageLeaseCleanupIntervalMillis = Duration.ofSeconds(0).toMillis(); + private boolean cleanupLeasesOfCompletedShards = true; + private LeaseCleanupManager leaseCleanupManager; + private static final IMetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); + + @Mock + private LeaseManager leaseManager; + @Mock + private LeaseCoordinator leaseCoordinator; + @Mock + private IKinesisProxy kinesis; + @Mock + private ScheduledExecutorService deletionThreadPool; + + @Before + public void setUp() { + shardInfo = new ShardInfo(getShardId, concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + leaseCleanupManager = new LeaseCleanupManager(kinesis, leaseManager, deletionThreadPool, NULL_METRICS_FACTORY, + cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords); + } + + /** + * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed + * shard case. + */ + @Test + public final void testParentShardLeaseDeletedSplitCase() throws Exception { + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 1); + } + + /** + * Tests that when both child shard leases are present, we are able to delete the parent shard for the completed + * shard case. + */ + @Test + public final void testParentShardLeaseDeletedMergeCase() throws Exception { + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 1); + } + + /** + * Tests that if cleanupLeasesOfCompletedShards is not enabled by the customer, then no leases are cleaned up for + * the completed shard case. + */ + @Test + public final void testNoLeasesDeletedWhenNotEnabled() throws Exception { + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + cleanupLeasesOfCompletedShards = false; + + leaseCleanupManager = new LeaseCleanupManager(kinesis, leaseManager, deletionThreadPool, NULL_METRICS_FACTORY, + cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForSplit(), ExtendedSequenceNumber.LATEST, 0); + } + + /** + * Tests that if some of the child shard leases are missing, we fail fast and don't delete the parent shard lease + * for the completed shard case. + */ + @Test + public final void testNoCleanupWhenSomeChildShardLeasesAreNotPresent() throws Exception { + List childShards = childShardsForSplit(); + + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, ExtendedSequenceNumber.LATEST, false, 0); + } + + /** + * Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint TRIM_HORIZON), + * we don't delete them for the completed shard case. + */ + @Test + public final void testParentShardLeaseNotDeletedWhenChildIsAtTrim() throws Exception { + testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.TRIM_HORIZON); + } + + /** + * Tests that if some child shard leases haven't begun processing (at least one lease w/ checkpoint AT_TIMESTAMP), + * we don't delete them for the completed shard case. + */ + @Test + public final void testParentShardLeaseNotDeletedWhenChildIsAtTimestamp() throws Exception { + testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber.AT_TIMESTAMP); + } + + private void testParentShardLeaseNotDeletedWhenChildIsAtPosition(ExtendedSequenceNumber extendedSequenceNumber) + throws Exception { + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), extendedSequenceNumber, 0); + } + + /** + * Tests that if a lease's parents are still present, we do not delete the lease. + */ + @Test + public final void testLeaseNotDeletedWhenParentsStillPresent() throws Exception { + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.singleton("parent"), + ExtendedSequenceNumber.LATEST); + + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShardsForMerge(), ExtendedSequenceNumber.LATEST, 0); + } + + /** + * Tests ResourceNotFound case for if a shard expires, that we delete the lease when shardExpired is found. + */ + @Test + public final void testLeaseDeletedWhenShardDoesNotExist() throws Exception { + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + final KinesisClientLease heldLease = LeaseHelper.createLease(shardInfo.getShardId(), "leaseOwner", Collections.singleton("parentShardId")); + + testLeaseDeletedWhenShardDoesNotExist(heldLease); + } + + /** + * Tests ResourceNotFound case when completed lease cleanup is disabled. + * @throws Exception + */ + @Test + public final void testLeaseDeletedWhenShardDoesNotExistAndCleanupCompletedLeaseDisabled() throws Exception { + shardInfo = new ShardInfo("ShardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + final KinesisClientLease heldLease = LeaseHelper.createLease(shardInfo.getShardId(), "leaseOwner", Collections.singleton("parentShardId")); + + cleanupLeasesOfCompletedShards = false; + + leaseCleanupManager = new LeaseCleanupManager(kinesis, leaseManager, deletionThreadPool, NULL_METRICS_FACTORY, + cleanupLeasesOfCompletedShards, leaseCleanupIntervalMillis, completedLeaseCleanupIntervalMillis, garbageLeaseCleanupIntervalMillis, maxRecords); + + testLeaseDeletedWhenShardDoesNotExist(heldLease); + } + + public void testLeaseDeletedWhenShardDoesNotExist(KinesisClientLease heldLease) throws Exception { + when(leaseCoordinator.getCurrentlyHeldLease(shardInfo.getShardId())).thenReturn(heldLease); + when(kinesis.get(anyString(), anyInt())).thenThrow(ResourceNotFoundException.class); + when(kinesis.getIterator(anyString(), anyString())).thenThrow(ResourceNotFoundException.class); + when(leaseManager.getLease(heldLease.getLeaseKey())).thenReturn(heldLease); + + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(heldLease, shardInfo)); + leaseCleanupManager.cleanupLeases(); + + verify(leaseManager, times(1)).deleteLease(heldLease); + } + + private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, + ExtendedSequenceNumber extendedSequenceNumber, + int expectedDeletedLeases) throws Exception { + verifyExpectedDeletedLeasesCompletedShardCase(shardInfo, childShards, extendedSequenceNumber, true, expectedDeletedLeases); + } + + private void verifyExpectedDeletedLeasesCompletedShardCase(ShardInfo shardInfo, List childShards, + ExtendedSequenceNumber extendedSequenceNumber, + boolean childShardLeasesPresent, + int expectedDeletedLeases) throws Exception { + + final KinesisClientLease lease = LeaseHelper.createLease(shardInfo.getShardId(), "leaseOwner", shardInfo.getParentShardIds(), + childShards.stream().map(c -> c.getShardId()).collect(Collectors.toSet())); + final List childShardLeases = childShards.stream().map(c -> LeaseHelper.createLease( + c.getShardId(), "leaseOwner", Collections.singleton(shardInfo.getShardId()), + Collections.emptyList(), extendedSequenceNumber)).collect(Collectors.toList()); + + final List parentShardLeases = lease.getParentShardIds().stream().map(p -> + LeaseHelper.createLease(p, "leaseOwner", Collections.emptyList(), + Collections.singleton(shardInfo.getShardId()), extendedSequenceNumber)).collect(Collectors.toList()); + + when(leaseManager.getLease(lease.getLeaseKey())).thenReturn(lease); + for (Lease parentShardLease : parentShardLeases) { + when(leaseManager.getLease(parentShardLease.getLeaseKey())).thenReturn(parentShardLease); + } + if (childShardLeasesPresent) { + for (Lease childShardLease : childShardLeases) { + when(leaseManager.getLease(childShardLease.getLeaseKey())).thenReturn(childShardLease); + } + } + + when(kinesis.getIterator(any(String.class), any(String.class))).thenReturn("123"); + + final GetRecordsResult getRecordsResult = new GetRecordsResult(); + getRecordsResult.setRecords(Collections.emptyList()); + getRecordsResult.setChildShards(childShards); + + when(kinesis.get(any(String.class), any(Integer.class))).thenReturn(getRecordsResult); + + leaseCleanupManager.enqueueForDeletion(new LeasePendingDeletion(lease, shardInfo)); + leaseCleanupManager.cleanupLeases(); + + verify(leaseManager, times(expectedDeletedLeases)).deleteLease(any(Lease.class)); + } + + private List childShardsForSplit() { + final List parentShards = Arrays.asList(splitParent); + + final ChildShard leftChild = new ChildShard(); + leftChild.setShardId("leftChild"); + leftChild.setParentShards(parentShards); + leftChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49")); + + final ChildShard rightChild = new ChildShard(); + rightChild.setShardId("rightChild"); + rightChild.setParentShards(parentShards); + rightChild.setHashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99")); + + return Arrays.asList(leftChild, rightChild); + } + + private List childShardsForMerge() { + final List parentShards = Arrays.asList(mergeParent1, mergeParent2); + + final ChildShard child = new ChildShard(); + child.setShardId("onlyChild"); + child.setParentShards(parentShards); + child.setHashKeyRange(ShardObjectHelper.newHashKeyRange("0", "99")); + + return Collections.singletonList(child); + } +} diff --git a/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseHelper.java b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseHelper.java new file mode 100644 index 00000000..b122d9ff --- /dev/null +++ b/src/test/java/com/amazonaws/services/kinesis/leases/impl/LeaseHelper.java @@ -0,0 +1,44 @@ +/* + * Copyright 2020 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.amazonaws.services.kinesis.leases.impl; + +import com.amazonaws.services.kinesis.clientlibrary.types.ExtendedSequenceNumber; + +import java.util.Collection; +import java.util.Collections; + +public class LeaseHelper { + + public static KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection parentShardIds) { + return createLease(leaseKey, leaseOwner, parentShardIds, Collections.emptySet(), ExtendedSequenceNumber.LATEST); + } + + public static KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection parentShardIds, Collection childShardIds) { + return createLease(leaseKey, leaseOwner, parentShardIds, childShardIds, ExtendedSequenceNumber.LATEST); + } + + public static KinesisClientLease createLease(String leaseKey, String leaseOwner, Collection parentShardIds, + Collection childShardIds, ExtendedSequenceNumber extendedSequenceNumber) { + KinesisClientLease lease = new KinesisClientLease (); + lease.setLeaseKey(leaseKey); + lease.setLeaseOwner(leaseOwner); + lease.setParentShardIds(parentShardIds); + lease.setChildShardIds(childShardIds); + lease.setCheckpoint(extendedSequenceNumber); + + return lease; + } +} \ No newline at end of file