Revalidate if current shard is closed before shutting down the ShardConsumer

This commit is contained in:
Chunxue Yang 2019-10-02 23:58:13 +00:00
parent 98b016276b
commit 8847be997b
4 changed files with 247 additions and 15 deletions

View file

@ -79,6 +79,18 @@ public class HierarchicalShardSyncer {
final MetricsScope scope) throws DependencyException, InvalidStateException, final MetricsScope scope) throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException { ProvisionedThroughputException, KinesisClientLibIOException {
final List<Shard> shards = getShardList(shardDetector); final List<Shard> shards = getShardList(shardDetector);
checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, scope);
}
//Provide a pre-collcted list of shards to avoid calling ListShards API
public synchronized void checkAndCreateLeaseForNewShards(List<Shard> shards, @NonNull final ShardDetector shardDetector,
final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards,
final boolean ignoreUnexpectedChildShards, final MetricsScope scope)throws DependencyException, InvalidStateException,
ProvisionedThroughputException, KinesisClientLibIOException {
if(CollectionUtils.isNullOrEmpty(shards)) {
shards = getShardList(shardDetector);
}
log.debug("Num shards: {}", shards.size()); log.debug("Num shards: {}", shards.size());
final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards); final Map<String, Shard> shardIdToShardMap = constructShardIdToShardMap(shards);
@ -92,7 +104,7 @@ public class HierarchicalShardSyncer {
final List<Lease> currentLeases = leaseRefresher.listLeases(); final List<Lease> currentLeases = leaseRefresher.listLeases();
final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, final List<Lease> newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition,
inconsistentShardIds); inconsistentShardIds);
log.debug("Num new leases to create: {}", newLeasesToCreate.size()); log.debug("Num new leases to create: {}", newLeasesToCreate.size());
for (Lease lease : newLeasesToCreate) { for (Lease lease : newLeasesToCreate) {
long startTime = System.currentTimeMillis(); long startTime = System.currentTimeMillis();
@ -110,8 +122,9 @@ public class HierarchicalShardSyncer {
cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher);
if (cleanupLeasesOfCompletedShards) { if (cleanupLeasesOfCompletedShards) {
cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases,
leaseRefresher); leaseRefresher);
} }
} }
// CHECKSTYLE:ON CyclomaticComplexity // CHECKSTYLE:ON CyclomaticComplexity

View file

