Merge pull request #77 from ashwing/ltr_1_shutdown_probability_based_exception
Breaking loop on ShutdownException using 10-faced dice
This commit is contained in:
commit
afd0f7c08a
2 changed files with 40 additions and 11 deletions
|
|
@ -52,6 +52,7 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
|
||||||
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
import software.amazon.kinesis.retrieval.RecordsPublisher;
|
||||||
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
@ -65,6 +66,8 @@ import java.util.stream.Collectors;
|
||||||
public class ShutdownTask implements ConsumerTask {
|
public class ShutdownTask implements ConsumerTask {
|
||||||
private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
|
private static final String SHUTDOWN_TASK_OPERATION = "ShutdownTask";
|
||||||
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
|
private static final String RECORD_PROCESSOR_SHUTDOWN_METRIC = "RecordProcessor.shutdown";
|
||||||
|
@VisibleForTesting
|
||||||
|
static final int RETRY_RANDOM_MAX_RANGE = 10;
|
||||||
|
|
||||||
@NonNull
|
@NonNull
|
||||||
private final ShardInfo shardInfo;
|
private final ShardInfo shardInfo;
|
||||||
|
|
@ -99,7 +102,6 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
private final LeaseCleanupManager leaseCleanupManager;
|
private final LeaseCleanupManager leaseCleanupManager;
|
||||||
|
|
||||||
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
|
private static final Function<ShardInfo, String> leaseKeyProvider = shardInfo -> ShardInfo.getLeaseKey(shardInfo);
|
||||||
private int retryLeftForValidParentState = 10;
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Invokes ShardRecordProcessor shutdown() API.
|
* Invokes ShardRecordProcessor shutdown() API.
|
||||||
|
|
@ -253,7 +255,7 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects
|
Objects.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(0))) == Objects
|
||||||
.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1)));
|
.isNull(leaseCoordinator.leaseRefresher().getLease(parentLeaseKeys.get(1)));
|
||||||
if (!isValidLeaseTableState) {
|
if (!isValidLeaseTableState) {
|
||||||
if (--retryLeftForValidParentState >= 0) {
|
if (!isOneInNProbability(RETRY_RANDOM_MAX_RANGE)) {
|
||||||
throw new BlockedOnParentShardException(
|
throw new BlockedOnParentShardException(
|
||||||
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
|
"Shard " + shardInfo.shardId() + "'s only child shard " + childShard
|
||||||
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
|
+ " has partial parent information in lease table. Hence deferring lease creation of child shard.");
|
||||||
|
|
@ -276,6 +278,15 @@ public class ShutdownTask implements ConsumerTask {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true for 1 in N probability.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean isOneInNProbability(int n) {
|
||||||
|
Random r = new Random();
|
||||||
|
return 1 == r.nextInt((n - 1) + 1) + 1;
|
||||||
|
}
|
||||||
|
|
||||||
private void updateLeaseWithChildShards(Lease currentLease)
|
private void updateLeaseWithChildShards(Lease currentLease)
|
||||||
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
|
Set<String> childShardIds = childShards.stream().map(ChildShard::shardId).collect(Collectors.toSet());
|
||||||
|
|
|
||||||
|
|
@ -20,11 +20,14 @@ import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.anyInt;
|
||||||
import static org.mockito.Mockito.doNothing;
|
import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.never;
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
import static software.amazon.kinesis.lifecycle.ShutdownTask.RETRY_RANDOM_MAX_RANGE;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
|
@ -201,11 +204,6 @@ public class ShutdownTaskTest {
|
||||||
public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
public final void testCallThrowsUntilParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST);
|
||||||
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
|
||||||
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
|
||||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
|
||||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager);
|
|
||||||
|
|
||||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
|
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
|
||||||
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
|
Lease parentLease = LeaseHelper.createLease("shardId-1", "leaseOwner", Collections.emptyList());
|
||||||
|
|
@ -221,6 +219,11 @@ public class ShutdownTaskTest {
|
||||||
|
|
||||||
// Make first 5 attempts with partial parent info in lease table
|
// Make first 5 attempts with partial parent info in lease table
|
||||||
for (int i = 0; i < 5; i++) {
|
for (int i = 0; i < 5; i++) {
|
||||||
|
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||||
|
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||||
|
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
|
||||||
|
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
assertNotNull(result.getException());
|
assertNotNull(result.getException());
|
||||||
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
||||||
|
|
@ -232,11 +235,17 @@ public class ShutdownTaskTest {
|
||||||
verify(leaseCoordinator, never())
|
verify(leaseCoordinator, never())
|
||||||
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
||||||
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
|
||||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||||
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
// make next attempt with complete parent info in lease table
|
// make next attempt with complete parent info in lease table
|
||||||
|
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||||
|
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||||
|
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
|
||||||
|
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
assertNull(result.getException());
|
assertNull(result.getException());
|
||||||
verify(recordsPublisher).shutdown();
|
verify(recordsPublisher).shutdown();
|
||||||
|
|
@ -244,6 +253,7 @@ public class ShutdownTaskTest {
|
||||||
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
|
||||||
verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
|
verify(leaseRefresher).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
|
||||||
verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class));
|
verify(leaseRefresher, times(1)).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(task, never()).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
|
||||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||||
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
verify(leaseCleanupManager, times(1)).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
}
|
}
|
||||||
|
|
@ -252,10 +262,6 @@ public class ShutdownTaskTest {
|
||||||
public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
public final void testCallTriggersLeaseLossWhenParentInfoNotPresentInLease() throws DependencyException, InvalidStateException, ProvisionedThroughputException {
|
||||||
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
|
||||||
ExtendedSequenceNumber.LATEST);
|
ExtendedSequenceNumber.LATEST);
|
||||||
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
|
||||||
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
|
||||||
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
|
||||||
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager);
|
|
||||||
|
|
||||||
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
|
||||||
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
|
Lease heldLease = LeaseHelper.createLease("shardId-0", "leaseOwner", ImmutableList.of("parent1", "parent2"));
|
||||||
|
|
@ -272,6 +278,11 @@ public class ShutdownTaskTest {
|
||||||
|
|
||||||
// Make first 10 attempts with partial parent info in lease table
|
// Make first 10 attempts with partial parent info in lease table
|
||||||
for (int i = 0; i < 10; i++) {
|
for (int i = 0; i < 10; i++) {
|
||||||
|
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||||
|
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||||
|
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
|
||||||
|
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(false);
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
assertNotNull(result.getException());
|
assertNotNull(result.getException());
|
||||||
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
assertTrue(result.getException() instanceof BlockedOnParentShardException);
|
||||||
|
|
@ -283,11 +294,17 @@ public class ShutdownTaskTest {
|
||||||
verify(leaseCoordinator, never())
|
verify(leaseCoordinator, never())
|
||||||
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
.updateLease(Matchers.any(Lease.class), Matchers.any(UUID.class), Matchers.anyString(), Matchers.anyString());
|
||||||
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
|
||||||
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
verify(leaseCoordinator, never()).dropLease(Matchers.any(Lease.class));
|
||||||
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
}
|
}
|
||||||
|
|
||||||
// make final attempt with incomplete parent info in lease table
|
// make final attempt with incomplete parent info in lease table
|
||||||
|
ShutdownTask task = spy(new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
|
||||||
|
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
|
||||||
|
ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
|
||||||
|
hierarchicalShardSyncer, NULL_METRICS_FACTORY, constructChildShard(), streamIdentifier, leaseCleanupManager));
|
||||||
|
when(task.isOneInNProbability(RETRY_RANDOM_MAX_RANGE)).thenReturn(true);
|
||||||
TaskResult result = task.call();
|
TaskResult result = task.call();
|
||||||
assertNull(result.getException());
|
assertNull(result.getException());
|
||||||
verify(recordsPublisher).shutdown();
|
verify(recordsPublisher).shutdown();
|
||||||
|
|
@ -295,6 +312,7 @@ public class ShutdownTaskTest {
|
||||||
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
|
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
|
||||||
verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
|
verify(leaseRefresher, never()).updateLeaseWithMetaInfo(Matchers.any(Lease.class), Matchers.any(UpdateField.class));
|
||||||
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
verify(leaseRefresher, never()).createLeaseIfNotExists(Matchers.any(Lease.class));
|
||||||
|
verify(task, times(1)).isOneInNProbability(RETRY_RANDOM_MAX_RANGE);
|
||||||
verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
|
verify(leaseCoordinator).dropLease(Matchers.any(Lease.class));
|
||||||
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
verify(leaseCleanupManager, never()).enqueueForDeletion(any(LeasePendingDeletion.class));
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue