diff --git a/.gitignore b/.gitignore index 973f5095..5454f215 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ target/ AwsCredentials.properties .idea *.iml - +.sdkmanrc +.vscode diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/CommonCalculations.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/CommonCalculations.java index edb6de2e..94328db2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/CommonCalculations.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/CommonCalculations.java @@ -28,4 +28,22 @@ public class CommonCalculations { public static long getRenewerTakerIntervalMillis(long leaseDurationMillis, long epsilonMillis) { return leaseDurationMillis / 3 - epsilonMillis; } + + /** + * Convenience method for calculating lease taker intervals in milliseconds. + * + * @param leaseTakerIntervalMillis Current value for interval (from default or overriden). + * @param leaseDurationMillis Duration of a lease + * @param epsilonMillis Allow for some variance when calculating lease expirations + * @return lease taker interval. + */ + public static long getLeaseTakerIntervalMillis( + long leaseTakerIntervalMillis, long leaseDurationMillis, long epsilonMillis + ) { + if (leaseTakerIntervalMillis > 0) { + return leaseTakerIntervalMillis; + } + + return (leaseDurationMillis + epsilonMillis) * 2; + } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 5d9f73e9..6f1486ad 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -945,7 +945,8 @@ public class Scheduler implements Runnable { hierarchicalShardSyncerProvider.apply(streamConfig), metricsFactory, leaseCleanupManager, - schemaRegistryDecoder + schemaRegistryDecoder, + leaseManagementConfig.evictLeaseOnShutdown() ); return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(), argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning()); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java index 4074db22..8ea522ab 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.java @@ -44,7 +44,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class Lease { /* * See javadoc for System.nanoTime - summary: - * + * * Sometimes System.nanoTime's return values will wrap due to overflow. When they do, the difference between two * values will be very large. We will consider leases to be expired if they are more than a year old. */ @@ -111,7 +111,7 @@ public class Lease { /** * Copy constructor, used by clone(). - * + * * @param lease lease to copy */ protected Lease(Lease lease) { @@ -164,7 +164,7 @@ public class Lease { /** * Updates this Lease's mutable, application-specific fields based on the passed-in lease object. Does not update * fields that are internal to the leasing library (leaseKey, leaseOwner, leaseCounter). - * + * * @param lease */ public void update(final Lease lease) { @@ -195,9 +195,16 @@ public class Lease { } } + /** + * @return true if this lease is unassigned (no assigned owner), false otherwise. + */ + public boolean isUnassigned() { + return leaseOwner == null; + } + /** * Sets lastCounterIncrementNanos - * + * * @param lastCounterIncrementNanos last renewal in nanoseconds since the epoch */ public void lastCounterIncrementNanos(Long lastCounterIncrementNanos) { @@ -206,7 +213,7 @@ public class Lease { /** * Sets concurrencyToken. - * + * * @param concurrencyToken may not be null */ public void concurrencyToken(@NonNull final UUID concurrencyToken) { @@ -215,7 +222,7 @@ public class Lease { /** * Sets leaseKey. LeaseKey is immutable once set. - * + * * @param leaseKey may not be null. */ public void leaseKey(@NonNull final String leaseKey) { @@ -227,7 +234,7 @@ public class Lease { /** * Sets leaseCounter. - * + * * @param leaseCounter may not be null */ public void leaseCounter(@NonNull final Long leaseCounter) { @@ -303,7 +310,7 @@ public class Lease { /** * Sets leaseOwner. - * + * * @param leaseOwner may be null. */ public void leaseOwner(String leaseOwner) { @@ -312,12 +319,10 @@ public class Lease { /** * Returns a deep copy of this object. Type-unsafe - there aren't good mechanisms for copy-constructing generics. - * + * * @return A deep copy of this object. */ public Lease copy() { return new Lease(this); } - - } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java index 89e6a3bf..e46ce8c7 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/LeaseManagementConfig.java @@ -224,6 +224,18 @@ public class LeaseManagementConfig { private long listShardsCacheAllowedAgeInSeconds = 30; private int cacheMissWarningModulus = 250; + /** + * Interval at which the lease taker will execute. + * If unspecified, an interval will be calculated based on the lease duration. + */ + private long leaseTakerIntervalMillis = -1L; + + /** + * If leases should be evicted or not on shutdown requested. + * By default, leases are not evicted. + */ + private boolean evictLeaseOnShutdown = false; + private MetricsFactory metricsFactory = new NullMetricsFactory(); @Deprecated @@ -326,6 +338,7 @@ public class LeaseManagementConfig { initialPositionInStream(), failoverTimeMillis(), epsilonMillis(), + leaseTakerIntervalMillis, maxLeasesForWorker(), maxLeasesToStealAtOneTime(), maxLeaseRenewalThreads(), @@ -361,6 +374,7 @@ public class LeaseManagementConfig { executorService(), failoverTimeMillis(), epsilonMillis(), + leaseTakerIntervalMillis, maxLeasesForWorker(), maxLeasesToStealAtOneTime(), maxLeaseRenewalThreads(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java index 07e9068d..ee9c2ce2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinator.java @@ -49,6 +49,7 @@ import software.amazon.kinesis.metrics.MetricsScope; import software.amazon.kinesis.metrics.MetricsUtil; import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis; +import static software.amazon.kinesis.common.CommonCalculations.getLeaseTakerIntervalMillis; /** * LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns @@ -108,12 +109,13 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final String workerIdentifier, final long leaseDurationMillis, final long epsilonMillis, + final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewerThreadCount, final MetricsFactory metricsFactory) { - this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker, - maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, + this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, leaseTakerIntervalMillis, + maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY, TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); } @@ -144,6 +146,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { final String workerIdentifier, final long leaseDurationMillis, final long epsilonMillis, + final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewerThreadCount, @@ -158,7 +161,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator { this.leaseRenewer = new DynamoDBLeaseRenewer( leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory); this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis); - this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2; + this.takerIntervalMillis = getLeaseTakerIntervalMillis(leaseTakerIntervalMillis, leaseDurationMillis, epsilonMillis); if (initialLeaseTableReadCapacity <= 0) { throw new IllegalArgumentException("readCapacity should be >= 1"); } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java index ad1a2300..cffdcaa8 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseManagementFactory.java @@ -68,6 +68,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { private final long failoverTimeMillis; private final long epsilonMillis; + private final long leaseTakerIntervalMillis; private final int maxLeasesForWorker; private final int maxLeasesToStealAtOneTime; private final int maxLeaseRenewalThreads; @@ -119,14 +120,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, - final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload, final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, - initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -169,7 +170,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, - final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -177,7 +178,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus, final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, - initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -219,7 +220,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, - final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -228,7 +229,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity, final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, - initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -270,7 +271,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, - final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -280,7 +281,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout) { this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService, - initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -323,7 +324,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream, - final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker, + final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -334,7 +335,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { Duration dynamoDbRequestTimeout, BillingMode billingMode) { this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName, - workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -374,7 +375,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { */ private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, - final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, + final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -384,7 +385,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback, Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) { this(kinesisClient, dynamoDBClient, tableName, - workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker, + workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion, ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis, maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds, @@ -428,7 +429,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { */ public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier, - final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, + final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads, final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards, final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis, @@ -446,6 +447,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { this.executorService = executorService; this.failoverTimeMillis = failoverTimeMillis; this.epsilonMillis = epsilonMillis; + this.leaseTakerIntervalMillis = leaseTakerIntervalMillis; this.maxLeasesForWorker = maxLeasesForWorker; this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime; this.maxLeaseRenewalThreads = maxLeaseRenewalThreads; @@ -476,6 +478,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory { workerIdentifier, failoverTimeMillis, epsilonMillis, + leaseTakerIntervalMillis, maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java index a90ef56e..5f7eb6b1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTaker.java @@ -191,9 +191,9 @@ public class DynamoDBLeaseTaker implements LeaseTaker { return takenLeases; } - List expiredLeases = getExpiredLeases(); + List availableLeases = getAvailableLeases(); - Set leasesToTake = computeLeasesToTake(expiredLeases); + Set leasesToTake = computeLeasesToTake(availableLeases); leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake); Set untakenLeaseKeys = new HashSet<>(); @@ -358,34 +358,43 @@ public class DynamoDBLeaseTaker implements LeaseTaker { } /** - * @return list of leases that were expired as of our last scan. + * @return list of leases that are available for the taking. + * Expired leases and orphan leases (without owner) are considered available. */ - private List getExpiredLeases() { - List expiredLeases = new ArrayList<>(); + private List getAvailableLeases() { + List availableLeases = new ArrayList<>(); for (Lease lease : allLeases.values()) { - if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) { - expiredLeases.add(lease); + if (isExpired(lease) || isUnassigned(lease)) { + availableLeases.add(lease); } } - return expiredLeases; + return availableLeases; + } + + private boolean isUnassigned(Lease lease) { + return lease.isUnassigned(); + } + + private boolean isExpired(Lease lease) { + return lease.isExpired(leaseDurationNanos, lastScanTimeNanos); } /** * Compute the number of leases I should try to take based on the state of the system. * - * @param expiredLeases list of leases we determined to be expired + * @param availableLeases list of leases we determined to be available * @return set of leases to take. */ - private Set computeLeasesToTake(List expiredLeases) { - Map leaseCounts = computeLeaseCounts(expiredLeases); + private Set computeLeasesToTake(List availableLeases) { + Map leaseCounts = computeLeaseCounts(availableLeases); Set leasesToTake = new HashSet<>(); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION); MetricsUtil.addWorkerIdentifier(scope, workerIdentifier); List veryOldLeases = new ArrayList<>(); - final int numAvailableLeases = expiredLeases.size(); + final int numAvailableLeases = availableLeases.size(); int numLeases = 0; int numWorkers = 0; int numLeasesToReachTarget = 0; @@ -454,16 +463,16 @@ public class DynamoDBLeaseTaker implements LeaseTaker { return leasesToTake; } - // Shuffle expiredLeases so workers don't all try to contend for the same leases. - Collections.shuffle(expiredLeases); + // Shuffle availableLeases so workers don't all try to contend for the same leases. + Collections.shuffle(availableLeases); - if (expiredLeases.size() > 0) { - // If we have expired leases, get up to leases from expiredLeases - for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) { - leasesToTake.add(expiredLeases.remove(0)); + if (availableLeases.size() > 0) { + // If we have available leases, get up to leases from availableLeases + for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) { + leasesToTake.add(availableLeases.remove(0)); } } else { - // If there are no expired leases and we need a lease, consider stealing. + // If there are no available leases and we need a lease, consider stealing. List leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target); for (Lease leaseToSteal : leasesToSteal) { log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}", @@ -482,7 +491,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker { } } finally { - scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); + scope.addData("AvailableLeases", availableLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY); scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED); scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED); diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index b6e7c068..442c35e6 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle; import java.time.Duration; import java.time.Instant; -import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; @@ -24,8 +23,6 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.RejectedExecutionException; import java.util.function.Function; -import org.reactivestreams.Subscription; - import com.google.common.annotations.VisibleForTesting; import lombok.AccessLevel; @@ -33,10 +30,18 @@ import lombok.Getter; import lombok.NonNull; import lombok.experimental.Accessors; import lombok.extern.slf4j.Slf4j; -import software.amazon.awssdk.services.kinesis.model.ChildShard; + +import org.reactivestreams.Subscription; + import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.exceptions.DependencyException; +import software.amazon.kinesis.leases.exceptions.InvalidStateException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput; import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator; @@ -436,6 +441,21 @@ public class ShardConsumer { if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) { shutdownReason = reason; } + + if (reason == ShutdownReason.REQUESTED && shardConsumerArgument.evictLeaseOnShutdown()) { + log.debug("{} : Shutdown({}): Evicting lease.", streamIdentifier, shardInfo.shardId()); + + LeaseCoordinator leaseCoordinator = shardConsumerArgument.leaseCoordinator(); + LeaseRefresher leaseRefresher = leaseCoordinator.leaseRefresher(); + Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(ShardInfo.getLeaseKey(shardInfo)); + + try { + leaseRefresher.evictLease(currentShardLease); + } catch (DependencyException | InvalidStateException | ProvisionedThroughputException ex) { + // This is recoverable. This lease will expire and some other consumer will take it. + log.error("Could not evict lease on requested shutdown of {}", shardInfo.shardId(), ex); + } + } } } diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index 0518b830..43478d9c 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -75,4 +75,5 @@ public class ShardConsumerArgument { private final MetricsFactory metricsFactory; private final LeaseCleanupManager leaseCleanupManager; private final SchemaRegistryDecoder schemaRegistryDecoder; + private final boolean evictLeaseOnShutdown; } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/CommonCalculationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/CommonCalculationTest.java new file mode 100644 index 00000000..4ae87834 --- /dev/null +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/common/CommonCalculationTest.java @@ -0,0 +1,46 @@ +/* + * Copyright 2019 Amazon.com, Inc. or its affiliates. + * Licensed under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package software.amazon.kinesis.common; + +import static org.junit.Assert.assertEquals; + +import org.junit.Test; + +public class CommonCalculationTest { + + @Test + public void testGetLeaseTakerIntervalMillisWithDefault() { + long leaseTakerIntervalMillis = -1L; + long leaseDurationMillis = 10000L; + long epsilonMillis = 25L; + + long expected = (leaseDurationMillis + epsilonMillis) * 2; + + assertEquals(expected, CommonCalculations.getLeaseTakerIntervalMillis( + leaseTakerIntervalMillis, leaseDurationMillis, epsilonMillis)); + } + + @Test + public void testGetLeaseTakerIntervalMillisWhenOverriden() { + long leaseTakerIntervalMillis = 200L; + long leaseDurationMillis = 10000L; + long epsilonMillis = 25L; + + long expected = leaseTakerIntervalMillis; + + assertEquals(expected, CommonCalculations.getLeaseTakerIntervalMillis( + leaseTakerIntervalMillis, leaseDurationMillis, epsilonMillis)); + } +} diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java index 186fe290..9ba49d0d 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/LeaseCoordinatorExerciser.java @@ -46,6 +46,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @Slf4j public class LeaseCoordinatorExerciser { + private static final long LEASE_TAKER_INTERVAL_MILLIS = -1L; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; @@ -85,7 +86,7 @@ public class LeaseCoordinatorExerciser { String workerIdentifier = "worker-" + Integer.toString(i); LeaseCoordinator coord = new DynamoDBLeaseCoordinator(leaseRefresher, workerIdentifier, leaseDurationMillis, - epsilonMillis, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, + epsilonMillis, LEASE_TAKER_INTERVAL_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java index d89c010e..2a3b6827 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorIntegrationTest.java @@ -20,9 +20,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertThat; -import static org.junit.Assert.fail; -import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -34,11 +32,10 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.runners.MockitoJUnitRunner; -import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider; -import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.kinesis.checkpoint.dynamodb.DynamoDBCheckpointer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseIntegrationTest; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; @@ -48,21 +45,19 @@ import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; @RunWith(MockitoJUnitRunner.class) -public class DynamoDBLeaseCoordinatorIntegrationTest { - private static final int ATTEMPTS = 20; +public class DynamoDBLeaseCoordinatorIntegrationTest extends LeaseIntegrationTest { private static final String OPERATION = "TestOperation"; - private static final String TABLE_NAME = DynamoDBLeaseCoordinatorIntegrationTest.class.getSimpleName(); private static final String WORKER_ID = UUID.randomUUID().toString(); private static final long LEASE_DURATION_MILLIS = 5000L; private static final long EPSILON_MILLIS = 25L; + private static final long LEASE_TAKER_INTERVAL_MILLIS = -1L; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L; private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L; - private static DynamoDBLeaseRefresher leaseRefresher; private static DynamoDBCheckpointer dynamoDBCheckpointer; private LeaseCoordinator coordinator; @@ -71,34 +66,8 @@ public class DynamoDBLeaseCoordinatorIntegrationTest { @Before public void setup() throws ProvisionedThroughputException, DependencyException, InvalidStateException { - final boolean useConsistentReads = true; - if (leaseRefresher == null) { - DynamoDbAsyncClient dynamoDBClient = DynamoDbAsyncClient.builder() - .credentialsProvider(DefaultCredentialsProvider.create()).build(); - leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(), - useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK); - } - leaseRefresher.createLeaseTableIfNotExists(10L, 10L); - - int retryLeft = ATTEMPTS; - - while (!leaseRefresher.leaseTableExists()) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // Sleep called. - } - retryLeft--; - if (retryLeft == 0) { - if (!leaseRefresher.leaseTableExists()) { - fail("Failed to create table"); - } - } - } - - leaseRefresher.deleteAll(); - coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, - EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, + coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, EPSILON_MILLIS, + LEASE_TAKER_INTERVAL_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher); dynamoDBCheckpointer.operation(OPERATION); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java index caa7a6c7..93ac718b 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseCoordinatorTest.java @@ -22,6 +22,7 @@ public class DynamoDBLeaseCoordinatorTest { private static final String WORKER_ID = UUID.randomUUID().toString(); private static final long LEASE_DURATION_MILLIS = 5000L; private static final long EPSILON_MILLIS = 25L; + private static final long LEASE_TAKER_INTERVAL_MILLIS = -1L; private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE; private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1; private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20; @@ -39,8 +40,8 @@ public class DynamoDBLeaseCoordinatorTest { @Before public void setup() { - this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, - EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, + this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, EPSILON_MILLIS, + LEASE_TAKER_INTERVAL_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory); } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java index 475f1940..02875852 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/DynamoDBLeaseTakerIntegrationTest.java @@ -88,6 +88,21 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { builder.takeMutateAssert(taker, "1"); } + @Test + public void testTakeEvictedLease() throws LeasingException, InterruptedException { + TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher); + + builder.withLease("1", "bar").build(); + + // This should not take anything because the lease is new and owned. + builder.takeMutateAssert(taker); + + builder.evictLease("1"); + + // This should take because the lease has been evicted + builder.takeMutateAssert(taker, "1"); + } + /** * Verify that we take leases non-greedily by setting up an environment where there are 4 leases and 2 workers, * only one of which holds a lease. This leaves 3 free leases, but LeaseTaker should decide it needs 2 leases and @@ -203,7 +218,7 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest { /** * Verify that one activity is stolen from the highest loaded server when a server needs more than one lease and no * expired leases are available. Setup: 4 leases, server foo holds 0, bar holds 1, baz holds 5. - * + * * Foo should steal from baz. */ @Test diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java index 00db6a51..ae4b7249 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/dynamodb/TestHarnessBuilder.java @@ -30,6 +30,7 @@ import software.amazon.kinesis.leases.LeaseRenewer; import software.amazon.kinesis.leases.exceptions.DependencyException; import software.amazon.kinesis.leases.exceptions.InvalidStateException; import software.amazon.kinesis.leases.exceptions.LeasingException; +import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; public class TestHarnessBuilder { @@ -96,6 +97,11 @@ public class TestHarnessBuilder { currentTimeNanos += millis * 1000000; } + public void evictLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + Lease lease = leases.get(shardId); + leaseRefresher.evictLease(lease); + } + public Map takeMutateAssert(DynamoDBLeaseTaker taker, int numToTake) throws LeasingException { Map result = taker.takeLeases(timeProvider); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index f94d82fd..0d4ef538 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -118,6 +118,7 @@ public class ConsumerStatesTest { private long idleTimeInMillis = 1000L; private Optional logWarningForTaskAfterMillis = Optional.empty(); private SchemaRegistryDecoder schemaRegistryDecoder = null; + private boolean evictLeaseOnShutdown = false; @Before public void setup() { @@ -126,7 +127,7 @@ public class ConsumerStatesTest { taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector, - new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder); + new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder, evictLeaseOnShutdown); when(shardInfo.shardId()).thenReturn("shardId-000000000000"); when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize())); consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java index 46677fb9..01ebf6f2 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShardConsumerTest.java @@ -53,6 +53,10 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.function.Function; +import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import lombok.extern.slf4j.Slf4j; + import org.junit.After; import org.junit.Before; import org.junit.Ignore; @@ -66,15 +70,15 @@ import org.mockito.runners.MockitoJUnitRunner; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; -import com.google.common.util.concurrent.ThreadFactoryBuilder; - -import lombok.extern.slf4j.Slf4j; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.RequestDetails; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput; -import software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState; import software.amazon.kinesis.retrieval.RecordsDeliveryAck; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsRetrieved; @@ -136,6 +140,12 @@ public class ShardConsumerTest { private ConsumerState shutdownRequestedAwaitState; @Mock private TaskExecutionListener taskExecutionListener; + @Mock + private LeaseCoordinator leaseCoordinator; + @Mock + private LeaseRefresher leaseRefresher; + @Mock + private Lease lease; private ProcessRecordsInput processRecordsInput; @@ -869,6 +879,186 @@ public class ShardConsumerTest { verifyNoMoreInteractions(taskExecutionListener); } + @Test + public void testLeaseEvictedOnShutdownWhenEnabled() throws Exception { + CyclicBarrier taskBarrier = new CyclicBarrier(2); + + TestPublisher cache = new TestPublisher(); + ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis, + shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0); + + mockSuccessfulInitialize(null); + + mockSuccessfulProcessing(taskBarrier); + + when(shardConsumerArgument.evictLeaseOnShutdown()).thenReturn(true); + when(shardConsumerArgument.leaseCoordinator()).thenReturn(leaseCoordinator); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(leaseCoordinator.getCurrentlyHeldLease(ShardInfo.getLeaseKey(shardInfo))).thenReturn(lease); + + when(processingState.shutdownTransition(eq(ShutdownReason.REQUESTED))).thenReturn(shutdownRequestedState); + when(shutdownRequestedState.requiresDataAvailability()).thenReturn(false); + when(shutdownRequestedState.createTask(any(), any(), any())).thenReturn(shutdownRequestedTask); + when(shutdownRequestedState.taskType()).thenReturn(TaskType.SHUTDOWN_NOTIFICATION); + when(shutdownRequestedTask.call()).thenReturn(new TaskResult(null)); + + when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.REQUESTED))) + .thenReturn(shutdownRequestedAwaitState); + when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState); + when(shutdownRequestedAwaitState.requiresDataAvailability()).thenReturn(false); + when(shutdownRequestedAwaitState.createTask(any(), any(), any())).thenReturn(null); + when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.REQUESTED))) + .thenReturn(shutdownRequestedState); + when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState); + when(shutdownRequestedAwaitState.taskType()).thenReturn(TaskType.SHUTDOWN_COMPLETE); + + mockSuccessfulShutdown(null); + + boolean init = consumer.initializeComplete().get(); + while (!init) { + init = consumer.initializeComplete().get(); + } + + consumer.subscribe(); + cache.awaitInitialSetup(); + + cache.publish(); + awaitAndResetBarrier(taskBarrier); + cache.awaitRequest(); + + cache.publish(); + awaitAndResetBarrier(taskBarrier); + cache.awaitRequest(); + + consumer.gracefulShutdown(shutdownNotification); + boolean shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(false)); + shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(false)); + + consumer.leaseLost(); + shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(false)); + shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(true)); + + verify(processingState, times(2)).createTask(any(), any(), any()); + verify(shutdownRequestedState, never()).shutdownTransition(eq(ShutdownReason.LEASE_LOST)); + verify(shutdownRequestedState).createTask(any(), any(), any()); + verify(shutdownRequestedState).shutdownTransition(eq(ShutdownReason.REQUESTED)); + verify(shutdownRequestedAwaitState).createTask(any(), any(), any()); + verify(shutdownRequestedAwaitState).shutdownTransition(eq(ShutdownReason.LEASE_LOST)); + verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(2)).beforeTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedAwaitTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput); + + initialTaskInput = initialTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + processTaskInput = processTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + shutdownRequestedTaskInput = shutdownRequestedTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + // No task is created/run for this shutdownRequestedAwaitState, so there's no task outcome. + + verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(2)).afterTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedAwaitTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownTaskInput); + verifyNoMoreInteractions(taskExecutionListener); + + verify(leaseRefresher).evictLease(lease); + } + + @Test + public void testLeaseNotEvictedOnShutdownByDefault() throws Exception { + CyclicBarrier taskBarrier = new CyclicBarrier(2); + + TestPublisher cache = new TestPublisher(); + ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis, + shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0); + + mockSuccessfulInitialize(null); + + mockSuccessfulProcessing(taskBarrier); + + when(shardConsumerArgument.evictLeaseOnShutdown()).thenReturn(false); + + when(processingState.shutdownTransition(eq(ShutdownReason.REQUESTED))).thenReturn(shutdownRequestedState); + when(shutdownRequestedState.requiresDataAvailability()).thenReturn(false); + when(shutdownRequestedState.createTask(any(), any(), any())).thenReturn(shutdownRequestedTask); + when(shutdownRequestedState.taskType()).thenReturn(TaskType.SHUTDOWN_NOTIFICATION); + when(shutdownRequestedTask.call()).thenReturn(new TaskResult(null)); + + when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.REQUESTED))) + .thenReturn(shutdownRequestedAwaitState); + when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState); + when(shutdownRequestedAwaitState.requiresDataAvailability()).thenReturn(false); + when(shutdownRequestedAwaitState.createTask(any(), any(), any())).thenReturn(null); + when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.REQUESTED))) + .thenReturn(shutdownRequestedState); + when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState); + when(shutdownRequestedAwaitState.taskType()).thenReturn(TaskType.SHUTDOWN_COMPLETE); + + mockSuccessfulShutdown(null); + + boolean init = consumer.initializeComplete().get(); + while (!init) { + init = consumer.initializeComplete().get(); + } + + consumer.subscribe(); + cache.awaitInitialSetup(); + + cache.publish(); + awaitAndResetBarrier(taskBarrier); + cache.awaitRequest(); + + cache.publish(); + awaitAndResetBarrier(taskBarrier); + cache.awaitRequest(); + + consumer.gracefulShutdown(shutdownNotification); + boolean shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(false)); + shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(false)); + + consumer.leaseLost(); + shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(false)); + shutdownComplete = consumer.shutdownComplete().get(); + assertThat(shutdownComplete, equalTo(true)); + + verify(processingState, times(2)).createTask(any(), any(), any()); + verify(shutdownRequestedState, never()).shutdownTransition(eq(ShutdownReason.LEASE_LOST)); + verify(shutdownRequestedState).createTask(any(), any(), any()); + verify(shutdownRequestedState).shutdownTransition(eq(ShutdownReason.REQUESTED)); + verify(shutdownRequestedAwaitState).createTask(any(), any(), any()); + verify(shutdownRequestedAwaitState).shutdownTransition(eq(ShutdownReason.LEASE_LOST)); + verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(2)).beforeTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedAwaitTaskInput); + verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput); + + initialTaskInput = initialTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + processTaskInput = processTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + shutdownRequestedTaskInput = shutdownRequestedTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build(); + // No task is created/run for this shutdownRequestedAwaitState, so there's no task outcome. + + verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput); + verify(taskExecutionListener, times(2)).afterTaskExecution(processTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedAwaitTaskInput); + verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownTaskInput); + verifyNoMoreInteractions(taskExecutionListener); + + verifyZeroInteractions(leaseCoordinator); + verifyZeroInteractions(leaseRefresher); + } + private void mockSuccessfulShutdown(CyclicBarrier taskCallBarrier) { mockSuccessfulShutdown(taskCallBarrier, null); }