Added metrics in ShutdownTask for scenarios when parent leases are missing. (#1080)

+ optimizations in `ShutdownTask` (e.g., `Random` static instance,
eliminated over-used Function)
+ DRY+KISS on `ShutdownTaskTest`
+ deleted some dead code
This commit is contained in:
stair 2023-03-21 19:52:17 -04:00 committed by GitHub
parent 177303d557
commit 6be92dc4ef
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 244 additions and 287 deletions

View file

@ -93,24 +93,11 @@ class ConsumerStates {
} }
} }
/** /**
* The initial state that any {@link ShardConsumer} should start in. * The initial state that any {@link ShardConsumer} should start in.
*/ */
static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState(); static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState();
private static ConsumerState shutdownStateFor(ShutdownReason reason) {
switch (reason) {
case REQUESTED:
return ShardConsumerState.SHUTDOWN_REQUESTED.consumerState();
case SHARD_END:
case LEASE_LOST:
return ShardConsumerState.SHUTTING_DOWN.consumerState();
default:
throw new IllegalArgumentException("Unknown reason: " + reason);
}
}
/** /**
* This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent
* shards have been completed. * shards have been completed.

View file

@ -243,28 +243,6 @@ public class ProcessTask implements ConsumerTask {
return (!records.isEmpty()) || shouldCallProcessRecordsEvenForEmptyRecordList; return (!records.isEmpty()) || shouldCallProcessRecordsEvenForEmptyRecordList;
} }
/**
* Emits metrics, and sleeps if there are no records available
*
* @param startTimeMillis
* the time when the task started
*/
private void handleNoRecords(long startTimeMillis) {
log.debug("Kinesis didn't return any records for shard {}", shardInfoId);
long sleepTimeMillis = idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis);
if (sleepTimeMillis > 0) {
sleepTimeMillis = Math.max(sleepTimeMillis, idleTimeInMilliseconds);
try {
log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis,
shardInfoId);
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
log.debug("ShardId {}: Sleep was interrupted", shardInfoId);
}
}
}
@Override @Override
public TaskType taskType() { public TaskType taskType() {
return taskType; return taskType;

View file

@ -59,7 +59,13 @@ public class ShardConsumer {
private final ShardConsumerArgument shardConsumerArgument; private final ShardConsumerArgument shardConsumerArgument;
@NonNull @NonNull
private final Optional<Long> logWarningForTaskAfterMillis; private final Optional<Long> logWarningForTaskAfterMillis;
/**
* @deprecated unused; to be removed in a "major" version bump
*/
@Deprecated
private final Function<ConsumerTask, ConsumerTask> taskMetricsDecorator; private final Function<ConsumerTask, ConsumerTask> taskMetricsDecorator;
private final int bufferSize; private final int bufferSize;
private final TaskExecutionListener taskExecutionListener; private final TaskExecutionListener taskExecutionListener;
private final String streamIdentifier; private final String streamIdentifier;
@ -179,7 +185,6 @@ public class ShardConsumer {
} }
stateChangeFuture = initializeComplete(); stateChangeFuture = initializeComplete();
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
// //
// Ignored should be handled by scheduler // Ignored should be handled by scheduler
@ -199,7 +204,6 @@ public class ShardConsumer {
throw (Error) t; throw (Error) t;
} }
} }
} }
@VisibleForTesting @VisibleForTesting

View file

