diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index 208498bc..6b4d1839 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -52,6 +52,7 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.Random; import java.util.Set; import java.util.function.Function; import java.util.stream.Collectors; @@ -65,6 +66,8 @@ import java.util.stream.Collectors; public class ShutdownTask implements ConsumerTask { private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; + @VisibleForTesting + static final int RETRY_RANDOM_MAX_RANGE = 10; @NonNull private final ShardInfo shardInfo; @@ -99,7 +102,6 @@ public class ShutdownTask implements ConsumerTask { private final LeaseCleanupManager leaseCleanupManager; private static final Function leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo); - private int retryLeftForValidParentState = 10; /* * Invokes ShardRecordProcessor shutdown() API. @@ -253,7 +255,7 @@ public class ShutdownTask implements ConsumerTask { Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects .isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1))); if (!isValidLeaseTableState) { - if (--retryLeftForValidParentState >= 0) { + if (!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) { throw new BlockedOnParentShardException( "Shard " + shardInfo.shardId() + "'s only child shard " + childShard + " has partial parent information in lease table. Hence deferring lease creation of child shard."); @@ -276,6 +278,15 @@ public class ShutdownTask implements ConsumerTask { } } + /** + * Returns true for 1 in N probability. + */ + @VisibleForTesting + boolean isOneInNProbability(int n) { + Random r = new Random(); + return 1 == r.nextInt((n - 1) + 1) + 1; + } + private void updateLeaseWithChildShards(Lease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { Set childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet()); diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index dc5d9763..688bd199 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -20,11 +20,14 @@ import static org.junit.Assert.assertNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static software.amazon.kinesis.lifecycle.ShutdownTask.RETRY_RANDOM_MAX_RANGE; import java.util.ArrayList; import java.util.Collections; @@ -201,11 +204,6 @@ public class ShutdownTaskTest { public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST); - task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager); - when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList()); @@ -221,6 +219,11 @@ public class ShutdownTaskTest { // Make first 5 attempts with partial parent info in lease table for (int i = 0; i < 5; i++) { + ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof BlockedOnParentShardException); @@ -232,11 +235,17 @@ public class ShutdownTaskTest { verify(leaseCoordinator, never()) .updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); } // make next attempt with complete parent info in lease table + ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNull(result.getException()); verify(recordsPublisher).shutdown(); @@ -244,6 +253,7 @@ public class ShutdownTaskTest { verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class)); } @@ -252,10 +262,6 @@ public class ShutdownTaskTest { public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException { shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST); - task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, - hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2")); @@ -272,6 +278,11 @@ public class ShutdownTaskTest { // Make first 10 attempts with partial parent info in lease table for (int i = 0; i < 10; i++) { + ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof BlockedOnParentShardException); @@ -283,11 +294,17 @@ public class ShutdownTaskTest { verify(leaseCoordinator, never()) .updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString()); verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class)); verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); } // make final attempt with incomplete parent info in lease table + ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager)); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); TaskResult result = task.call(); assertNull(result.getException()); verify(recordsPublisher).shutdown(); @@ -295,6 +312,7 @@ public class ShutdownTaskTest { verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class)); verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class)); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(leaseCoordinator).dropLease(Matchers.any(Lease.class)); verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class)); }