@ -16,9 +16,11 @@ package software.amazon.kinesis.lifecycle;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.sun.org.apache.bcel.internal.generic.LUSHR;
import lombok.NonNull; import lombok.NonNull;
import lombok.RequiredArgsConstructor; import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -36,6 +38,9 @@ import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import java.util.ArrayList;
import java.util.List;
/** /**
* Task for invoking the ShardRecordProcessor shutdown() callback. * Task for invoking the ShardRecordProcessor shutdown() callback.
*/ */
@ -55,7 +60,7 @@ public class ShutdownTask implements ConsumerTask {
@NonNull @NonNull
private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer; private final ShardRecordProcessorCheckpointer recordProcessorCheckpointer;
@NonNull @NonNull
private final ShutdownReason reason; private ShutdownReason reason;
@NonNull @NonNull
private final InitialPositionInStreamExtended initialPositionInStream; private final InitialPositionInStreamExtended initialPositionInStream;
private final boolean cleanupLeasesOfCompletedShards; private final boolean cleanupLeasesOfCompletedShards;
@ -88,6 +93,15 @@ public class ShutdownTask implements ConsumerTask {
try { try {
try { try {
List<Shard> allShards = new ArrayList<>();
if(reason == ShutdownReason.SHARD_END) {
allShards = shardDetector.listShards();
if(!isRealShardEnd(allShards)) {
reason = ShutdownReason.LEASE_LOST;
}
}
// If we reached end of the shard, set sequence number to SHARD_END. // If we reached end of the shard, set sequence number to SHARD_END.
if (reason == ShutdownReason.SHARD_END) { if (reason == ShutdownReason.SHARD_END) {
recordProcessorCheckpointer recordProcessorCheckpointer
@ -126,7 +140,7 @@ public class ShutdownTask implements ConsumerTask {
if (reason == ShutdownReason.SHARD_END) { if (reason == ShutdownReason.SHARD_END) {
log.debug("Looking for child shards of shard {}", shardInfo.shardId()); log.debug("Looking for child shards of shard {}", shardInfo.shardId());
// create leases for the child shards // create leases for the child shards
hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(allShards, shardDetector, leaseRefresher,
initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope);
log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId());
} }
@ -169,4 +183,17 @@ public class ShutdownTask implements ConsumerTask {
return reason; return reason;
} }
private boolean isRealShardEnd(List<Shard> shards) {
boolean realShardEnd = false;
for(Shard shard : shards) {
if(shard.parentShardId() != null && shard.parentShardId().equals(shardInfo.shardId())
|| shard.adjacentParentShardId() != null && shard.adjacentParentShardId().equals(shardInfo.shardId())) {
realShardEnd = true;
break;
}
}
return realShardEnd;
}
} }

View file

@ -205,6 +205,70 @@ public class HierarchicalShardSyncerTest {
} }
@Test
public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(shards, shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE);
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
final List<Lease> requestLeases = leaseCaptor.getAllValues();
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1));
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector, never()).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@Test
public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception {
final List<Shard> shards = constructShardListForGraphA();
final ArgumentCaptor<Lease> leaseCaptor = ArgumentCaptor.forClass(Lease.class);
when(shardDetector.listShards()).thenReturn(shards);
when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList());
when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true);
hierarchicalShardSyncer
.checkAndCreateLeaseForNewShards(new ArrayList<Shard>(), shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST,
cleanupLeasesOfCompletedShards, false, SCOPE);
final Set<String> expectedShardIds = new HashSet<>(
Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10"));
final List<Lease> requestLeases = leaseCaptor.getAllValues();
final Set<String> requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet());
final Set<ExtendedSequenceNumber> extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint)
.collect(Collectors.toSet());
assertThat(requestLeases.size(), equalTo(expectedShardIds.size()));
assertThat(requestLeaseKeys, equalTo(expectedShardIds));
assertThat(extendedSequenceNumbers.size(), equalTo(1));
extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST)));
verify(shardDetector).listShards();
verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class));
verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class));
}
@Test @Test
public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception { public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception {
testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON); testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON);
@ -1035,7 +1099,11 @@ public class HierarchicalShardSyncerTest {
/* /*
* Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is
* epochs): 0 1 2 3 4 5- shards till epoch 102 \ / \ / | | 6 7 4 5- shards from epoch 103 - 205 \ / | /\ 8 4 9 10 - * epochs): 0 1 2 3 4 5- shards till
* \ / \ / | |
* 6 7 4 5- shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 -
* shards from epoch 206 (open - no ending sequenceNumber) * shards from epoch 206 (open - no ending sequenceNumber)
*/ */
private List<Shard> constructShardListForGraphA() { private List<Shard> constructShardListForGraphA() {

View file

@ -16,13 +16,18 @@ package software.amazon.kinesis.lifecycle;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -30,6 +35,8 @@ import org.junit.runner.RunWith;
import org.mockito.Mock; import org.mockito.Mock;
import org.mockito.runners.MockitoJUnitRunner; import org.mockito.runners.MockitoJUnitRunner;
import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange;
import software.amazon.awssdk.services.kinesis.model.Shard;
import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer;
import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStream;
import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.common.InitialPositionInStreamExtended;
@ -38,13 +45,15 @@ import software.amazon.kinesis.leases.HierarchicalShardSyncer;
import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.LeaseRefresher;
import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardDetector;
import software.amazon.kinesis.leases.ShardInfo; import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.ShardObjectHelper;
import software.amazon.kinesis.lifecycle.events.LeaseLostInput;
import software.amazon.kinesis.lifecycle.events.ShardEndedInput;
import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.MetricsFactory;
import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory;
import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.Checkpointer;
import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor;
import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber;
import software.amazon.kinesis.utils.TestStreamlet;
/** /**
* *
@ -54,14 +63,14 @@ public class ShutdownTaskTest {
private static final long TASK_BACKOFF_TIME_MILLIS = 1L; private static final long TASK_BACKOFF_TIME_MILLIS = 1L;
private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON =
InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON);
private static final ShutdownReason TERMINATE_SHUTDOWN_REASON = ShutdownReason.SHARD_END; private static final ShutdownReason SHARD_END_SHUTDOWN_REASON = ShutdownReason.SHARD_END;
private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST;
private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory();
private final String concurrencyToken = "testToken4398"; private final String concurrencyToken = "testToken4398";
private final String shardId = "shardId-0000397840"; private final String shardId = "shardId-0";
private boolean cleanupLeasesOfCompletedShards = false; private boolean cleanupLeasesOfCompletedShards = false;
private boolean ignoreUnexpectedChildShards = false; private boolean ignoreUnexpectedChildShards = false;
private ShardRecordProcessor shardRecordProcessor;
private ShardInfo shardInfo; private ShardInfo shardInfo;
private ShutdownTask task; private ShutdownTask task;
@ -77,6 +86,8 @@ public class ShutdownTaskTest {
private ShardDetector shardDetector; private ShardDetector shardDetector;
@Mock @Mock
private HierarchicalShardSyncer hierarchicalShardSyncer; private HierarchicalShardSyncer hierarchicalShardSyncer;
@Mock
private ShardRecordProcessor shardRecordProcessor;
@Before @Before
public void setUp() throws Exception { public void setUp() throws Exception {
@ -85,10 +96,9 @@ public class ShutdownTaskTest {
shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST); ExtendedSequenceNumber.LATEST);
shardRecordProcessor = new TestStreamlet();
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY); hierarchicalShardSyncer, NULL_METRICS_FACTORY);
} }
@ -98,7 +108,9 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testCallWhenApplicationDoesNotCheckpoint() { public final void testCallWhenApplicationDoesNotCheckpoint() {
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298"));
final TaskResult result = task.call(); final TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
assertTrue(result.getException() instanceof IllegalArgumentException); assertTrue(result.getException() instanceof IllegalArgumentException);
@ -109,19 +121,90 @@ public class ShutdownTaskTest {
*/ */
@Test @Test
public final void testCallWhenSyncingShardsThrows() throws Exception { public final void testCallWhenSyncingShardsThrows() throws Exception {
List<Shard> shards = constructShardListGraphA();
when(shardDetector.listShards()).thenReturn(shards);
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
when(shardDetector.listShards()).thenReturn(null);
doAnswer((invocation) -> { doAnswer((invocation) -> {
throw new KinesisClientLibIOException("KinesisClientLibIOException"); throw new KinesisClientLibIOException("KinesisClientLibIOException");
}).when(hierarchicalShardSyncer) }).when(hierarchicalShardSyncer)
.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, .checkAndCreateLeaseForNewShards(shards, shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON,
cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards,
NULL_METRICS_FACTORY.createMetrics()); NULL_METRICS_FACTORY.createMetrics());
TaskResult result = task.call(); final TaskResult result = task.call();
assertNotNull(result.getException()); assertNotNull(result.getException());
assertTrue(result.getException() instanceof KinesisClientLibIOException); assertTrue(result.getException() instanceof KinesisClientLibIOException);
verify(recordsPublisher).shutdown(); verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
}
/**
* Test method for {@link ShutdownTask#call()}.
*/
@Test
public final void testCallWhenTrueShardEnd() {
shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END);
final TaskResult result = task.call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, times(1)).listShards();
}
/**
* Test method for {@link ShutdownTask#call()}.
*/
@Test
public final void testCallWhenFalseShardEnd() {
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
final TaskResult result = task.call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, times(1)).listShards();
}
/**
* Test method for {@link ShutdownTask#call()}.
*/
@Test
public final void testCallWhenLeaseLost() {
shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(),
ExtendedSequenceNumber.LATEST);
task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer,
LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards,
ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher,
hierarchicalShardSyncer, NULL_METRICS_FACTORY);
when(shardDetector.listShards()).thenReturn(constructShardListGraphA());
final TaskResult result = task.call();
assertNull(result.getException());
verify(recordsPublisher).shutdown();
verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build());
verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build());
verify(shardDetector, never()).listShards();
} }
/** /**
@ -132,4 +215,45 @@ public class ShutdownTaskTest {
assertEquals(TaskType.SHUTDOWN, task.taskType()); assertEquals(TaskType.SHUTDOWN, task.taskType());
} }
/*
* Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is
* epochs): 0 1 2 3 4 5 - shards till
* \ / \ / | |
* 6 7 4 5 - shards from epoch 103 - 205
* \ / | /\
* 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber)
*/
private List<Shard> constructShardListGraphA() {
final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102");
final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null);
final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "205");
final SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "205");
final SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("206", null);
return Arrays.asList(
ShardObjectHelper.newShard("shardId-0", null, null, range0,
ShardObjectHelper.newHashKeyRange("0", "99")),
ShardObjectHelper.newShard("shardId-1", null, null, range0,
ShardObjectHelper.newHashKeyRange("100", "199")),
ShardObjectHelper.newShard("shardId-2", null, null, range0,
ShardObjectHelper.newHashKeyRange("200", "299")),
ShardObjectHelper.newShard("shardId-3", null, null, range0,
ShardObjectHelper.newHashKeyRange("300", "399")),
ShardObjectHelper.newShard("shardId-4", null, null, range1,
ShardObjectHelper.newHashKeyRange("400", "499")),
ShardObjectHelper.newShard("shardId-5", null, null, range2,
ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY)),
ShardObjectHelper.newShard("shardId-6", "shardId-0", "shardId-1", range3,
ShardObjectHelper.newHashKeyRange("0", "199")),
ShardObjectHelper.newShard("shardId-7", "shardId-2", "shardId-3", range3,
ShardObjectHelper.newHashKeyRange("200", "399")),
ShardObjectHelper.newShard("shardId-8", "shardId-6", "shardId-7", range4,
ShardObjectHelper.newHashKeyRange("0", "399")),
ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4,
ShardObjectHelper.newHashKeyRange("500", "799")),
ShardObjectHelper.newShard("shardId-10", null, "shardId-5", range4,
ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY)));
}
} }