@ -34,6 +34,7 @@ import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.Lease;
import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCleanupManager;
import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseCoordinator;
import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.UpdateField; import software.amazon.kinesis.leases.UpdateField;
@ -54,7 +55,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
/** /**
@ -66,6 +66,14 @@ import java.util.stream.Collectors;
public class ShutdownTask implements ConsumerTask { public class ShutdownTask implements ConsumerTask {
private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
/**
* Reusable, immutable {@link LeaseLostInput}.
*/
private static final LeaseLostInput LEASE_LOST_INPUT = LeaseLostInput.builder().build();
private static final Random RANDOM = new Random();
@VisibleForTesting @VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 30; static final int RETRY_RANDOM_MAX_RANGE = 30;
@ -101,8 +109,6 @@ public class ShutdownTask implements ConsumerTask {
@NonNull @NonNull
private final LeaseCleanupManager leaseCleanupManager; private final LeaseCleanupManager leaseCleanupManager;
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
/* /*
* Invokes ShardRecordProcessor shutdown() API. * Invokes ShardRecordProcessor shutdown() API.
* (non-Javadoc) * (non-Javadoc)
@ -114,61 +120,61 @@ public class ShutdownTask implements ConsumerTask {
recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION); recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION);
final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHUTDOWN_TASK_OPERATION); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHUTDOWN_TASK_OPERATION);
Exception exception; final String leaseKey = ShardInfo.getLeaseKey(shardInfo);
try { try {
try { try {
log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}", log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}",
leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); leaseKey, childShards, shardInfo.concurrencyToken(), reason);
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKey);
final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()); final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LEASE_LOST_INPUT);
if (reason == ShutdownReason.SHARD_END) { if (reason == ShutdownReason.SHARD_END) {
try { try {
takeShardEndAction(currentShardLease, scope, startTime); takeShardEndAction(currentShardLease, leaseKey, scope, startTime);
} catch (InvalidStateException e) { } catch (InvalidStateException e) {
// If InvalidStateException happens, it indicates we have a non recoverable error in short term. // If InvalidStateException happens, it indicates we have a non recoverable error in short term.
// In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry shutting down. // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow
// other worker to take the lease and retry shutting down.
log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. " + log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. " +
"Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ", leaseKeyProvider.apply(shardInfo), e); "Dropping the lease and shutting down shardConsumer using LEASE_LOST reason.",
dropLease(currentShardLease); leaseKey, e);
throwOnApplicationException(leaseLostAction, scope, startTime); dropLease(currentShardLease, leaseKey);
throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
} }
} else { } else {
throwOnApplicationException(leaseLostAction, scope, startTime); throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime);
} }
log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo)); log.debug("Shutting down retrieval strategy for shard {}.", leaseKey);
recordsPublisher.shutdown(); recordsPublisher.shutdown();
log.debug("Record processor completed shutdown() for shard {}", leaseKeyProvider.apply(shardInfo)); log.debug("Record processor completed shutdown() for shard {}", leaseKey);
return new TaskResult(null); return new TaskResult(null);
} catch (Exception e) { } catch (Exception e) {
if (e instanceof CustomerApplicationException) { if (e instanceof CustomerApplicationException) {
log.error("Shard {}: Application exception. ", leaseKeyProvider.apply(shardInfo), e); log.error("Shard {}: Application exception.", leaseKey, e);
} else { } else {
log.error("Shard {}: Caught exception: ", leaseKeyProvider.apply(shardInfo), e); log.error("Shard {}: Caught exception:", leaseKey, e);
} }
exception = e;
// backoff if we encounter an exception. // backoff if we encounter an exception.
try { try {
Thread.sleep(this.backoffTimeMillis); Thread.sleep(this.backoffTimeMillis);
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
log.debug("Shard {}: Interrupted sleep", leaseKeyProvider.apply(shardInfo), ie); log.debug("Shard {}: Interrupted sleep", leaseKey, ie);
} }
return new TaskResult(e);
} }
} finally { } finally {
MetricsUtil.endScope(scope); MetricsUtil.endScope(scope);
} }
return new TaskResult(exception);
} }
// Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup.
private void takeShardEndAction(Lease currentShardLease, private void takeShardEndAction(Lease currentShardLease,
MetricsScope scope, long startTime) final String leaseKey, MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException { CustomerApplicationException {
// Create new lease for the child shards if they don't exist. // Create new lease for the child shards if they don't exist.
@ -177,7 +183,7 @@ public class ShutdownTask implements ConsumerTask {
// In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards.
// This scenario could happen when customer deletes the stream while leaving the KCL application running. // This scenario could happen when customer deletes the stream while leaving the KCL application running.
if (currentShardLease == null) { if (currentShardLease == null) {
throw new InvalidStateException(leaseKeyProvider.apply(shardInfo) throw new InvalidStateException(leaseKey
+ " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner.");
} }
if (!CollectionUtils.isNullOrEmpty(childShards)) { if (!CollectionUtils.isNullOrEmpty(childShards)) {
@ -189,7 +195,7 @@ public class ShutdownTask implements ConsumerTask {
if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) {
boolean isSuccess = false; boolean isSuccess = false;
try { try {
isSuccess = attemptShardEndCheckpointing(scope, startTime); isSuccess = attemptShardEndCheckpointing(leaseKey, scope, startTime);
} finally { } finally {
// Check if either the shard end ddb persist is successful or // Check if either the shard end ddb persist is successful or
// if childshards is empty. When child shards is empty then either it is due to // if childshards is empty. When child shards is empty then either it is due to
@ -202,38 +208,41 @@ public class ShutdownTask implements ConsumerTask {
} }
} }
private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) private boolean attemptShardEndCheckpointing(final String leaseKey, MetricsScope scope, long startTime)
throws DependencyException, ProvisionedThroughputException, InvalidStateException, throws DependencyException, ProvisionedThroughputException, InvalidStateException,
CustomerApplicationException { CustomerApplicationException {
final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKey))
.orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); .orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKey + " does not exist."));
if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) {
// Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number.
// The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is
throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); // successful after calling shardEnded.
throwOnApplicationException(leaseKey, () -> applicationCheckpointAndVerification(leaseKey),
scope, startTime);
} }
return true; return true;
} }
private void applicationCheckpointAndVerification() { private void applicationCheckpointAndVerification(final String leaseKey) {
recordProcessorCheckpointer recordProcessorCheckpointer
.sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue());
recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END);
shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue();
if (lastCheckpointValue == null if (!ExtendedSequenceNumber.SHARD_END.equals(lastCheckpointValue)) {
|| !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) {
throw new IllegalArgumentException("Application didn't checkpoint at end of shard " throw new IllegalArgumentException("Application didn't checkpoint at end of shard "
+ leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + + leaseKey + ". Application must checkpoint upon shard end. " +
"See ShardRecordProcessor.shardEnded javadocs for more information."); "See ShardRecordProcessor.shardEnded javadocs for more information.");
} }
} }
private void throwOnApplicationException(Runnable action, MetricsScope metricsScope, final long startTime) throws CustomerApplicationException { private void throwOnApplicationException(final String leaseKey, Runnable action, MetricsScope metricsScope,
final long startTime)
throws CustomerApplicationException {
try { try {
action.run(); action.run();
} catch (Exception e) { } catch (Exception e) {
throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKeyProvider.apply(shardInfo) +": ", e); throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKey + ": ", e);
} finally { } finally {
MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY);
} }
@ -241,41 +250,48 @@ public class ShutdownTask implements ConsumerTask {
private void createLeasesForChildShardsIfNotExist(MetricsScope scope) private void createLeasesForChildShardsIfNotExist(MetricsScope scope)
throws DependencyException, InvalidStateException, ProvisionedThroughputException { throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final LeaseRefresher leaseRefresher = leaseCoordinator.leaseRefresher();
// For child shard resulted from merge of two parent shards, verify if both the parents are either present or // For child shard resulted from merge of two parent shards, verify if both the parents are either present or
// not present in the lease table before creating the lease entry. // not present in the lease table before creating the lease entry.
if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) { if (childShards.size() == 1) {
final ChildShard childShard = childShards.get(0); final ChildShard childShard = childShards.get(0);
final List<String> parentLeaseKeys = childShard.parentShards().stream() final List<String> parentLeaseKeys = childShard.parentShards().stream()
.map(parentShardId -> ShardInfo.getLeaseKey(shardInfo, parentShardId)).collect(Collectors.toList()); .map(parentShardId -> ShardInfo.getLeaseKey(shardInfo, parentShardId)).collect(Collectors.toList());
if (parentLeaseKeys.size() != 2) { if (parentLeaseKeys.size() != 2) {
MetricsUtil.addCount(scope, "MissingMergeParent", 1, MetricsLevel.SUMMARY);
throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard
+ " does not contain other parent information."); + " does not contain other parent information.");
} else { }
boolean isValidLeaseTableState =
Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects final Lease parentLease0 = leaseRefresher.getLease(parentLeaseKeys.get(0));
.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1))); final Lease parentLease1 = leaseRefresher.getLease(parentLeaseKeys.get(1));
if (!isValidLeaseTableState) { if (Objects.isNull(parentLease0) != Objects.isNull(parentLease1)) {
if (!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) { MetricsUtil.addCount(scope, "MissingMergeParentLease", 1, MetricsLevel.SUMMARY);
throw new BlockedOnParentShardException( final String message = "Shard " + shardInfo.shardId() + "'s only child shard " + childShard +
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard " has partial parent information in lease table: [parent0=" + parentLease0 +
+ " has partial parent information in lease table. Hence deferring lease creation of child shard."); ", parent1=" + parentLease1 + "]. Hence deferring lease creation of child shard.";
} else { if (isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
throw new InvalidStateException( // abort further attempts and drop the lease; lease will
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard // be reassigned
+ " has partial parent information in lease table. Hence deferring lease creation of child shard."); throw new InvalidStateException(message);
} } else {
// initiate a Thread.sleep(...) and keep the lease;
// keeping the lease decreases churn of lease reassignments
throw new BlockedOnParentShardException(message);
} }
} }
} }
for(ChildShard childShard : childShards) { for(ChildShard childShard : childShards) {
final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId()); final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId());
if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) { if (leaseRefresher.getLease(leaseKey) == null) {
log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey);
final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier());
final long startTime = System.currentTimeMillis(); final long startTime = System.currentTimeMillis();
boolean success = false; boolean success = false;
try { try {
leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); leaseRefresher.createLeaseIfNotExists(leaseToCreate);
success = true; success = true;
} finally { } finally {
MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED);
@ -295,8 +311,7 @@ public class ShutdownTask implements ConsumerTask {
*/ */
@VisibleForTesting @VisibleForTesting
boolean isOneInNProbability(int n) { boolean isOneInNProbability(int n) {
Random r = new Random(); return 0 == RANDOM.nextInt(n);
return 1 == r.nextInt((n - 1) + 1) + 1;
} }
private void updateLeaseWithChildShards(Lease currentLease) private void updateLeaseWithChildShards(Lease currentLease)
@ -324,10 +339,9 @@ public class ShutdownTask implements ConsumerTask {
return reason; return reason;
} }
private void dropLease(Lease currentLease) { private void dropLease(Lease currentLease, final String leaseKey) {
if (currentLease == null) { if (currentLease == null) {
log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKeyProvider.apply(shardInfo)); log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKey);
return;
} else { } else {
leaseCoordinator.dropLease(currentLease); leaseCoordinator.dropLease(currentLease);
log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey());

View file

@ -18,24 +18,29 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy; import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import static software.amazon.kinesis.lifecycle.ShutdownTask.RETRY_RANDOM_MAX_RANGE; import static software.amazon.kinesis.lifecycle.ShutdownReason.LEASE_LOST;
import static software.amazon.kinesis.lifecycle.ShutdownReason.SHARD_END;
import java.util.ArrayList; import java.util.Arrays;
import java.util.Collection;
import java.util.Collections; import java.util.Collections;
import java.util.HashSet;
import java.util.List; import java.util.List;
import java.util.UUID; import java.util.Set;
import com.google.common.collect.ImmutableList;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.Matchers; import org.mockito.Matchers;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
@ -78,17 +83,18 @@ public class ShutdownTaskTest {
private static final long TASK_BACKOFF_TIME_MILLIS = 1L; private static final long TASK_BACKOFF_TIME_MILLIS = 1L;
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final ShutdownReason SHARD_END_SHUTDOWN_REASON = ShutdownReason.SHARD_END;
private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private final String concurrencyToken = "0-1-2-3-4"; private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName");
private final String shardId = "shardId-0";
private boolean cleanupLeasesOfCompletedShards = false; /**
private boolean ignoreUnexpectedChildShards = false; * Shard id for the default-provided {@link ShardInfo} and {@link Lease}.
private ShardInfo shardInfo; */
private static final String SHARD_ID = "shardId-0";
private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, "concurrencyToken",
Collections.emptySet(), ExtendedSequenceNumber.LATEST);
private ShutdownTask task; private ShutdownTask task;
private StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance("streamName");
@Mock @Mock
private RecordsPublisher recordsPublisher; private RecordsPublisher recordsPublisher;
@ -111,20 +117,18 @@ public class ShutdownTaskTest {
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
doNothing().when(recordsPublisher).shutdown();
when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
final Lease childLease = new Lease(); final Lease childLease = new Lease();
childLease.leaseKey("childShardLeaseKey"); childLease.leaseKey("childShardLeaseKey");
when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class))) when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class)))
.thenReturn(childLease); .thenReturn(childLease);
setupLease(SHARD_ID, Collections.emptyList());
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
ExtendedSequenceNumber.LATEST); when(shardDetector.streamIdentifier()).thenReturn(STREAM_IDENTIFIER);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, task = createShutdownTask(SHARD_END, constructChildrenFromSplit());
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager);
} }
/** /**
@ -132,13 +136,8 @@ public class ShutdownTaskTest {
* This test is for the scenario that customer doesn't implement checkpoint in their implementation * This test is for the scenario that customer doesn't implement checkpoint in their implementation
*/ */
@Test @Test
public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception { public final void testCallWhenApplicationDoesNotCheckpoint() {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"), Collections.emptyList(), ExtendedSequenceNumber.LATEST);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
@ -151,17 +150,13 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testCallWhenCreatingNewLeasesThrows() throws Exception { public final void testCallWhenCreatingNewLeasesThrows() throws Exception {
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class))) when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class)))
.thenThrow(new InvalidStateException("InvalidStateException is thrown")); .thenThrow(new InvalidStateException("InvalidStateException is thrown"));
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
verify(recordsPublisher).shutdown(); verify(recordsPublisher).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).shardEnded(any(ShardEndedInput.class));
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator).dropLease(Matchers.any(Lease.class)); verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
} }
@ -172,145 +167,101 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testCallWhenTrueShardEnd() throws DependencyException, InvalidStateException, ProvisionedThroughputException { public final void testCallWhenTrueShardEnd() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"));
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
verify(recordsPublisher).shutdown(); verifyShutdownAndNoDrop();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCleanupManager).enqueueForDeletion(any(LeasePendingDeletion.class));
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
} }
/**
* Tests the scenario when one, but not both, parent shards are accessible.
* This test should drop the lease so another worker can make an attempt.
*/
@Test @Test
public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { public void testMergeChildWhereOneParentHasLeaseAndInvalidState() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), testMergeChildWhereOneParentHasLease(false);
ExtendedSequenceNumber.LATEST);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1"))
.thenReturn(null, null, null, null, null, parentLease);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
// Return null lease first time to simulate partial parent lease info
when(leaseRefresher.getLease("shardId-1"))
.thenReturn(null, null, null, null, null, parentLease);
// Make first 5 attempts with partial parent info in lease table
for (int i = 0; i < 5; i++) {
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call();
assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(recordsPublisher, never()).shutdown();
verify(shardRecordProcessor, never())
.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never())
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
}
// make next attempt with complete parent info in lease table
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
} }
/**
* Tests the scenario when one, but not both, parent shards are accessible.
* This test should retain the lease.
*/
@Test @Test
public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { public void testMergeChildWhereOneParentHasLeaseAndBlockOnParent() throws Exception {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), testMergeChildWhereOneParentHasLease(true);
ExtendedSequenceNumber.LATEST); }
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); private void testMergeChildWhereOneParentHasLease(final boolean blockOnParent) throws Exception {
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); // the @Before setup makes the `SHARD_ID` parent accessible
when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); final ChildShard mergeChild = constructChildFromMerge();
when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")) final TaskResult result = createShutdownTaskSpy(blockOnParent, Collections.singletonList(mergeChild)).call();
.thenReturn(null, null, null, null, null, null, null, null, null, null, null);
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true);
when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease);
// Return null lease first time to simulate partial parent lease info
when(leaseRefresher.getLease("shardId-1"))
.thenReturn(null, null, null, null, null, null, null, null, null, null, null);
// Make first 10 attempts with partial parent info in lease table if (blockOnParent) {
for (int i = 0; i < 10; i++) {
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException); assertEquals(BlockedOnParentShardException.class, result.getException().getClass());
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(leaseCoordinator, never()).dropLease(any(Lease.class));
verify(shardRecordProcessor, never()).leaseLost(any(LeaseLostInput.class));
verify(recordsPublisher, never()).shutdown(); verify(recordsPublisher, never()).shutdown();
verify(shardRecordProcessor, never()) } else {
.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); assertNull(result.getException());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(leaseCoordinator, never()) // verify that only the accessible parent was dropped
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); verify(leaseCoordinator).dropLease(leaseCaptor.capture());
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); assertEquals(SHARD_ID, leaseCaptor.getValue().leaseKey());
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); verify(shardRecordProcessor).leaseLost(any(LeaseLostInput.class));
verify(recordsPublisher).shutdown();
} }
// make final attempt with incomplete parent info in lease table // verify that an attempt was made to retrieve both parents
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, final ArgumentCaptor<String> leaseKeyCaptor = ArgumentCaptor.forClass(String.class);
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, verify(leaseRefresher, times(mergeChild.parentShards().size())).getLease(leaseKeyCaptor.capture());
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, assertEquals(mergeChild.parentShards(), leaseKeyCaptor.getAllValues());
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true);
TaskResult result = task.call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
verify(leaseRefresher, never()).updateLeaseWithMetaInfo(any(Lease.class), any(UpdateField.class));
verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(shardRecordProcessor, never()).shardEnded(any(ShardEndedInput.class));
}
@Test
public final void testMergeChildWhereBothParentsHaveLeases() throws Exception {
// the @Before test setup makes the `SHARD_ID` parent accessible
final ChildShard mergeChild = constructChildFromMerge();
// make second parent accessible
setupLease(mergeChild.parentShards().get(1), Collections.emptyList());
final Lease mockChildLease = mock(Lease.class);
when(hierarchicalShardSyncer.createLeaseForChildShard(mergeChild, STREAM_IDENTIFIER))
.thenReturn(mockChildLease);
final TaskResult result = createShutdownTask(SHARD_END, Collections.singletonList(mergeChild)).call();
assertNull(result.getException());
verify(leaseCleanupManager).enqueueForDeletion(any(LeasePendingDeletion.class));
final ArgumentCaptor<Lease> updateLeaseCaptor = ArgumentCaptor.forClass(Lease.class);
verify(leaseRefresher).updateLeaseWithMetaInfo(updateLeaseCaptor.capture(), eq(UpdateField.CHILD_SHARDS));
final Lease updatedLease = updateLeaseCaptor.getValue();
assertEquals(SHARD_ID, updatedLease.leaseKey());
assertEquals(Collections.singleton(mergeChild.shardId()), updatedLease.childShardIds());
verify(leaseRefresher).createLeaseIfNotExists(mockChildLease);
// verify all parent+child leases were retrieved
final Set<String> expectedShardIds = new HashSet<>(mergeChild.parentShards());
expectedShardIds.add(mergeChild.shardId());
final ArgumentCaptor<String> leaseKeyCaptor = ArgumentCaptor.forClass(String.class);
verify(leaseRefresher, atLeast(expectedShardIds.size())).getLease(leaseKeyCaptor.capture());
assertEquals(expectedShardIds, new HashSet<>(leaseKeyCaptor.getAllValues()));
verifyShutdownAndNoDrop();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
} }
/** /**
@ -319,25 +270,15 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testCallWhenShardNotFound() throws Exception { public final void testCallWhenShardNotFound() throws Exception {
final Lease heldLease = LeaseHelper.createLease("shardId-4", "leaseOwner", Collections.emptyList()); final Lease lease = setupLease("shardId-4", Collections.emptyList());
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), final ShardInfo shardInfo = new ShardInfo(lease.leaseKey(), "concurrencyToken", Collections.emptySet(),
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); final TaskResult result = createShutdownTask(SHARD_END, Collections.emptyList(), shardInfo).call();
when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher);
when(leaseRefresher.getLease("shardId-4")).thenReturn(heldLease);
when(leaseCoordinator.getCurrentlyHeldLease("shardId-4")).thenReturn(heldLease);
final TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
verify(recordsPublisher).shutdown(); verifyShutdownAndNoDrop();
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
} }
/** /**
@ -346,14 +287,8 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testCallWhenLeaseLost() throws DependencyException, InvalidStateException, ProvisionedThroughputException { public final void testCallWhenLeaseLost() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), final TaskResult result = createShutdownTask(LEASE_LOST, Collections.emptyList()).call();
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager);
final TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
verify(recordsPublisher).shutdown(); verify(recordsPublisher).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
@ -362,6 +297,17 @@ public class ShutdownTaskTest {
verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
verify(leaseCoordinator, never()).dropLease(any(Lease.class)); verify(leaseCoordinator, never()).dropLease(any(Lease.class));
} }
@Test
public void testNullChildShards() throws Exception {
final TaskResult result = createShutdownTask(SHARD_END, null).call();
assertNull(result.getException());
verifyShutdownAndNoDrop();
verify(leaseCleanupManager).enqueueForDeletion(any(LeasePendingDeletion.class));
verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class));
}
/** /**
* Test method for {@link ShutdownTask#taskType()}. * Test method for {@link ShutdownTask#taskType()}.
*/ */
@ -370,10 +316,24 @@ public class ShutdownTaskTest {
assertEquals(TaskType.SHUTDOWN, task.taskType()); assertEquals(TaskType.SHUTDOWN, task.taskType());
} }
private List<ChildShard> constructChildShards() { private void verifyShutdownAndNoDrop() {
List<ChildShard> childShards = new ArrayList<>(); verify(recordsPublisher).shutdown();
List<String> parentShards = new ArrayList<>(); verify(leaseCoordinator, never()).dropLease(any(Lease.class));
parentShards.add(shardId); verify(shardRecordProcessor, never()).leaseLost(any(LeaseLostInput.class));
}
private Lease setupLease(final String leaseKey, final Collection<String> parentShardIds) throws Exception {
final Lease lease = LeaseHelper.createLease(leaseKey, "leaseOwner", parentShardIds);
when(leaseCoordinator.getCurrentlyHeldLease(lease.leaseKey())).thenReturn(lease);
when(leaseRefresher.getLease(lease.leaseKey())).thenReturn(lease);
return lease;
}
/**
* Constructs two {@link ChildShard}s that mimic a shard split operation.
*/
private List<ChildShard> constructChildrenFromSplit() {
List<String> parentShards = Collections.singletonList(SHARD_ID);
ChildShard leftChild = ChildShard.builder() ChildShard leftChild = ChildShard.builder()
.shardId("ShardId-1") .shardId("ShardId-1")
.parentShards(parentShards) .parentShards(parentShards)
@ -384,22 +344,36 @@ public class ShutdownTaskTest {
.parentShards(parentShards) .parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99")) .hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99"))
.build(); .build();
childShards.add(leftChild); return Arrays.asList(leftChild, rightChild);
childShards.add(rightChild);
return childShards;
} }
private List<ChildShard> constructChildShard() { /**
List<ChildShard> childShards = new ArrayList<>(); * Constructs a {@link ChildShard} that mimics a shard merge operation.
List<String> parentShards = new ArrayList<>(); */
parentShards.add(shardId); private ChildShard constructChildFromMerge() {
parentShards.add("shardId-1"); List<String> parentShards = Arrays.asList(SHARD_ID, "shardId-1");
ChildShard leftChild = ChildShard.builder() return ChildShard.builder()
.shardId("shardId-2") .shardId("shardId-2")
.parentShards(parentShards) .parentShards(parentShards)
.hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49")) .hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49"))
.build(); .build();
childShards.add(leftChild); }
return childShards;
private ShutdownTask createShutdownTaskSpy(final boolean blockOnParent, final List<ChildShard> childShards) {
final ShutdownTask spy = spy(createShutdownTask(SHARD_END, childShards));
when(spy.isOneInNProbability(ShutdownTask.RETRY_RANDOM_MAX_RANGE)).thenReturn(!blockOnParent);
return spy;
}
private ShutdownTask createShutdownTask(final ShutdownReason reason, final List<ChildShard> childShards) {
return createShutdownTask(reason, childShards, SHARD_INFO);
}
private ShutdownTask createShutdownTask(final ShutdownReason reason, final List<ChildShard> childShards,
final ShardInfo shardInfo) {
return new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
reason, INITIAL_POSITION_TRIM_HORIZON, false, false,
leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer,
NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager);
} }
} }