Evict lease on shutdown

This commit is contained in:
Mathieu Fortin 2022-10-17 12:13:46 -04:00
parent 05ed537572
commit c5d62f1e90
18 changed files with 402 additions and 98 deletions

3
.gitignore vendored
View file

@ -2,4 +2,5 @@ target/
AwsCredentials.properties
.idea
*.iml
.sdkmanrc
.vscode

View file

@ -28,4 +28,22 @@ public class CommonCalculations {
public static long getRenewerTakerIntervalMillis(long leaseDurationMillis, long epsilonMillis) {
return leaseDurationMillis / 3 - epsilonMillis;
}
/**
* Convenience method for calculating lease taker intervals in milliseconds.
*
* @param leaseTakerIntervalMillis Current value for interval (from default or overriden).
* @param leaseDurationMillis Duration of a lease
* @param epsilonMillis Allow for some variance when calculating lease expirations
* @return lease taker interval.
*/
public static long getLeaseTakerIntervalMillis(
long leaseTakerIntervalMillis, long leaseDurationMillis, long epsilonMillis
) {
if (leaseTakerIntervalMillis > 0) {
return leaseTakerIntervalMillis;
}
return (leaseDurationMillis + epsilonMillis) * 2;
}
}

View file

@ -945,7 +945,8 @@ public class Scheduler implements Runnable {
hierarchicalShardSyncerProvider.apply(streamConfig),
metricsFactory,
leaseCleanupManager,
schemaRegistryDecoder
schemaRegistryDecoder,
leaseManagementConfig.evictLeaseOnShutdown()
);
return new ShardConsumer(cache, executorService, shardInfo, lifecycleConfig.logWarningForTaskAfterMillis(),
argument, lifecycleConfig.taskExecutionListener(), lifecycleConfig.readTimeoutsToIgnoreBeforeWarning());

View file

