From 6be92dc4ef30b7d9745263508b039eadc93d4671 Mon Sep 17 00:00:00 2001 From: stair <123031771+stair-aws@users.noreply.github.com> Date: Tue, 21 Mar 2023 19:52:17 -0400 Subject: [PATCH] Added metrics in `ShutdownTask` for scenarios when parent leases are missing. (#1080) + optimizations in `ShutdownTask` (e.g., `Random` static instance, eliminated over-used Function) + DRY+KISS on `ShutdownTaskTest` + deleted some dead code --- .../kinesis/lifecycle/ConsumerStates.java | 13 - .../amazon/kinesis/lifecycle/ProcessTask.java | 22 -- .../kinesis/lifecycle/ShardConsumer.java | 8 +- .../kinesis/lifecycle/ShutdownTask.java | 130 ++++--- .../kinesis/lifecycle/ShutdownTaskTest.java | 358 ++++++++---------- 5 files changed, 244 insertions(+), 287 deletions(-) diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index c4a87082..8a0cd358 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -93,24 +93,11 @@ class ConsumerStates { } } - /** * The initial state that any {@link ShardConsumer} should start in. */ static final ConsumerState INITIAL_STATE = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState(); - private static ConsumerState shutdownStateFor(ShutdownReason reason) { - switch (reason) { - case REQUESTED: - return ShardConsumerState.SHUTDOWN_REQUESTED.consumerState(); - case SHARD_END: - case LEASE_LOST: - return ShardConsumerState.SHUTTING_DOWN.consumerState(); - default: - throw new IllegalArgumentException("Unknown reason: " + reason); - } - } - /** * This is the initial state of a shard consumer. This causes the consumer to remain blocked until the all parent * shards have been completed. diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java index 9f616b0d..c3f9523d 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ProcessTask.java @@ -243,28 +243,6 @@ public class ProcessTask implements ConsumerTask { return (!records.isEmpty()) || shouldCallProcessRecordsEvenForEmptyRecordList; } - /** - * Emits metrics, and sleeps if there are no records available - * - * @param startTimeMillis - * the time when the task started - */ - private void handleNoRecords(long startTimeMillis) { - log.debug("Kinesis didn't return any records for shard {}", shardInfoId); - - long sleepTimeMillis = idleTimeInMilliseconds - (System.currentTimeMillis() - startTimeMillis); - if (sleepTimeMillis > 0) { - sleepTimeMillis = Math.max(sleepTimeMillis, idleTimeInMilliseconds); - try { - log.debug("Sleeping for {} ms since there were no new records in shard {}", sleepTimeMillis, - shardInfoId); - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - log.debug("ShardId {}: Sleep was interrupted", shardInfoId); - } - } - } - @Override public TaskType taskType() { return taskType; diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java index a575a953..4162ea81 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumer.java @@ -59,7 +59,13 @@ public class ShardConsumer { private final ShardConsumerArgument shardConsumerArgument; @NonNull private final Optional logWarningForTaskAfterMillis; + + /** + * @deprecated unused; to be removed in a "major" version bump + */ + @Deprecated private final Function taskMetricsDecorator; + private final int bufferSize; private final TaskExecutionListener taskExecutionListener; private final String streamIdentifier; @@ -179,7 +185,6 @@ public class ShardConsumer { } stateChangeFuture = initializeComplete(); } - } catch (InterruptedException e) { // // Ignored should be handled by scheduler @@ -199,7 +204,6 @@ public class ShardConsumer { throw (Error) t; } } - } @VisibleForTesting diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 0322c0e2..31bc8f88 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -34,6 +34,7 @@ import software.amazon.kinesis.leases.HierarchicalShardSyncer; import software.amazon.kinesis.leases.Lease; import software.amazon.kinesis.leases.LeaseCleanupManager; import software.amazon.kinesis.leases.LeaseCoordinator; +import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.UpdateField; @@ -54,7 +55,6 @@ import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import java.util.Random; import java.util.Set; -import java.util.function.Function; import java.util.stream.Collectors; /** @@ -66,6 +66,14 @@ import java.util.stream.Collectors; public class ShutdownTask implements ConsumerTask { private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; + + /** + * Reusable, immutable {@link LeaseLostInput}. + */ + private static final LeaseLostInput LEASE_LOST_INPUT = LeaseLostInput.builder().build(); + + private static final Random RANDOM = new Random(); + @VisibleForTesting static final int RETRY_RANDOM_MAX_RANGE = 30; @@ -101,8 +109,6 @@ public class ShutdownTask implements ConsumerTask { @NonNull private final LeaseCleanupManager leaseCleanupManager; - private static final Function leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo); - /* * Invokes ShardRecordProcessor shutdown() API. * (non-Javadoc) @@ -114,61 +120,61 @@ public class ShutdownTask implements ConsumerTask { recordProcessorCheckpointer.checkpointer().operation(SHUTDOWN_TASK_OPERATION); final MetricsScope scope = MetricsUtil.createMetricsWithOperation(metricsFactory, SHUTDOWN_TASK_OPERATION); - Exception exception; - + final String leaseKey = ShardInfo.getLeaseKey(shardInfo); try { try { log.debug("Invoking shutdown() for shard {} with childShards {}, concurrencyToken {}. Shutdown reason: {}", - leaseKeyProvider.apply(shardInfo), childShards, shardInfo.concurrencyToken(), reason); + leaseKey, childShards, shardInfo.concurrencyToken(), reason); final long startTime = System.currentTimeMillis(); - final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKeyProvider.apply(shardInfo)); - final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LeaseLostInput.builder().build()); + final Lease currentShardLease = leaseCoordinator.getCurrentlyHeldLease(leaseKey); + final Runnable leaseLostAction = () -> shardRecordProcessor.leaseLost(LEASE_LOST_INPUT); if (reason == ShutdownReason.SHARD_END) { try { - takeShardEndAction(currentShardLease, scope, startTime); + takeShardEndAction(currentShardLease, leaseKey, scope, startTime); } catch (InvalidStateException e) { // If InvalidStateException happens, it indicates we have a non recoverable error in short term. - // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow other worker to take the lease and retry shutting down. + // In this scenario, we should shutdown the shardConsumer with LEASE_LOST reason to allow + // other worker to take the lease and retry shutting down. log.warn("Lease {}: Invalid state encountered while shutting down shardConsumer with SHARD_END reason. " + - "Dropping the lease and shutting down shardConsumer using LEASE_LOST reason. ", leaseKeyProvider.apply(shardInfo), e); - dropLease(currentShardLease); - throwOnApplicationException(leaseLostAction, scope, startTime); + "Dropping the lease and shutting down shardConsumer using LEASE_LOST reason.", + leaseKey, e); + dropLease(currentShardLease, leaseKey); + throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime); } } else { - throwOnApplicationException(leaseLostAction, scope, startTime); + throwOnApplicationException(leaseKey, leaseLostAction, scope, startTime); } - log.debug("Shutting down retrieval strategy for shard {}.", leaseKeyProvider.apply(shardInfo)); + log.debug("Shutting down retrieval strategy for shard {}.", leaseKey); recordsPublisher.shutdown(); - log.debug("Record processor completed shutdown() for shard {}", leaseKeyProvider.apply(shardInfo)); + log.debug("Record processor completed shutdown() for shard {}", leaseKey); return new TaskResult(null); } catch (Exception e) { if (e instanceof CustomerApplicationException) { - log.error("Shard {}: Application exception. ", leaseKeyProvider.apply(shardInfo), e); + log.error("Shard {}: Application exception.", leaseKey, e); } else { - log.error("Shard {}: Caught exception: ", leaseKeyProvider.apply(shardInfo), e); + log.error("Shard {}: Caught exception:", leaseKey, e); } - exception = e; // backoff if we encounter an exception. try { Thread.sleep(this.backoffTimeMillis); } catch (InterruptedException ie) { - log.debug("Shard {}: Interrupted sleep", leaseKeyProvider.apply(shardInfo), ie); + log.debug("Shard {}: Interrupted sleep", leaseKey, ie); } + + return new TaskResult(e); } } finally { MetricsUtil.endScope(scope); } - - return new TaskResult(exception); } // Involves persisting child shard info, attempt to checkpoint and enqueueing lease for cleanup. private void takeShardEndAction(Lease currentShardLease, - MetricsScope scope, long startTime) + final String leaseKey, MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { // Create new lease for the child shards if they don't exist. @@ -177,7 +183,7 @@ public class ShutdownTask implements ConsumerTask { // In this case, KinesisDataFetcher and FanOutRecordsPublisher will send out SHARD_END signal to trigger a shutdown task with empty list of childShards. // This scenario could happen when customer deletes the stream while leaving the KCL application running. if (currentShardLease == null) { - throw new InvalidStateException(leaseKeyProvider.apply(shardInfo) + throw new InvalidStateException(leaseKey + " : Lease not owned by the current worker. Leaving ShardEnd handling to new owner."); } if (!CollectionUtils.isNullOrEmpty(childShards)) { @@ -189,7 +195,7 @@ public class ShutdownTask implements ConsumerTask { if (!leaseCleanupManager.isEnqueuedForDeletion(leasePendingDeletion)) { boolean isSuccess = false; try { - isSuccess = attemptShardEndCheckpointing(scope, startTime); + isSuccess = attemptShardEndCheckpointing(leaseKey, scope, startTime); } finally { // Check if either the shard end ddb persist is successful or // if childshards is empty. When child shards is empty then either it is due to @@ -202,38 +208,41 @@ public class ShutdownTask implements ConsumerTask { } } - private boolean attemptShardEndCheckpointing(MetricsScope scope, long startTime) + private boolean attemptShardEndCheckpointing(final String leaseKey, MetricsScope scope, long startTime) throws DependencyException, ProvisionedThroughputException, InvalidStateException, CustomerApplicationException { - final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKeyProvider.apply(shardInfo))) - .orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKeyProvider.apply(shardInfo) + " does not exist.")); + final Lease leaseFromDdb = Optional.ofNullable(leaseCoordinator.leaseRefresher().getLease(leaseKey)) + .orElseThrow(() -> new InvalidStateException("Lease for shard " + leaseKey + " does not exist.")); if (!leaseFromDdb.checkpoint().equals(ExtendedSequenceNumber.SHARD_END)) { // Call the shardRecordsProcessor to checkpoint with SHARD_END sequence number. - // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is successful after calling shardEnded. - throwOnApplicationException(() -> applicationCheckpointAndVerification(), scope, startTime); + // The shardEnded is implemented by customer. We should validate if the SHARD_END checkpointing is + // successful after calling shardEnded. + throwOnApplicationException(leaseKey, () -> applicationCheckpointAndVerification(leaseKey), + scope, startTime); } return true; } - private void applicationCheckpointAndVerification() { + private void applicationCheckpointAndVerification(final String leaseKey) { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); final ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); - if (lastCheckpointValue == null - || !lastCheckpointValue.equals(ExtendedSequenceNumber.SHARD_END)) { + if (!ExtendedSequenceNumber.SHARD_END.equals(lastCheckpointValue)) { throw new IllegalArgumentException("Application didn't checkpoint at end of shard " - + leaseKeyProvider.apply(shardInfo) + ". Application must checkpoint upon shard end. " + + + leaseKey + ". Application must checkpoint upon shard end. " + "See ShardRecordProcessor.shardEnded javadocs for more information."); } } - private void throwOnApplicationException(Runnable action, MetricsScope metricsScope, final long startTime) throws CustomerApplicationException { + private void throwOnApplicationException(final String leaseKey, Runnable action, MetricsScope metricsScope, + final long startTime) + throws CustomerApplicationException { try { action.run(); } catch (Exception e) { - throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKeyProvider.apply(shardInfo) +": ", e); + throw new CustomerApplicationException("Customer application throws exception for shard " + leaseKey + ": ", e); } finally { MetricsUtil.addLatency(metricsScope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } @@ -241,41 +250,48 @@ public class ShutdownTask implements ConsumerTask { private void createLeasesForChildShardsIfNotExist(MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException { + final LeaseRefresher leaseRefresher = leaseCoordinator.leaseRefresher(); + // For child shard resulted from merge of two parent shards, verify if both the parents are either present or // not present in the lease table before creating the lease entry. - if (!CollectionUtils.isNullOrEmpty(childShards) && childShards.size() == 1) { + if (childShards.size() == 1) { final ChildShard childShard = childShards.get(0); final List parentLeaseKeys = childShard.parentShards().stream() .map(parentShardId -> ShardInfo.getLeaseKey(shardInfo, parentShardId)).collect(Collectors.toList()); if (parentLeaseKeys.size() != 2) { + MetricsUtil.addCount(scope, "MissingMergeParent", 1, MetricsLevel.SUMMARY); throw new InvalidStateException("Shard " + shardInfo.shardId() + "'s only child shard " + childShard + " does not contain other parent information."); - } else { - boolean isValidLeaseTableState = - Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects - .isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1))); - if (!isValidLeaseTableState) { - if (!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) { - throw new BlockedOnParentShardException( - "Shard " + shardInfo.shardId() + "'s only child shard " + childShard - + " has partial parent information in lease table. Hence deferring lease creation of child shard."); - } else { - throw new InvalidStateException( - "Shard " + shardInfo.shardId() + "'s only child shard " + childShard - + " has partial parent information in lease table. Hence deferring lease creation of child shard."); - } + } + + final Lease parentLease0 = leaseRefresher.getLease(parentLeaseKeys.get(0)); + final Lease parentLease1 = leaseRefresher.getLease(parentLeaseKeys.get(1)); + if (Objects.isNull(parentLease0) != Objects.isNull(parentLease1)) { + MetricsUtil.addCount(scope, "MissingMergeParentLease", 1, MetricsLevel.SUMMARY); + final String message = "Shard " + shardInfo.shardId() + "'s only child shard " + childShard + + " has partial parent information in lease table: [parent0=" + parentLease0 + + ", parent1=" + parentLease1 + "]. Hence deferring lease creation of child shard."; + if (isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) { + // abort further attempts and drop the lease; lease will + // be reassigned + throw new InvalidStateException(message); + } else { + // initiate a Thread.sleep(...) and keep the lease; + // keeping the lease decreases churn of lease reassignments + throw new BlockedOnParentShardException(message); } } } + for(ChildShard childShard : childShards) { final String leaseKey = ShardInfo.getLeaseKey(shardInfo, childShard.shardId()); - if(leaseCoordinator.leaseRefresher().getLease(leaseKey) == null) { + if (leaseRefresher.getLease(leaseKey) == null) { log.debug("{} - Shard {} - Attempting to create lease for child shard {}", shardDetector.streamIdentifier(), shardInfo.shardId(), leaseKey); final Lease leaseToCreate = hierarchicalShardSyncer.createLeaseForChildShard(childShard, shardDetector.streamIdentifier()); final long startTime = System.currentTimeMillis(); boolean success = false; try { - leaseCoordinator.leaseRefresher().createLeaseIfNotExists(leaseToCreate); + leaseRefresher.createLeaseIfNotExists(leaseToCreate); success = true; } finally { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); @@ -295,8 +311,7 @@ public class ShutdownTask implements ConsumerTask { */ @VisibleForTesting boolean isOneInNProbability(int n) { - Random r = new Random(); - return 1 == r.nextInt((n - 1) + 1) + 1; + return 0 == RANDOM.nextInt(n); } private void updateLeaseWithChildShards(Lease currentLease) @@ -324,10 +339,9 @@ public class ShutdownTask implements ConsumerTask { return reason; } - private void dropLease(Lease currentLease) { + private void dropLease(Lease currentLease, final String leaseKey) { if (currentLease == null) { - log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKeyProvider.apply(shardInfo)); - return; + log.warn("Shard {}: Unable to find the lease for shard. Will shutdown the shardConsumer directly.", leaseKey); } else { leaseCoordinator.dropLease(currentLease); log.info("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 6617984d..5967ea2a 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -18,24 +18,29 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.atLeast; +import static org.mockito.Mockito.eq; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static software.amazon.kinesis.lifecycle.ShutdownTask.RETRY_RANDOM_MAX_RANGE; +import static software.amazon.kinesis.lifecycle.ShutdownReason.LEASE_LOST; +import static software.amazon.kinesis.lifecycle.ShutdownReason.SHARD_END; -import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.List; -import java.util.UUID; +import java.util.Set; -import com.google.common.collect.ImmutableList; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; @@ -78,18 +83,19 @@ public class ShutdownTaskTest { private static final long TASK_BACKOFF_TIME_MILLIS = 1L; private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); - private static final ShutdownReason SHARD_END_SHUTDOWN_REASON = ShutdownReason.SHARD_END; - private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); - private final String concurrencyToken = "0-1-2-3-4"; - private final String shardId = "shardId-0"; - private boolean cleanupLeasesOfCompletedShards = false; - private boolean ignoreUnexpectedChildShards = false; - private ShardInfo shardInfo; + private static final StreamIdentifier STREAM_IDENTIFIER = StreamIdentifier.singleStreamInstance("streamName"); + + /** + * Shard id for the default-provided {@link ShardInfo} and {@link Lease}. + */ + private static final String SHARD_ID = "shardId-0"; + private static final ShardInfo SHARD_INFO = new ShardInfo(SHARD_ID, "concurrencyToken", + Collections.emptySet(), ExtendedSequenceNumber.LATEST); + private ShutdownTask task; - private StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance("streamName"); - + @Mock private RecordsPublisher recordsPublisher; @Mock @@ -111,20 +117,18 @@ public class ShutdownTaskTest { @Before public void setUp() throws Exception { - doNothing().when(recordsPublisher).shutdown(); when(recordProcessorCheckpointer.checkpointer()).thenReturn(checkpointer); + when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); final Lease childLease = new Lease(); childLease.leaseKey("childShardLeaseKey"); when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class))) .thenReturn(childLease); + setupLease(SHARD_ID, Collections.emptyList()); - shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + when(shardDetector.streamIdentifier()).thenReturn(STREAM_IDENTIFIER); - task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager); + task = createShutdownTask(SHARD_END, constructChildrenFromSplit()); } /** @@ -132,13 +136,8 @@ public class ShutdownTaskTest { * This test is for the scenario that customer doesn't implement checkpoint in their implementation */ @Test - public final void testCallWhenApplicationDoesNotCheckpoint() throws Exception { + public final void testCallWhenApplicationDoesNotCheckpoint() { when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); - Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId"), Collections.emptyList(), ExtendedSequenceNumber.LATEST); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); - when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); final TaskResult result = task.call(); assertNotNull(result.getException()); @@ -151,17 +150,13 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenCreatingNewLeasesThrows() throws Exception { - when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); when(hierarchicalShardSyncer.createLeaseForChildShard(Matchers.any(ChildShard.class), Matchers.any(StreamIdentifier.class))) .thenThrow(new InvalidStateException("InvalidStateException is thrown")); final TaskResult result = task.call(); assertNull(result.getException()); verify(recordsPublisher).shutdown(); - verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).shardEnded(any(ShardEndedInput.class)); verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(leaseCoordinator).dropLease(Matchers.any(Lease.class)); } @@ -172,145 +167,101 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenTrueShardEnd() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShards(), streamIdentifier, leaseCleanupManager); - - when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", Collections.singleton("parentShardId")); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); - when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); - final TaskResult result = task.call(); assertNull(result.getException()); - verify(recordsPublisher).shutdown(); + verifyShutdownAndNoDrop(); verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); verify(leaseRefresher, times(2)).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); - verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); + verify(leaseCleanupManager).enqueueForDeletion(any(LeasePendingDeletion.class)); } + /** + * Tests the scenario when one, but not both, parent shards are accessible. + * This test should drop the lease so another worker can make an attempt. + */ @Test - public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); - Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList()); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")) - .thenReturn(null, null, null, null, null, parentLease); - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); - when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); - // Return null lease first time to simulate partial parent lease info - when(leaseRefresher.getLease("shardId-1")) - .thenReturn(null, null, null, null, null, parentLease); - - // Make first 5 attempts with partial parent info in lease table - for (int i = 0; i < 5; i++) { - ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); - when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); - TaskResult result = task.call(); - assertNotNull(result.getException()); - assertTrue(result.getException() instanceof BlockedOnParentShardException); - assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); - verify(recordsPublisher, never()).shutdown(); - verify(shardRecordProcessor, never()) - .shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseCoordinator, never()) - .updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); - verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); - verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); - } - - // make next attempt with complete parent info in lease table - ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); - when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); - TaskResult result = task.call(); - assertNull(result.getException()); - verify(recordsPublisher).shutdown(); - verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); - verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); - verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); + public void testMergeChildWhereOneParentHasLeaseAndInvalidState() throws Exception { + testMergeChildWhereOneParentHasLease(false); } + /** + * Tests the scenario when one, but not both, parent shards are accessible. + * This test should retain the lease. + */ @Test - public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); + public void testMergeChildWhereOneParentHasLeaseAndBlockOnParent() throws Exception { + testMergeChildWhereOneParentHasLease(true); + } - when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-0")).thenReturn(heldLease); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-1")) - .thenReturn(null, null, null, null, null, null, null, null, null, null, null); - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseCoordinator.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString())).thenReturn(true); - when(leaseRefresher.getLease("shardId-0")).thenReturn(heldLease); - // Return null lease first time to simulate partial parent lease info - when(leaseRefresher.getLease("shardId-1")) - .thenReturn(null, null, null, null, null, null, null, null, null, null, null); + private void testMergeChildWhereOneParentHasLease(final boolean blockOnParent) throws Exception { + // the @Before setup makes the `SHARD_ID` parent accessible + final ChildShard mergeChild = constructChildFromMerge(); + final TaskResult result = createShutdownTaskSpy(blockOnParent, Collections.singletonList(mergeChild)).call(); - // Make first 10 attempts with partial parent info in lease table - for (int i = 0; i < 10; i++) { - ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); - when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); - TaskResult result = task.call(); + if (blockOnParent) { assertNotNull(result.getException()); - assertTrue(result.getException() instanceof BlockedOnParentShardException); - assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + assertEquals(BlockedOnParentShardException.class, result.getException().getClass()); + + verify(leaseCoordinator, never()).dropLease(any(Lease.class)); + verify(shardRecordProcessor, never()).leaseLost(any(LeaseLostInput.class)); verify(recordsPublisher, never()).shutdown(); - verify(shardRecordProcessor, never()) - .shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseCoordinator, never()) - .updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); - verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); - verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); + } else { + assertNull(result.getException()); + + // verify that only the accessible parent was dropped + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + verify(leaseCoordinator).dropLease(leaseCaptor.capture()); + assertEquals(SHARD_ID, leaseCaptor.getValue().leaseKey()); + + verify(shardRecordProcessor).leaseLost(any(LeaseLostInput.class)); + verify(recordsPublisher).shutdown(); } - // make final attempt with incomplete parent info in lease table - ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); - when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); - TaskResult result = task.call(); - assertNull(result.getException()); - verify(recordsPublisher).shutdown(); - verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); - verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); - verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); - verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); - verify(leaseCoordinator).dropLease(Matchers.any(Lease.class)); + // verify that an attempt was made to retrieve both parents + final ArgumentCaptor leaseKeyCaptor = ArgumentCaptor.forClass(String.class); + verify(leaseRefresher, times(mergeChild.parentShards().size())).getLease(leaseKeyCaptor.capture()); + assertEquals(mergeChild.parentShards(), leaseKeyCaptor.getAllValues()); + verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); + verify(leaseRefresher, never()).updateLeaseWithMetaInfo(any(Lease.class), any(UpdateField.class)); + verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); + verify(shardRecordProcessor, never()).shardEnded(any(ShardEndedInput.class)); + } + + @Test + public final void testMergeChildWhereBothParentsHaveLeases() throws Exception { + // the @Before test setup makes the `SHARD_ID` parent accessible + final ChildShard mergeChild = constructChildFromMerge(); + // make second parent accessible + setupLease(mergeChild.parentShards().get(1), Collections.emptyList()); + + final Lease mockChildLease = mock(Lease.class); + when(hierarchicalShardSyncer.createLeaseForChildShard(mergeChild, STREAM_IDENTIFIER)) + .thenReturn(mockChildLease); + + final TaskResult result = createShutdownTask(SHARD_END, Collections.singletonList(mergeChild)).call(); + + assertNull(result.getException()); + verify(leaseCleanupManager).enqueueForDeletion(any(LeasePendingDeletion.class)); + + final ArgumentCaptor updateLeaseCaptor = ArgumentCaptor.forClass(Lease.class); + verify(leaseRefresher).updateLeaseWithMetaInfo(updateLeaseCaptor.capture(), eq(UpdateField.CHILD_SHARDS)); + final Lease updatedLease = updateLeaseCaptor.getValue(); + assertEquals(SHARD_ID, updatedLease.leaseKey()); + assertEquals(Collections.singleton(mergeChild.shardId()), updatedLease.childShardIds()); + + verify(leaseRefresher).createLeaseIfNotExists(mockChildLease); + + // verify all parent+child leases were retrieved + final Set expectedShardIds = new HashSet<>(mergeChild.parentShards()); + expectedShardIds.add(mergeChild.shardId()); + final ArgumentCaptor leaseKeyCaptor = ArgumentCaptor.forClass(String.class); + verify(leaseRefresher, atLeast(expectedShardIds.size())).getLease(leaseKeyCaptor.capture()); + assertEquals(expectedShardIds, new HashSet<>(leaseKeyCaptor.getAllValues())); + + verifyShutdownAndNoDrop(); + verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); } /** @@ -319,25 +270,15 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenShardNotFound() throws Exception { - final Lease heldLease = LeaseHelper.createLease("shardId-4", "leaseOwner", Collections.emptyList()); - shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager); + final Lease lease = setupLease("shardId-4", Collections.emptyList()); + final ShardInfo shardInfo = new ShardInfo(lease.leaseKey(), "concurrencyToken", Collections.emptySet(), + ExtendedSequenceNumber.LATEST); - when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); - when(leaseRefresher.getLease("shardId-4")).thenReturn(heldLease); - when(leaseCoordinator.getCurrentlyHeldLease("shardId-4")).thenReturn(heldLease); + final TaskResult result = createShutdownTask(SHARD_END, Collections.emptyList(), shardInfo).call(); - final TaskResult result = task.call(); assertNull(result.getException()); - verify(recordsPublisher).shutdown(); - verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); - verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); - verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); + verifyShutdownAndNoDrop(); + verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); } /** @@ -346,14 +287,8 @@ public class ShutdownTaskTest { */ @Test public final void testCallWhenLeaseLost() throws DependencyException, InvalidStateException, ProvisionedThroughputException { - shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), - ExtendedSequenceNumber.LATEST); - task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, new ArrayList<>(), streamIdentifier, leaseCleanupManager); + final TaskResult result = createShutdownTask(LEASE_LOST, Collections.emptyList()).call(); - final TaskResult result = task.call(); assertNull(result.getException()); verify(recordsPublisher).shutdown(); verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); @@ -362,6 +297,17 @@ public class ShutdownTaskTest { verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); verify(leaseCoordinator, never()).dropLease(any(Lease.class)); } + + @Test + public void testNullChildShards() throws Exception { + final TaskResult result = createShutdownTask(SHARD_END, null).call(); + + assertNull(result.getException()); + verifyShutdownAndNoDrop(); + verify(leaseCleanupManager).enqueueForDeletion(any(LeasePendingDeletion.class)); + verify(leaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); + } + /** * Test method for {@link ShutdownTask#taskType()}. */ @@ -370,10 +316,24 @@ public class ShutdownTaskTest { assertEquals(TaskType.SHUTDOWN, task.taskType()); } - private List constructChildShards() { - List childShards = new ArrayList<>(); - List parentShards = new ArrayList<>(); - parentShards.add(shardId); + private void verifyShutdownAndNoDrop() { + verify(recordsPublisher).shutdown(); + verify(leaseCoordinator, never()).dropLease(any(Lease.class)); + verify(shardRecordProcessor, never()).leaseLost(any(LeaseLostInput.class)); + } + + private Lease setupLease(final String leaseKey, final Collection parentShardIds) throws Exception { + final Lease lease = LeaseHelper.createLease(leaseKey, "leaseOwner", parentShardIds); + when(leaseCoordinator.getCurrentlyHeldLease(lease.leaseKey())).thenReturn(lease); + when(leaseRefresher.getLease(lease.leaseKey())).thenReturn(lease); + return lease; + } + + /** + * Constructs two {@link ChildShard}s that mimic a shard split operation. + */ + private List constructChildrenFromSplit() { + List parentShards = Collections.singletonList(SHARD_ID); ChildShard leftChild = ChildShard.builder() .shardId("ShardId-1") .parentShards(parentShards) @@ -384,22 +344,36 @@ public class ShutdownTaskTest { .parentShards(parentShards) .hashKeyRange(ShardObjectHelper.newHashKeyRange("50", "99")) .build(); - childShards.add(leftChild); - childShards.add(rightChild); - return childShards; + return Arrays.asList(leftChild, rightChild); } - private List constructChildShard() { - List childShards = new ArrayList<>(); - List parentShards = new ArrayList<>(); - parentShards.add(shardId); - parentShards.add("shardId-1"); - ChildShard leftChild = ChildShard.builder() + /** + * Constructs a {@link ChildShard} that mimics a shard merge operation. + */ + private ChildShard constructChildFromMerge() { + List parentShards = Arrays.asList(SHARD_ID, "shardId-1"); + return ChildShard.builder() .shardId("shardId-2") .parentShards(parentShards) .hashKeyRange(ShardObjectHelper.newHashKeyRange("0", "49")) .build(); - childShards.add(leftChild); - return childShards; + } + + private ShutdownTask createShutdownTaskSpy(final boolean blockOnParent, final List childShards) { + final ShutdownTask spy = spy(createShutdownTask(SHARD_END, childShards)); + when(spy.isOneInNProbability(ShutdownTask.RETRY_RANDOM_MAX_RANGE)).thenReturn(!blockOnParent); + return spy; + } + + private ShutdownTask createShutdownTask(final ShutdownReason reason, final List childShards) { + return createShutdownTask(reason, childShards, SHARD_INFO); + } + + private ShutdownTask createShutdownTask(final ShutdownReason reason, final List childShards, + final ShardInfo shardInfo) { + return new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + reason, INITIAL_POSITION_TRIM_HORIZON, false, false, + leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, + NULL_METRICS_FACTORY, childShards, STREAM_IDENTIFIER, leaseCleanupManager); } }