change the retry logic

This commit is contained in:
Chunxue Yang 2020-07-21 11:08:56 -07:00
parent 5bf24bda43
commit 8a296a5aa3
2 changed files with 79 additions and 32 deletions

View file

@ -35,6 +35,7 @@ import com.google.common.annotations.VisibleForTesting;
import java.util.List; import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -46,7 +47,8 @@ class ShutdownTask implements ITask {
private static final Log LOG = LogFactory.getLog(ShutdownTask.class); private static final Log LOG = LogFactory.getLog(ShutdownTask.class);
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown"; private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
private int retryLeftForValidParentState = 10; @VisibleForTesting
static final int RETRY_RANDOM_MAX_RANGE = 10;
private final ShardInfo shardInfo; private final ShardInfo shardInfo;
private final IRecordProcessor recordProcessor; private final IRecordProcessor recordProcessor;
@ -208,7 +210,7 @@ class ShutdownTask implements ITask {
boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) == boolean isValidLeaseTableState = Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(0))) ==
Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1))); Objects.isNull(leaseCoordinator.getLeaseManager().getLease(parentLeaseKeys.get(1)));
if (!isValidLeaseTableState) { if (!isValidLeaseTableState) {
if(--retryLeftForValidParentState >= 0) { if(!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
throw new BlockedOnParentShardException( throw new BlockedOnParentShardException(
"Shard " + shardInfo.getShardId() + "'s only child shard " + childShard "Shard " + shardInfo.getShardId() + "'s only child shard " + childShard
+ " has partial parent information in lease table. Hence deferring lease creation of child shard."); + " has partial parent information in lease table. Hence deferring lease creation of child shard.");
@ -230,6 +232,15 @@ class ShutdownTask implements ITask {
} }
} }
/**
* Returns true for 1 in N probability.
*/
@VisibleForTesting
boolean isOneInNProbability(int n) {
Random r = new Random();
return 1 == r.nextInt((n - 1) + 1) + 1;
}
private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException { private void updateCurrentLeaseWithChildShards(KinesisClientLease currentLease) throws DependencyException, InvalidStateException, ProvisionedThroughputException {
final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet()); final Set<String> childShardIds = childShards.stream().map(ChildShard::getShardId).collect(Collectors.toSet());
currentLease.setChildShardIds(childShardIds); currentLease.setChildShardIds(childShardIds);

View file

@ -22,6 +22,7 @@ import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never; import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times; import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -60,6 +61,7 @@ import com.amazonaws.services.kinesis.leases.interfaces.ILeaseManager;
import org.junit.runner.RunWith; import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import static com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownTask.RETRY_RANDOM_MAX_RANGE;
/** /**
* *
@ -204,7 +206,9 @@ public class ShutdownTaskTest {
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease); when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, adjacentParentLease);
ShutdownTask task = new ShutdownTask(defaultShardInfo, // Make first 5 attempts with partial parent info in lease table
for (int i = 0; i < 5; i++) {
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor, defaultRecordProcessor,
checkpointer, checkpointer,
ShutdownReason.TERMINATE, ShutdownReason.TERMINATE,
@ -217,21 +221,36 @@ public class ShutdownTaskTest {
getRecordsCache, getRecordsCache,
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards()); constructMergeChildShards()));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
// Make first 5 attempts with partial parent info in lease table
for (int i = 0; i < 5; i++) {
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException); assertTrue(result.getException() instanceof BlockedOnParentShardException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache, never()).shutdown(); verify(getRecordsCache, never()).shutdown();
verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class));
} }
// Make next attempt with complete parent info in lease table // Make next attempt with complete parent info in lease table
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructMergeChildShards()));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
TaskResult result = task.call(); TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
verify(leaseCoordinator, never()).dropLease(currentLease); verify(leaseCoordinator, never()).dropLease(currentLease);
@ -250,7 +269,8 @@ public class ShutdownTaskTest {
when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease); when(leaseManager.getLease(defaultShardId)).thenReturn(currentLease);
when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null); when(leaseManager.getLease("ShardId-1")).thenReturn(null, null, null, null, null, null, null, null, null, null, null);
ShutdownTask task = new ShutdownTask(defaultShardInfo, for (int i = 0; i < 10; i++) {
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor, defaultRecordProcessor,
checkpointer, checkpointer,
ShutdownReason.TERMINATE, ShutdownReason.TERMINATE,
@ -263,19 +283,35 @@ public class ShutdownTaskTest {
getRecordsCache, getRecordsCache,
shardSyncer, shardSyncer,
shardSyncStrategy, shardSyncStrategy,
constructMergeChildShards()); constructMergeChildShards()));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
for (int i = 0; i < 10; i++) {
TaskResult result = task.call(); TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
assertTrue(result.getException() instanceof BlockedOnParentShardException); assertTrue(result.getException() instanceof BlockedOnParentShardException);
assertTrue(result.getException().getMessage().contains("has partial parent information in lease table")); assertTrue(result.getException().getMessage().contains("has partial parent information in lease table"));
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache, never()).shutdown(); verify(getRecordsCache, never()).shutdown();
verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class)); verify(defaultRecordProcessor, never()).shutdown(any(ShutdownInput.class));
} }
ShutdownTask task = spy(new ShutdownTask(defaultShardInfo,
defaultRecordProcessor,
checkpointer,
ShutdownReason.TERMINATE,
kinesisProxy,
INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards,
leaseCoordinator,
TASK_BACKOFF_TIME_MILLIS,
getRecordsCache,
shardSyncer,
shardSyncStrategy,
constructMergeChildShards()));
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true);
TaskResult result = task.call(); TaskResult result = task.call();
assertNull(result.getException()); assertNull(result.getException());
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
verify(getRecordsCache).shutdown(); verify(getRecordsCache).shutdown();
verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class)); verify(defaultRecordProcessor).shutdown(any(ShutdownInput.class));
verify(leaseCoordinator).dropLease(currentLease); verify(leaseCoordinator).dropLease(currentLease);