@ -195,6 +195,13 @@ public class Lease {
}
}
/**
* @return true if this lease is unassigned (no assigned owner), false otherwise.
*/
public boolean isUnassigned() {
return leaseOwner == null;
}
/**
* Sets lastCounterIncrementNanos
*
@ -318,6 +325,4 @@ public class Lease {
public Lease copy() {
return new Lease(this);
}
}

View file

@ -224,6 +224,18 @@ public class LeaseManagementConfig {
private long listShardsCacheAllowedAgeInSeconds = 30;
private int cacheMissWarningModulus = 250;
/**
* Interval at which the lease taker will execute.
* If unspecified, an interval will be calculated based on the lease duration.
*/
private long leaseTakerIntervalMillis = -1L;
/**
* If leases should be evicted or not on shutdown requested.
* By default, leases are not evicted.
*/
private boolean evictLeaseOnShutdown = false;
private MetricsFactory metricsFactory = new NullMetricsFactory();
@Deprecated
@ -326,6 +338,7 @@ public class LeaseManagementConfig {
initialPositionInStream(),
failoverTimeMillis(),
epsilonMillis(),
leaseTakerIntervalMillis,
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),
@ -361,6 +374,7 @@ public class LeaseManagementConfig {
executorService(),
failoverTimeMillis(),
epsilonMillis(),
leaseTakerIntervalMillis,
maxLeasesForWorker(),
maxLeasesToStealAtOneTime(),
maxLeaseRenewalThreads(),

View file

@ -49,6 +49,7 @@ import software.amazon.kinesis.metrics.MetricsScope;
import software.amazon.kinesis.metrics.MetricsUtil;
import static software.amazon.kinesis.common.CommonCalculations.getRenewerTakerIntervalMillis;
import static software.amazon.kinesis.common.CommonCalculations.getLeaseTakerIntervalMillis;
/**
* LeaseCoordinator abstracts away LeaseTaker and LeaseRenewer from the application code that's using leasing. It owns
@ -108,12 +109,13 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final String workerIdentifier,
final long leaseDurationMillis,
final long epsilonMillis,
final long leaseTakerIntervalMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
final MetricsFactory metricsFactory) {
this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
this(leaseRefresher, workerIdentifier, leaseDurationMillis, epsilonMillis, leaseTakerIntervalMillis,
maxLeasesForWorker, maxLeasesToStealAtOneTime, maxLeaseRenewerThreadCount,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_READ_CAPACITY,
TableConstants.DEFAULT_INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}
@ -144,6 +146,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
final String workerIdentifier,
final long leaseDurationMillis,
final long epsilonMillis,
final long leaseTakerIntervalMillis,
final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime,
final int maxLeaseRenewerThreadCount,
@ -158,7 +161,7 @@ public class DynamoDBLeaseCoordinator implements LeaseCoordinator {
this.leaseRenewer = new DynamoDBLeaseRenewer(
leaseRefresher, workerIdentifier, leaseDurationMillis, leaseRenewalThreadpool, metricsFactory);
this.renewerIntervalMillis = getRenewerTakerIntervalMillis(leaseDurationMillis, epsilonMillis);
this.takerIntervalMillis = (leaseDurationMillis + epsilonMillis) * 2;
this.takerIntervalMillis = getLeaseTakerIntervalMillis(leaseTakerIntervalMillis, leaseDurationMillis, epsilonMillis);
if (initialLeaseTableReadCapacity <= 0) {
throw new IllegalArgumentException("readCapacity should be >= 1");
}

View file

@ -68,6 +68,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
private final long failoverTimeMillis;
private final long epsilonMillis;
private final long leaseTakerIntervalMillis;
private final int maxLeasesForWorker;
private final int maxLeasesToStealAtOneTime;
private final int maxLeaseRenewalThreads;
@ -119,14 +120,14 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
final int maxListShardsRetryAttempts, final int maxCacheMissesBeforeReload,
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@ -169,7 +170,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@ -177,7 +178,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long listShardsCacheAllowedAgeInSeconds, final int cacheMissWarningModulus,
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@ -219,7 +220,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@ -228,7 +229,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final long initialLeaseTableReadCapacity, final long initialLeaseTableWriteCapacity,
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@ -270,7 +271,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@ -280,7 +281,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final HierarchicalShardSyncer hierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout) {
this(kinesisClient, streamName, dynamoDBClient, tableName, workerIdentifier, executorService,
initialPositionInStream, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
initialPositionInStream, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@ -323,7 +324,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final String streamName,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final InitialPositionInStreamExtended initialPositionInStream,
final long failoverTimeMillis, final long epsilonMillis, final int maxLeasesForWorker,
final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis, final int maxLeasesForWorker,
final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@ -334,7 +335,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
Duration dynamoDbRequestTimeout, BillingMode billingMode) {
this(kinesisClient, new StreamConfig(StreamIdentifier.singleStreamInstance(streamName), initialPositionInStream), dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@ -374,7 +375,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
*/
private DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient, final StreamConfig streamConfig,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@ -384,7 +385,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
final HierarchicalShardSyncer deprecatedHierarchicalShardSyncer, final TableCreatorCallback tableCreatorCallback,
Duration dynamoDbRequestTimeout, BillingMode billingMode, LeaseSerializer leaseSerializer) {
this(kinesisClient, dynamoDBClient, tableName,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, maxLeasesForWorker,
workerIdentifier, executorService, failoverTimeMillis, epsilonMillis, leaseTakerIntervalMillis, maxLeasesForWorker,
maxLeasesToStealAtOneTime, maxLeaseRenewalThreads, cleanupLeasesUponShardCompletion,
ignoreUnexpectedChildShards, shardSyncIntervalMillis, consistentReads, listShardsBackoffTimeMillis,
maxListShardsRetryAttempts, maxCacheMissesBeforeReload, listShardsCacheAllowedAgeInSeconds,
@ -428,7 +429,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
*/
public DynamoDBLeaseManagementFactory(final KinesisAsyncClient kinesisClient,
final DynamoDbAsyncClient dynamoDBClient, final String tableName, final String workerIdentifier,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis,
final ExecutorService executorService, final long failoverTimeMillis, final long epsilonMillis, final long leaseTakerIntervalMillis,
final int maxLeasesForWorker, final int maxLeasesToStealAtOneTime, final int maxLeaseRenewalThreads,
final boolean cleanupLeasesUponShardCompletion, final boolean ignoreUnexpectedChildShards,
final long shardSyncIntervalMillis, final boolean consistentReads, final long listShardsBackoffTimeMillis,
@ -446,6 +447,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
this.executorService = executorService;
this.failoverTimeMillis = failoverTimeMillis;
this.epsilonMillis = epsilonMillis;
this.leaseTakerIntervalMillis = leaseTakerIntervalMillis;
this.maxLeasesForWorker = maxLeasesForWorker;
this.maxLeasesToStealAtOneTime = maxLeasesToStealAtOneTime;
this.maxLeaseRenewalThreads = maxLeaseRenewalThreads;
@ -476,6 +478,7 @@ public class DynamoDBLeaseManagementFactory implements LeaseManagementFactory {
workerIdentifier,
failoverTimeMillis,
epsilonMillis,
leaseTakerIntervalMillis,
maxLeasesForWorker,
maxLeasesToStealAtOneTime,
maxLeaseRenewalThreads,

View file

@ -191,9 +191,9 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return takenLeases;
}
List<Lease> expiredLeases = getExpiredLeases();
List<Lease> availableLeases = getAvailableLeases();
Set<Lease> leasesToTake = computeLeasesToTake(expiredLeases);
Set<Lease> leasesToTake = computeLeasesToTake(availableLeases);
leasesToTake = updateStaleLeasesWithLatestState(updateAllLeasesTotalTimeMillis, leasesToTake);
Set<String> untakenLeaseKeys = new HashSet<>();
@ -358,34 +358,43 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
}
/**
* @return list of leases that were expired as of our last scan.
* @return list of leases that are available for the taking.
* Expired leases and orphan leases (without owner) are considered available.
*/
private List<Lease> getExpiredLeases() {
List<Lease> expiredLeases = new ArrayList<>();
private List<Lease> getAvailableLeases() {
List<Lease> availableLeases = new ArrayList<>();
for (Lease lease : allLeases.values()) {
if (lease.isExpired(leaseDurationNanos, lastScanTimeNanos)) {
expiredLeases.add(lease);
if (isExpired(lease) || isUnassigned(lease)) {
availableLeases.add(lease);
}
}
return expiredLeases;
return availableLeases;
}
private boolean isUnassigned(Lease lease) {
return lease.isUnassigned();
}
private boolean isExpired(Lease lease) {
return lease.isExpired(leaseDurationNanos, lastScanTimeNanos);
}
/**
* Compute the number of leases I should try to take based on the state of the system.
*
* @param expiredLeases list of leases we determined to be expired
* @param availableLeases list of leases we determined to be available
* @return set of leases to take.
*/
private Set<Lease> computeLeasesToTake(List<Lease> expiredLeases) {
Map<String, Integer> leaseCounts = computeLeaseCounts(expiredLeases);
private Set<Lease> computeLeasesToTake(List<Lease> availableLeases) {
Map<String, Integer> leaseCounts = computeLeaseCounts(availableLeases);
Set<Lease> leasesToTake = new HashSet<>();
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, TAKE_LEASES_DIMENSION);
MetricsUtil.addWorkerIdentifier(scope, workerIdentifier);
List<Lease> veryOldLeases = new ArrayList<>();
final int numAvailableLeases = expiredLeases.size();
final int numAvailableLeases = availableLeases.size();
int numLeases = 0;
int numWorkers = 0;
int numLeasesToReachTarget = 0;
@ -454,16 +463,16 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
return leasesToTake;
}
// Shuffle expiredLeases so workers don't all try to contend for the same leases.
Collections.shuffle(expiredLeases);
// Shuffle availableLeases so workers don't all try to contend for the same leases.
Collections.shuffle(availableLeases);
if (expiredLeases.size() > 0) {
// If we have expired leases, get up to <needed> leases from expiredLeases
for (; numLeasesToReachTarget > 0 && expiredLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(expiredLeases.remove(0));
if (availableLeases.size() > 0) {
// If we have available leases, get up to <needed> leases from availableLeases
for (; numLeasesToReachTarget > 0 && availableLeases.size() > 0; numLeasesToReachTarget--) {
leasesToTake.add(availableLeases.remove(0));
}
} else {
// If there are no expired leases and we need a lease, consider stealing.
// If there are no available leases and we need a lease, consider stealing.
List<Lease> leasesToSteal = chooseLeasesToSteal(leaseCounts, numLeasesToReachTarget, target);
for (Lease leaseToSteal : leasesToSteal) {
log.info("Worker {} needed {} leases but none were expired, so it will steal lease {} from {}",
@ -482,7 +491,7 @@ public class DynamoDBLeaseTaker implements LeaseTaker {
}
} finally {
scope.addData("ExpiredLeases", expiredLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("AvailableLeases", availableLeases.size(), StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeaseSpillover", leaseSpillover, StandardUnit.COUNT, MetricsLevel.SUMMARY);
scope.addData("LeasesToTake", leasesToTake.size(), StandardUnit.COUNT, MetricsLevel.DETAILED);
scope.addData("NeededLeases", Math.max(numLeasesToReachTarget, 0), StandardUnit.COUNT, MetricsLevel.DETAILED);

View file

@ -16,7 +16,6 @@ package software.amazon.kinesis.lifecycle;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
@ -24,8 +23,6 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.function.Function;
import org.reactivestreams.Subscription;
import com.google.common.annotations.VisibleForTesting;
import lombok.AccessLevel;
@ -33,10 +30,18 @@ import lombok.Getter;
import lombok.NonNull;
import lombok.experimental.Accessors;
import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.ChildShard;
import org.reactivestreams.Subscription;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.exceptions.internal.BlockedOnParentShardException;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.metrics.MetricsCollectingTaskDecorator;
@ -436,6 +441,21 @@ public class ShardConsumer {
if (shutdownReason == null || shutdownReason.canTransitionTo(reason)) {
shutdownReason = reason;
}
if (reason == ShutdownReason.REQUESTED && shardConsumerArgument.evictLeaseOnShutdown()) {
log.debug("{} : Shutdown({}): Evicting lease.", streamIdentifier, shardInfo.shardId());
LeaseCoordinator leaseCoordinator = shardConsumerArgument.leaseCoordinator();
LeaseRefresher leaseRefresher = leaseCoordinator.leaseRefresher();
Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(ShardInfo.getLeaseKey(shardInfo));
try {
leaseRefresher.evictLease(currentShardLease);
} catch (DependencyException | InvalidStateException | ProvisionedThroughputException ex) {
// This is recoverable. This lease will expire and some other consumer will take it.
log.error("Could not evict lease on requested shutdown of {}", shardInfo.shardId(), ex);
}
}
}
}

View file

@ -75,4 +75,5 @@ public class ShardConsumerArgument {
private final MetricsFactory metricsFactory;
private final LeaseCleanupManager leaseCleanupManager;
private final SchemaRegistryDecoder schemaRegistryDecoder;
private final boolean evictLeaseOnShutdown;
}

View file

@ -0,0 +1,46 @@
/*
* Copyright 2019 Amazon.com, Inc. or its affiliates.
* Licensed under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package software.amazon.kinesis.common;
import static org.junit.Assert.assertEquals;
import org.junit.Test;
public class CommonCalculationTest {
@Test
public void testGetLeaseTakerIntervalMillisWithDefault() {
long leaseTakerIntervalMillis = -1L;
long leaseDurationMillis = 10000L;
long epsilonMillis = 25L;
long expected = (leaseDurationMillis + epsilonMillis) * 2;
assertEquals(expected, CommonCalculations.getLeaseTakerIntervalMillis(
leaseTakerIntervalMillis, leaseDurationMillis, epsilonMillis));
}
@Test
public void testGetLeaseTakerIntervalMillisWhenOverriden() {
long leaseTakerIntervalMillis = 200L;
long leaseDurationMillis = 10000L;
long epsilonMillis = 25L;
long expected = leaseTakerIntervalMillis;
assertEquals(expected, CommonCalculations.getLeaseTakerIntervalMillis(
leaseTakerIntervalMillis, leaseDurationMillis, epsilonMillis));
}
}

View file

@ -46,6 +46,7 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@Slf4j
public class LeaseCoordinatorExerciser {
private static final long LEASE_TAKER_INTERVAL_MILLIS = -1L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
@ -85,7 +86,7 @@ public class LeaseCoordinatorExerciser {
String workerIdentifier = "worker-" + Integer.toString(i);
LeaseCoordinator coord = new DynamoDBLeaseCoordinator(leaseRefresher, workerIdentifier, leaseDurationMillis,
epsilonMillis, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME,
epsilonMillis, LEASE_TAKER_INTERVAL_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME,
MAX_LEASE_RENEWER_THREAD_COUNT, INITIAL_LEASE_TABLE_READ_CAPACITY,
INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);

View file

@ -20,9 +20,7 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
@ -34,11 +32,10 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.auth.credentials.DefaultCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient;
import software.amazon.kinesis.checkpoint.dynamodb.DynamoDBCheckpointer;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseIntegrationTest;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
@ -48,21 +45,19 @@ import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
@RunWith(MockitoJUnitRunner.class)
public class DynamoDBLeaseCoordinatorIntegrationTest {
private static final int ATTEMPTS = 20;
public class DynamoDBLeaseCoordinatorIntegrationTest extends LeaseIntegrationTest {
private static final String OPERATION = "TestOperation";
private static final String TABLE_NAME = DynamoDBLeaseCoordinatorIntegrationTest.class.getSimpleName();
private static final String WORKER_ID = UUID.randomUUID().toString();
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final long LEASE_TAKER_INTERVAL_MILLIS = -1L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
private static final long INITIAL_LEASE_TABLE_READ_CAPACITY = 10L;
private static final long INITIAL_LEASE_TABLE_WRITE_CAPACITY = 10L;
private static DynamoDBLeaseRefresher leaseRefresher;
private static DynamoDBCheckpointer dynamoDBCheckpointer;
private LeaseCoordinator coordinator;
@ -71,34 +66,8 @@ public class DynamoDBLeaseCoordinatorIntegrationTest {
@Before
public void setup() throws ProvisionedThroughputException, DependencyException, InvalidStateException {
final boolean useConsistentReads = true;
if (leaseRefresher == null) {
DynamoDbAsyncClient dynamoDBClient = DynamoDbAsyncClient.builder()
.credentialsProvider(DefaultCredentialsProvider.create()).build();
leaseRefresher = new DynamoDBLeaseRefresher(TABLE_NAME, dynamoDBClient, new DynamoDBLeaseSerializer(),
useConsistentReads, TableCreatorCallback.NOOP_TABLE_CREATOR_CALLBACK);
}
leaseRefresher.createLeaseTableIfNotExists(10L, 10L);
int retryLeft = ATTEMPTS;
while (!leaseRefresher.leaseTableExists()) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Sleep called.
}
retryLeft--;
if (retryLeft == 0) {
if (!leaseRefresher.leaseTableExists()) {
fail("Failed to create table");
}
}
}
leaseRefresher.deleteAll();
coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
coordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, EPSILON_MILLIS,
LEASE_TAKER_INTERVAL_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
dynamoDBCheckpointer = new DynamoDBCheckpointer(coordinator, leaseRefresher);
dynamoDBCheckpointer.operation(OPERATION);

View file

@ -22,6 +22,7 @@ public class DynamoDBLeaseCoordinatorTest {
private static final String WORKER_ID = UUID.randomUUID().toString();
private static final long LEASE_DURATION_MILLIS = 5000L;
private static final long EPSILON_MILLIS = 25L;
private static final long LEASE_TAKER_INTERVAL_MILLIS = -1L;
private static final int MAX_LEASES_FOR_WORKER = Integer.MAX_VALUE;
private static final int MAX_LEASES_TO_STEAL_AT_ONE_TIME = 1;
private static final int MAX_LEASE_RENEWER_THREAD_COUNT = 20;
@ -39,8 +40,8 @@ public class DynamoDBLeaseCoordinatorTest {
@Before
public void setup() {
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS,
EPSILON_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
this.leaseCoordinator = new DynamoDBLeaseCoordinator(leaseRefresher, WORKER_ID, LEASE_DURATION_MILLIS, EPSILON_MILLIS,
LEASE_TAKER_INTERVAL_MILLIS, MAX_LEASES_FOR_WORKER, MAX_LEASES_TO_STEAL_AT_ONE_TIME, MAX_LEASE_RENEWER_THREAD_COUNT,
INITIAL_LEASE_TABLE_READ_CAPACITY, INITIAL_LEASE_TABLE_WRITE_CAPACITY, metricsFactory);
}

View file

@ -88,6 +88,21 @@ public class DynamoDBLeaseTakerIntegrationTest extends LeaseIntegrationTest {
builder.takeMutateAssert(taker, "1");
}
@Test
public void testTakeEvictedLease() throws LeasingException, InterruptedException {
TestHarnessBuilder builder = new TestHarnessBuilder(leaseRefresher);
builder.withLease("1", "bar").build();
// This should not take anything because the lease is new and owned.
builder.takeMutateAssert(taker);
builder.evictLease("1");
// This should take because the lease has been evicted
builder.takeMutateAssert(taker, "1");
}
/**
* Verify that we take leases non-greedily by setting up an environment where there are 4 leases and 2 workers,
* only one of which holds a lease. This leaves 3 free leases, but LeaseTaker should decide it needs 2 leases and

View file

@ -30,6 +30,7 @@ import software.amazon.kinesis.leases.LeaseRenewer;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.leases.exceptions.InvalidStateException;
import software.amazon.kinesis.leases.exceptions.LeasingException;
import software.amazon.kinesis.leases.exceptions.ProvisionedThroughputException;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
public class TestHarnessBuilder {
@ -96,6 +97,11 @@ public class TestHarnessBuilder {
currentTimeNanos += millis * 1000000;
}
public void evictLease(String shardId) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
Lease lease = leases.get(shardId);
leaseRefresher.evictLease(lease);
}
public Map<String, Lease> takeMutateAssert(DynamoDBLeaseTaker taker, int numToTake)
throws LeasingException {
Map<String, Lease> result = taker.takeLeases(timeProvider);

View file

@ -118,6 +118,7 @@ public class ConsumerStatesTest {
private long idleTimeInMillis = 1000L;
private Optional<Long> logWarningForTaskAfterMillis = Optional.empty();
private SchemaRegistryDecoder schemaRegistryDecoder = null;
private boolean evictLeaseOnShutdown = false;
@Before
public void setup() {
@ -126,7 +127,7 @@ public class ConsumerStatesTest {
taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis,
maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis,
INITIAL_POSITION_IN_STREAM, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, shardDetector,
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder);
new AggregatorUtil(), hierarchicalShardSyncer, metricsFactory, leaseCleanupManager, schemaRegistryDecoder, evictLeaseOnShutdown);
when(shardInfo.shardId()).thenReturn("shardId-000000000000");
when(shardInfo.streamIdentifierSerOpt()).thenReturn(Optional.of(StreamIdentifier.singleStreamInstance(STREAM_NAME).serialize()));
consumer = spy(new ShardConsumer(recordsPublisher, executorService, shardInfo, logWarningForTaskAfterMillis,

View file

@ -53,6 +53,10 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
@ -66,15 +70,15 @@ import org.mockito.runners.MockitoJUnitRunner;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import lombok.extern.slf4j.Slf4j;
import software.amazon.kinesis.common.InitialPositionInStreamExtended;
import software.amazon.kinesis.common.RequestDetails;
import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState;
import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
import software.amazon.kinesis.lifecycle.events.TaskExecutionListenerInput;
import software.amazon.kinesis.lifecycle.ConsumerStates.ShardConsumerState;
import software.amazon.kinesis.retrieval.RecordsDeliveryAck;
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RecordsRetrieved;
@ -136,6 +140,12 @@ public class ShardConsumerTest {
private ConsumerState shutdownRequestedAwaitState;
@Mock
private TaskExecutionListener taskExecutionListener;
@Mock
private LeaseCoordinator leaseCoordinator;
@Mock
private LeaseRefresher leaseRefresher;
@Mock
private Lease lease;
private ProcessRecordsInput processRecordsInput;
@ -869,6 +879,186 @@ public class ShardConsumerTest {
verifyNoMoreInteractions(taskExecutionListener);
}
@Test
public void testLeaseEvictedOnShutdownWhenEnabled() throws Exception {
CyclicBarrier taskBarrier = new CyclicBarrier(2);
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
mockSuccessfulInitialize(null);
mockSuccessfulProcessing(taskBarrier);
when(shardConsumerArgument.evictLeaseOnShutdown()).thenReturn(true);
when(shardConsumerArgument.leaseCoordinator()).thenReturn(leaseCoordinator);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.getCurrentlyHeldLease(ShardInfo.getLeaseKey(shardInfo))).thenReturn(lease);
when(processingState.shutdownTransition(eq(ShutdownReason.REQUESTED))).thenReturn(shutdownRequestedState);
when(shutdownRequestedState.requiresDataAvailability()).thenReturn(false);
when(shutdownRequestedState.createTask(any(), any(), any())).thenReturn(shutdownRequestedTask);
when(shutdownRequestedState.taskType()).thenReturn(TaskType.SHUTDOWN_NOTIFICATION);
when(shutdownRequestedTask.call()).thenReturn(new TaskResult(null));
when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.REQUESTED)))
.thenReturn(shutdownRequestedAwaitState);
when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState);
when(shutdownRequestedAwaitState.requiresDataAvailability()).thenReturn(false);
when(shutdownRequestedAwaitState.createTask(any(), any(), any())).thenReturn(null);
when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.REQUESTED)))
.thenReturn(shutdownRequestedState);
when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState);
when(shutdownRequestedAwaitState.taskType()).thenReturn(TaskType.SHUTDOWN_COMPLETE);
mockSuccessfulShutdown(null);
boolean init = consumer.initializeComplete().get();
while (!init) {
init = consumer.initializeComplete().get();
}
consumer.subscribe();
cache.awaitInitialSetup();
cache.publish();
awaitAndResetBarrier(taskBarrier);
cache.awaitRequest();
cache.publish();
awaitAndResetBarrier(taskBarrier);
cache.awaitRequest();
consumer.gracefulShutdown(shutdownNotification);
boolean shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(false));
shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(false));
consumer.leaseLost();
shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(false));
shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(true));
verify(processingState, times(2)).createTask(any(), any(), any());
verify(shutdownRequestedState, never()).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
verify(shutdownRequestedState).createTask(any(), any(), any());
verify(shutdownRequestedState).shutdownTransition(eq(ShutdownReason.REQUESTED));
verify(shutdownRequestedAwaitState).createTask(any(), any(), any());
verify(shutdownRequestedAwaitState).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(2)).beforeTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedAwaitTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput);
initialTaskInput = initialTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
processTaskInput = processTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
shutdownRequestedTaskInput = shutdownRequestedTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
// No task is created/run for this shutdownRequestedAwaitState, so there's no task outcome.
verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(2)).afterTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedAwaitTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownTaskInput);
verifyNoMoreInteractions(taskExecutionListener);
verify(leaseRefresher).evictLease(lease);
}
@Test
public void testLeaseNotEvictedOnShutdownByDefault() throws Exception {
CyclicBarrier taskBarrier = new CyclicBarrier(2);
TestPublisher cache = new TestPublisher();
ShardConsumer consumer = new ShardConsumer(cache, executorService, shardInfo, logWarningForTaskAfterMillis,
shardConsumerArgument, initialState, t -> t, 1, taskExecutionListener, 0);
mockSuccessfulInitialize(null);
mockSuccessfulProcessing(taskBarrier);
when(shardConsumerArgument.evictLeaseOnShutdown()).thenReturn(false);
when(processingState.shutdownTransition(eq(ShutdownReason.REQUESTED))).thenReturn(shutdownRequestedState);
when(shutdownRequestedState.requiresDataAvailability()).thenReturn(false);
when(shutdownRequestedState.createTask(any(), any(), any())).thenReturn(shutdownRequestedTask);
when(shutdownRequestedState.taskType()).thenReturn(TaskType.SHUTDOWN_NOTIFICATION);
when(shutdownRequestedTask.call()).thenReturn(new TaskResult(null));
when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.REQUESTED)))
.thenReturn(shutdownRequestedAwaitState);
when(shutdownRequestedState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState);
when(shutdownRequestedAwaitState.requiresDataAvailability()).thenReturn(false);
when(shutdownRequestedAwaitState.createTask(any(), any(), any())).thenReturn(null);
when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.REQUESTED)))
.thenReturn(shutdownRequestedState);
when(shutdownRequestedAwaitState.shutdownTransition(eq(ShutdownReason.LEASE_LOST))).thenReturn(shutdownState);
when(shutdownRequestedAwaitState.taskType()).thenReturn(TaskType.SHUTDOWN_COMPLETE);
mockSuccessfulShutdown(null);
boolean init = consumer.initializeComplete().get();
while (!init) {
init = consumer.initializeComplete().get();
}
consumer.subscribe();
cache.awaitInitialSetup();
cache.publish();
awaitAndResetBarrier(taskBarrier);
cache.awaitRequest();
cache.publish();
awaitAndResetBarrier(taskBarrier);
cache.awaitRequest();
consumer.gracefulShutdown(shutdownNotification);
boolean shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(false));
shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(false));
consumer.leaseLost();
shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(false));
shutdownComplete = consumer.shutdownComplete().get();
assertThat(shutdownComplete, equalTo(true));
verify(processingState, times(2)).createTask(any(), any(), any());
verify(shutdownRequestedState, never()).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
verify(shutdownRequestedState).createTask(any(), any(), any());
verify(shutdownRequestedState).shutdownTransition(eq(ShutdownReason.REQUESTED));
verify(shutdownRequestedAwaitState).createTask(any(), any(), any());
verify(shutdownRequestedAwaitState).shutdownTransition(eq(ShutdownReason.LEASE_LOST));
verify(taskExecutionListener, times(1)).beforeTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(2)).beforeTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownRequestedAwaitTaskInput);
verify(taskExecutionListener, times(1)).beforeTaskExecution(shutdownTaskInput);
initialTaskInput = initialTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
processTaskInput = processTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
shutdownRequestedTaskInput = shutdownRequestedTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
shutdownTaskInput = shutdownTaskInput.toBuilder().taskOutcome(TaskOutcome.SUCCESSFUL).build();
// No task is created/run for this shutdownRequestedAwaitState, so there's no task outcome.
verify(taskExecutionListener, times(1)).afterTaskExecution(initialTaskInput);
verify(taskExecutionListener, times(2)).afterTaskExecution(processTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownRequestedAwaitTaskInput);
verify(taskExecutionListener, times(1)).afterTaskExecution(shutdownTaskInput);
verifyNoMoreInteractions(taskExecutionListener);
verifyZeroInteractions(leaseCoordinator);
verifyZeroInteractions(leaseRefresher);
}
private void mockSuccessfulShutdown(CyclicBarrier taskCallBarrier) {
mockSuccessfulShutdown(taskCallBarrier, null);
}