From 8a296a5aa3c50a012f3a1ad27bea1f4bf4ffe65e Mon Sep 17 00:00:00 2001 From: Chunxue Yang Date: Tue, 21 Jul 2020 11:08:56 -0700 Subject: [PATCH] change the retry logic --- .../lib/worker/ShutdownTask.java | 15 ++- .../lib/worker/ShutdownTaskTest.java | 96 +++++++++++++------ 2 files changed, 79 insertions(+), 32 deletions(-) diff --git a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java index ed9de7e2..9af4645f 100644 --- a/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java +++ b/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTask.java @@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting; import java.util.List; import java.util.Objects; +import java.util.Random; import java.util.Set; import java.util.stream.Collectors; @@ -46,7 +47,8 @@ class ShutdownTask implements ITask { private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; - private int retryLeftForValidParentState = 10; + @VisibleForTesting + static final int RETRY_RANDOM_MAX_RANGE = 10; private final ShardInfo shardInfo; private final IRecordProcessor recordProcessor; @@ -208,7 +210,7 @@ class ShutdownTask implements ITask { boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) == Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1))); if (!isValidLeaseTableState) { - if(--retryLeftForValidParentState >= 0) { + if(!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) { throw new BlockedOnParentShardException( "Shard " + shardInfo.getShardId() + "'s only child shard " + childShard + " has partial parent information in lease table. Hence deferring lease creation of child shard."); @@ -230,6 +232,15 @@ class ShutdownTask implements ITask { } } + /** + * Returns true for 1 in N probability. + */ + @VisibleForTesting + boolean isOneInNProbability(int n) { + Random r = new Random(); + return 1 == r.nextInt((n - 1) + 1) + 1; + } + private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { final Set childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); currentLease.setChildShardIds(childShardIds); diff --git a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java index 6400b839..d9226f44 100644 --- a/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java +++ b/src/test/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ShutdownTaskTest.java @@ -22,6 +22,7 @@ import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -60,6 +61,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager; import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE; /** * @@ -204,34 +206,51 @@ public class ShutdownTaskTest { when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease); - ShutdownTask task = new ShutdownTask(defaultShardInfo, - defaultRecordProcessor, - checkpointer, - ShutdownReason.TERMINATE, - kinesisProxy, - INITIAL_POSITION_TRIM_HORIZON, - cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, - leaseCoordinator, - TASK_BACKOFF_TIME_MILLIS, - getRecordsCache, - shardSyncer, - shardSyncStrategy, - constructMergeChildShards()); - // Make first 5 attempts with partial parent info in lease table for (int i = 0; i < 5; i++) { + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof BlockedOnParentShardException); assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache, never()).shutdown(); verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); } // Make next attempt with complete parent info in lease table + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNull(result.getException()); + verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache).shutdown(); verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); verify(leaseCoordinator, never()).dropLease(currentLease); @@ -250,32 +269,49 @@ public class ShutdownTaskTest { when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); - ShutdownTask task = new ShutdownTask(defaultShardInfo, - defaultRecordProcessor, - checkpointer, - ShutdownReason.TERMINATE, - kinesisProxy, - INITIAL_POSITION_TRIM_HORIZON, - cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, - leaseCoordinator, - TASK_BACKOFF_TIME_MILLIS, - getRecordsCache, - shardSyncer, - shardSyncStrategy, - constructMergeChildShards()); - for (int i = 0; i < 10; i++) { + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false); TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof BlockedOnParentShardException); assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache, never()).shutdown(); verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); } + ShutdownTask task = spy(new ShutdownTask(defaultShardInfo, + defaultRecordProcessor, + checkpointer, + ShutdownReason.TERMINATE, + kinesisProxy, + INITIAL_POSITION_TRIM_HORIZON, + cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, + leaseCoordinator, + TASK_BACKOFF_TIME_MILLIS, + getRecordsCache, + shardSyncer, + shardSyncStrategy, + constructMergeChildShards())); + when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true); TaskResult result = task.call(); assertNull(result.getException()); + verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE); verify(getRecordsCache).shutdown(); verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); verify(leaseCoordinator).dropLease(currentLease);