diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java index 0f26c3d6..fd8dfcb1 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java @@ -588,7 +588,7 @@ public class Scheduler implements Runnable { checkpoint); ShardConsumerArgument argument = new ShardConsumerArgument(shardInfo, streamName, - leaseRefresher, + leaseCoordinator, executorService, cache, shardRecordProcessorFactory.shardRecordProcessor(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java index 5998ea62..578af465 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/HierarchicalShardSyncer.java @@ -78,10 +78,21 @@ public class HierarchicalShardSyncer { final boolean cleanupLeasesOfCompletedShards, final boolean ignoreUnexpectedChildShards, final MetricsScope scope) throws DependencyException, InvalidStateException, ProvisionedThroughputException, KinesisClientLibIOException { - final List shards = getShardList(shardDetector); - log.debug("Num shards: {}", shards.size()); + final List latestShards = getShardList(shardDetector); + checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, initialPosition, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, scope, latestShards); + } - final Map shardIdToShardMap = constructShardIdToShardMap(shards); + //Provide a pre-collcted list of shards to avoid calling ListShards API + public synchronized void checkAndCreateLeaseForNewShards(@NonNull final ShardDetector shardDetector, + final LeaseRefresher leaseRefresher, final InitialPositionInStreamExtended initialPosition, final boolean cleanupLeasesOfCompletedShards, + final boolean ignoreUnexpectedChildShards, final MetricsScope scope, List latestShards)throws DependencyException, InvalidStateException, + ProvisionedThroughputException, KinesisClientLibIOException { + if (!CollectionUtils.isNullOrEmpty(latestShards)) { + log.debug("Num shards: {}", latestShards.size()); + } + + final Map shardIdToShardMap = constructShardIdToShardMap(latestShards); final Map> shardIdToChildShardIdsMap = constructShardIdToChildShardIdsMap( shardIdToShardMap); final Set inconsistentShardIds = findInconsistentShardIds(shardIdToChildShardIdsMap, shardIdToShardMap); @@ -91,8 +102,7 @@ public class HierarchicalShardSyncer { final List currentLeases = leaseRefresher.listLeases(); - final List newLeasesToCreate = determineNewLeasesToCreate(shards, currentLeases, initialPosition, - inconsistentShardIds); + final List newLeasesToCreate = determineNewLeasesToCreate(latestShards, currentLeases, initialPosition, inconsistentShardIds); log.debug("Num new leases to create: {}", newLeasesToCreate.size()); for (Lease lease : newLeasesToCreate) { long startTime = System.currentTimeMillis(); @@ -104,14 +114,13 @@ public class HierarchicalShardSyncer { MetricsUtil.addSuccessAndLatency(scope, "CreateLease", success, startTime, MetricsLevel.DETAILED); } } - final List trackedLeases = new ArrayList<>(currentLeases); trackedLeases.addAll(newLeasesToCreate); - cleanupGarbageLeases(shardDetector, shards, trackedLeases, leaseRefresher); + cleanupGarbageLeases(shardDetector, latestShards, trackedLeases, leaseRefresher); if (cleanupLeasesOfCompletedShards) { - cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, - leaseRefresher); + cleanupLeasesOfFinishedShards(currentLeases, shardIdToShardMap, shardIdToChildShardIdsMap, trackedLeases, leaseRefresher); } + } // CHECKSTYLE:ON CyclomaticComplexity diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java index df7ea6aa..bb1788b2 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ConsumerStates.java @@ -135,7 +135,7 @@ class ConsumerStates { @Override public ConsumerTask createTask(ShardConsumerArgument consumerArgument, ShardConsumer consumer, ProcessRecordsInput input) { return new BlockOnParentShardTask(consumerArgument.shardInfo(), - consumerArgument.leaseRefresher(), + consumerArgument.leaseCoordinator().leaseRefresher(), consumerArgument.parentShardPollIntervalMillis()); } @@ -492,7 +492,7 @@ class ConsumerStates { argument.initialPositionInStream(), argument.cleanupLeasesOfCompletedShards(), argument.ignoreUnexpectedChildShards(), - argument.leaseRefresher(), + argument.leaseCoordinator(), argument.taskBackoffTimeMillis(), argument.recordsPublisher(), argument.hierarchicalShardSyncer(), diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java index fcc49f63..4f1db733 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShardConsumerArgument.java @@ -21,6 +21,7 @@ import lombok.experimental.Accessors; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -42,7 +43,7 @@ public class ShardConsumerArgument { @NonNull private final String streamName; @NonNull - private final LeaseRefresher leaseRefresher; + private final LeaseCoordinator leaseCoordinator; @NonNull private final ExecutorService executorService; @NonNull diff --git a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java index edb69649..2bfcd358 100644 --- a/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java +++ b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/lifecycle/ShutdownTask.java @@ -16,12 +16,18 @@ package software.amazon.kinesis.lifecycle; import com.google.common.annotations.VisibleForTesting; +import com.sun.org.apache.bcel.internal.generic.LUSHR; import lombok.NonNull; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.StringUtils; +import software.amazon.awssdk.services.kinesis.model.Shard; +import software.amazon.awssdk.utils.CollectionUtils; import software.amazon.kinesis.annotations.KinesisClientInternalApi; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -36,6 +42,10 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; + /** * Task for invoking the ShardRecordProcessor shutdown() callback. */ @@ -61,7 +71,7 @@ public class ShutdownTask implements ConsumerTask { private final boolean cleanupLeasesOfCompletedShards; private final boolean ignoreUnexpectedChildShards; @NonNull - private final LeaseRefresher leaseRefresher; + private final LeaseCoordinator leaseCoordinator; private final long backoffTimeMillis; @NonNull private final RecordsPublisher recordsPublisher; @@ -88,20 +98,38 @@ public class ShutdownTask implements ConsumerTask { try { try { + ShutdownReason localReason = reason; + List latestShards = null; + /* + * Revalidate if the current shard is closed before shutting down the shard consumer with reason SHARD_END + * If current shard is not closed, shut down the shard consumer with reason LEASE_LOST that allows + * active workers to contend for the lease of this shard. + */ + if (localReason == ShutdownReason.SHARD_END) { + latestShards = shardDetector.listShards(); + + //If latestShards is empty, should also shutdown the ShardConsumer without checkpoint with SHARD_END + if (CollectionUtils.isNullOrEmpty(latestShards) || !isShardInContextParentOfAny(latestShards)) { + localReason = ShutdownReason.LEASE_LOST; + dropLease(); + log.info("Forcing the lease to be lost before shutting down the consumer for Shard: " + shardInfo.shardId()); + } + } + // If we reached end of the shard, set sequence number to SHARD_END. - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { recordProcessorCheckpointer .sequenceNumberAtShardEnd(recordProcessorCheckpointer.largestPermittedCheckpointValue()); recordProcessorCheckpointer.largestPermittedCheckpointValue(ExtendedSequenceNumber.SHARD_END); } log.debug("Invoking shutdown() for shard {}, concurrencyToken {}. Shutdown reason: {}", - shardInfo.shardId(), shardInfo.concurrencyToken(), reason); - final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(reason) + shardInfo.shardId(), shardInfo.concurrencyToken(), localReason); + final ShutdownInput shutdownInput = ShutdownInput.builder().shutdownReason(localReason) .checkpointer(recordProcessorCheckpointer).build(); final long startTime = System.currentTimeMillis(); try { - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { shardRecordProcessor.shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); ExtendedSequenceNumber lastCheckpointValue = recordProcessorCheckpointer.lastCheckpointValue(); if (lastCheckpointValue == null @@ -123,11 +151,11 @@ public class ShutdownTask implements ConsumerTask { MetricsUtil.addLatency(scope, RECORD_PROCESSOR_SHUTDOWN_METRIC, startTime, MetricsLevel.SUMMARY); } - if (reason == ShutdownReason.SHARD_END) { + if (localReason == ShutdownReason.SHARD_END) { log.debug("Looking for child shards of shard {}", shardInfo.shardId()); // create leases for the child shards - hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, - initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope); + hierarchicalShardSyncer.checkAndCreateLeaseForNewShards(shardDetector, leaseCoordinator.leaseRefresher(), + initialPositionInStream, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, scope, latestShards); log.debug("Finished checking for child shards of shard {}", shardInfo.shardId()); } @@ -169,4 +197,26 @@ public class ShutdownTask implements ConsumerTask { return reason; } + private boolean isShardInContextParentOfAny(List shards) { + for(Shard shard : shards) { + if (isChildShardOfShardInContext(shard)) { + return true; + } + } + return false; + } + + private boolean isChildShardOfShardInContext(Shard shard) { + return (StringUtils.equals(shard.parentShardId(), shardInfo.shardId()) + || StringUtils.equals(shard.adjacentParentShardId(), shardInfo.shardId())); + } + + private void dropLease() { + Lease currentLease = leaseCoordinator.getCurrentlyHeldLease(shardInfo.shardId()); + leaseCoordinator.dropLease(currentLease); + if(currentLease != null) { + log.warn("Dropped lease for shutting down ShardConsumer: " + currentLease.leaseKey()); + } + } + } diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java index 85230c0d..23d2e423 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/leases/HierarchicalShardSyncerTest.java @@ -171,6 +171,9 @@ public class HierarchicalShardSyncerTest { testCheckAndCreateLeasesForShardsIfMissing(INITIAL_POSITION_LATEST); } + /** + * Test checkAndCreateLeaseForNewShards while not providing a pre-fetched list of shards + */ @Test public void testCheckAndCreateLeasesForShardsIfMissingAtLatest() throws Exception { final List shards = constructShardListForGraphA(); @@ -205,6 +208,74 @@ public class HierarchicalShardSyncerTest { } + /** + * Test checkAndCreateLeaseForNewShards with a pre-fetched list of shards. In this scenario, shardDetector.listShards() + * should never be called. + */ + @Test + public void testCheckAndCreateLeasesForShardsWithShardList() throws Exception { + final List latestShards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(latestShards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE, latestShards); + + final Set expectedShardIds = new HashSet<>( + Arrays.asList("shardId-4", "shardId-8", "shardId-9", "shardId-10")); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(requestLeaseKeys, equalTo(expectedShardIds)); + assertThat(extendedSequenceNumbers.size(), equalTo(1)); + + extendedSequenceNumbers.forEach(seq -> assertThat(seq, equalTo(ExtendedSequenceNumber.LATEST))); + + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, times(expectedShardIds.size())).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + + /** + * Test checkAndCreateLeaseForNewShards with an empty list of shards. In this scenario, shardDetector.listShards() + * should never be called. + */ + @Test + public void testCheckAndCreateLeasesForShardsWithEmptyShardList() throws Exception { + final List shards = constructShardListForGraphA(); + + final ArgumentCaptor leaseCaptor = ArgumentCaptor.forClass(Lease.class); + when(shardDetector.listShards()).thenReturn(shards); + when(dynamoDBLeaseRefresher.listLeases()).thenReturn(Collections.emptyList()); + when(dynamoDBLeaseRefresher.createLeaseIfNotExists(leaseCaptor.capture())).thenReturn(true); + + hierarchicalShardSyncer + .checkAndCreateLeaseForNewShards(shardDetector, dynamoDBLeaseRefresher, INITIAL_POSITION_LATEST, + cleanupLeasesOfCompletedShards, false, SCOPE, new ArrayList()); + + final Set expectedShardIds = new HashSet<>(); + + final List requestLeases = leaseCaptor.getAllValues(); + final Set requestLeaseKeys = requestLeases.stream().map(Lease::leaseKey).collect(Collectors.toSet()); + final Set extendedSequenceNumbers = requestLeases.stream().map(Lease::checkpoint) + .collect(Collectors.toSet()); + + assertThat(requestLeases.size(), equalTo(expectedShardIds.size())); + assertThat(extendedSequenceNumbers.size(), equalTo(0)); + + verify(shardDetector, never()).listShards(); + verify(dynamoDBLeaseRefresher, never()).createLeaseIfNotExists(any(Lease.class)); + verify(dynamoDBLeaseRefresher, never()).deleteLease(any(Lease.class)); + } + @Test public void testCheckAndCreateLeasesForNewShardsAtTrimHorizon() throws Exception { testCheckAndCreateLeaseForShardsIfMissing(constructShardListForGraphA(), INITIAL_POSITION_TRIM_HORIZON); @@ -1035,7 +1106,11 @@ public class HierarchicalShardSyncerTest { /* * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is - * epochs): 0 1 2 3 4 5- shards till epoch 102 \ / \ / | | 6 7 4 5- shards from epoch 103 - 205 \ / | /\ 8 4 9 10 - + * epochs): 0 1 2 3 4 5- shards till + * \ / \ / | | + * 6 7 4 5- shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - * shards from epoch 206 (open - no ending sequenceNumber) */ private List constructShardListForGraphA() { diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java index f9287701..16f5e9a4 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ConsumerStatesTest.java @@ -43,6 +43,7 @@ import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; @@ -55,6 +56,8 @@ import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.AggregatorUtil; import software.amazon.kinesis.retrieval.RecordsPublisher; +import javax.swing.*; + @RunWith(MockitoJUnitRunner.class) public class ConsumerStatesTest { private static final String STREAM_NAME = "TestStream"; @@ -73,6 +76,8 @@ public class ConsumerStatesTest { @Mock private ShardInfo shardInfo; @Mock + private LeaseCoordinator leaseCoordinator; + @Mock private LeaseRefresher leaseRefresher; @Mock private Checkpointer checkpointer; @@ -109,7 +114,7 @@ public class ConsumerStatesTest { @Before public void setup() { - argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseRefresher, executorService, recordsPublisher, + argument = new ShardConsumerArgument(shardInfo, STREAM_NAME, leaseCoordinator, executorService, recordsPublisher, shardRecordProcessor, checkpointer, recordProcessorCheckpointer, parentShardPollIntervalMillis, taskBackoffTimeMillis, skipShardSyncAtWorkerInitializationIfLeasesExist, listShardsBackoffTimeInMillis, maxListShardsRetryAttempts, shouldCallProcessRecordsEvenForEmptyRecordList, idleTimeInMillis, @@ -127,6 +132,7 @@ public class ConsumerStatesTest { @Test public void blockOnParentStateTest() { ConsumerState state = ShardConsumerState.WAITING_ON_PARENT_SHARDS.consumerState(); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); ConsumerTask task = state.createTask(argument, consumer, null); @@ -309,7 +315,7 @@ public class ConsumerStatesTest { assertThat(task, shutdownTask(ShardRecordProcessorCheckpointer.class, "recordProcessorCheckpointer", equalTo(recordProcessorCheckpointer))); assertThat(task, shutdownTask(ShutdownReason.class, "reason", equalTo(reason))); - assertThat(task, shutdownTask(LEASE_REFRESHER_CLASS, "leaseRefresher", equalTo(leaseRefresher))); + assertThat(task, shutdownTask(LeaseCoordinator.class, "leaseCoordinator", equalTo(leaseCoordinator))); assertThat(task, shutdownTask(InitialPositionInStreamExtended.class, "initialPositionInStream", equalTo(initialPositionInStream))); assertThat(task, diff --git a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java index 50f905f0..6af62edb 100644 --- a/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java +++ b/amazon-kinesis-client/src/test/java/software/amazon/kinesis/lifecycle/ShutdownTaskTest.java @@ -16,13 +16,18 @@ package software.amazon.kinesis.lifecycle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import org.junit.Before; import org.junit.Test; @@ -30,21 +35,27 @@ import org.junit.runner.RunWith; import org.mockito.Mock; import org.mockito.runners.MockitoJUnitRunner; +import software.amazon.awssdk.services.kinesis.model.SequenceNumberRange; +import software.amazon.awssdk.services.kinesis.model.Shard; import software.amazon.kinesis.checkpoint.ShardRecordProcessorCheckpointer; import software.amazon.kinesis.common.InitialPositionInStream; import software.amazon.kinesis.common.InitialPositionInStreamExtended; import software.amazon.kinesis.exceptions.internal.KinesisClientLibIOException; import software.amazon.kinesis.leases.HierarchicalShardSyncer; +import software.amazon.kinesis.leases.Lease; +import software.amazon.kinesis.leases.LeaseCoordinator; import software.amazon.kinesis.leases.LeaseRefresher; import software.amazon.kinesis.leases.ShardDetector; import software.amazon.kinesis.leases.ShardInfo; +import software.amazon.kinesis.leases.ShardObjectHelper; +import software.amazon.kinesis.lifecycle.events.LeaseLostInput; +import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.metrics.MetricsFactory; import software.amazon.kinesis.metrics.NullMetricsFactory; import software.amazon.kinesis.processor.Checkpointer; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.retrieval.RecordsPublisher; import software.amazon.kinesis.retrieval.kpl.ExtendedSequenceNumber; -import software.amazon.kinesis.utils.TestStreamlet; /** * @@ -54,14 +65,14 @@ public class ShutdownTaskTest { private static final long TASK_BACKOFF_TIME_MILLIS = 1L; private static final InitialPositionInStreamExtended INITIAL_POSITION_TRIM_HORIZON = InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON); - private static final ShutdownReason TERMINATE_SHUTDOWN_REASON = ShutdownReason.SHARD_END; + private static final ShutdownReason SHARD_END_SHUTDOWN_REASON = ShutdownReason.SHARD_END; + private static final ShutdownReason LEASE_LOST_SHUTDOWN_REASON = ShutdownReason.LEASE_LOST; private static final MetricsFactory NULL_METRICS_FACTORY = new NullMetricsFactory(); private final String concurrencyToken = "testToken4398"; - private final String shardId = "shardId-0000397840"; + private final String shardId = "shardId-0"; private boolean cleanupLeasesOfCompletedShards = false; private boolean ignoreUnexpectedChildShards = false; - private ShardRecordProcessor shardRecordProcessor; private ShardInfo shardInfo; private ShutdownTask task; @@ -74,9 +85,13 @@ public class ShutdownTaskTest { @Mock private LeaseRefresher leaseRefresher; @Mock + private LeaseCoordinator leaseCoordinator; + @Mock private ShardDetector shardDetector; @Mock private HierarchicalShardSyncer hierarchicalShardSyncer; + @Mock + private ShardRecordProcessor shardRecordProcessor; @Before public void setUp() throws Exception { @@ -85,20 +100,22 @@ public class ShutdownTaskTest { shardInfo = new ShardInfo(shardId, concurrencyToken, Collections.emptySet(), ExtendedSequenceNumber.LATEST); - shardRecordProcessor = new TestStreamlet(); task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, - TERMINATE_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, - ignoreUnexpectedChildShards, leaseRefresher, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, hierarchicalShardSyncer, NULL_METRICS_FACTORY); } /** * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that customer doesn't implement checkpoint in their implementation */ @Test public final void testCallWhenApplicationDoesNotCheckpoint() { + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(new ExtendedSequenceNumber("3298")); + final TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof IllegalArgumentException); @@ -106,22 +123,101 @@ public class ShutdownTaskTest { /** * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that checkAndCreateLeaseForNewShards throws an exception. */ @Test public final void testCallWhenSyncingShardsThrows() throws Exception { + List latestShards = constructShardListGraphA(); + when(shardDetector.listShards()).thenReturn(latestShards); when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); - when(shardDetector.listShards()).thenReturn(null); + when(leaseCoordinator.leaseRefresher()).thenReturn(leaseRefresher); + doAnswer((invocation) -> { throw new KinesisClientLibIOException("KinesisClientLibIOException"); }).when(hierarchicalShardSyncer) .checkAndCreateLeaseForNewShards(shardDetector, leaseRefresher, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, ignoreUnexpectedChildShards, - NULL_METRICS_FACTORY.createMetrics()); + NULL_METRICS_FACTORY.createMetrics(), latestShards); - TaskResult result = task.call(); + final TaskResult result = task.call(); assertNotNull(result.getException()); assertTrue(result.getException() instanceof KinesisClientLibIOException); verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + } + + /** + * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that ShutdownTask is created for ShardConsumer reaching the Shard End. + */ + @Test + public final void testCallWhenTrueShardEnd() { + shardInfo = new ShardInfo("shardId-0", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY); + + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); + when(recordProcessorCheckpointer.lastCheckpointValue()).thenReturn(ExtendedSequenceNumber.SHARD_END); + + final TaskResult result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor, never()).leaseLost(LeaseLostInput.builder().build()); + verify(shardDetector, times(1)).listShards(); + verify(leaseCoordinator, never()).getAssignments(); + } + + /** + * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that a ShutdownTask is created for detecting a false Shard End. + */ + @Test + public final void testCallWhenFalseShardEnd() { + shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + SHARD_END_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY); + + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); + + final TaskResult result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); + verify(shardDetector, times(1)).listShards(); + verify(leaseCoordinator).getCurrentlyHeldLease(shardInfo.shardId()); + } + + /** + * Test method for {@link ShutdownTask#call()}. + * This test is for the scenario that a ShutdownTask is created for the ShardConsumer losing the lease. + */ + @Test + public final void testCallWhenLeaseLost() { + shardInfo = new ShardInfo("shardId-4", concurrencyToken, Collections.emptySet(), + ExtendedSequenceNumber.LATEST); + task = new ShutdownTask(shardInfo, shardDetector, shardRecordProcessor, recordProcessorCheckpointer, + LEASE_LOST_SHUTDOWN_REASON, INITIAL_POSITION_TRIM_HORIZON, cleanupLeasesOfCompletedShards, + ignoreUnexpectedChildShards, leaseCoordinator, TASK_BACKOFF_TIME_MILLIS, recordsPublisher, + hierarchicalShardSyncer, NULL_METRICS_FACTORY); + + when(shardDetector.listShards()).thenReturn(constructShardListGraphA()); + + final TaskResult result = task.call(); + assertNull(result.getException()); + verify(recordsPublisher).shutdown(); + verify(shardRecordProcessor, never()).shardEnded(ShardEndedInput.builder().checkpointer(recordProcessorCheckpointer).build()); + verify(shardRecordProcessor).leaseLost(LeaseLostInput.builder().build()); + verify(shardDetector, never()).listShards(); + verify(leaseCoordinator, never()).getAssignments(); } /** @@ -132,4 +228,45 @@ public class ShutdownTaskTest { assertEquals(TaskType.SHUTDOWN, task.taskType()); } + + /* + * Helper method to construct a shard list for graph A. Graph A is defined below. Shard structure (y-axis is + * epochs): 0 1 2 3 4 5 - shards till + * \ / \ / | | + * 6 7 4 5 - shards from epoch 103 - 205 + * \ / | /\ + * 8 4 9 10 - shards from epoch 206 (open - no ending sequenceNumber) + */ + private List constructShardListGraphA() { + final SequenceNumberRange range0 = ShardObjectHelper.newSequenceNumberRange("11", "102"); + final SequenceNumberRange range1 = ShardObjectHelper.newSequenceNumberRange("11", null); + final SequenceNumberRange range2 = ShardObjectHelper.newSequenceNumberRange("11", "205"); + final SequenceNumberRange range3 = ShardObjectHelper.newSequenceNumberRange("103", "205"); + final SequenceNumberRange range4 = ShardObjectHelper.newSequenceNumberRange("206", null); + + return Arrays.asList( + ShardObjectHelper.newShard("shardId-0", null, null, range0, + ShardObjectHelper.newHashKeyRange("0", "99")), + ShardObjectHelper.newShard("shardId-1", null, null, range0, + ShardObjectHelper.newHashKeyRange("100", "199")), + ShardObjectHelper.newShard("shardId-2", null, null, range0, + ShardObjectHelper.newHashKeyRange("200", "299")), + ShardObjectHelper.newShard("shardId-3", null, null, range0, + ShardObjectHelper.newHashKeyRange("300", "399")), + ShardObjectHelper.newShard("shardId-4", null, null, range1, + ShardObjectHelper.newHashKeyRange("400", "499")), + ShardObjectHelper.newShard("shardId-5", null, null, range2, + ShardObjectHelper.newHashKeyRange("500", ShardObjectHelper.MAX_HASH_KEY)), + ShardObjectHelper.newShard("shardId-6", "shardId-0", "shardId-1", range3, + ShardObjectHelper.newHashKeyRange("0", "199")), + ShardObjectHelper.newShard("shardId-7", "shardId-2", "shardId-3", range3, + ShardObjectHelper.newHashKeyRange("200", "399")), + ShardObjectHelper.newShard("shardId-8", "shardId-6", "shardId-7", range4, + ShardObjectHelper.newHashKeyRange("0", "399")), + ShardObjectHelper.newShard("shardId-9", "shardId-5", null, range4, + ShardObjectHelper.newHashKeyRange("500", "799")), + ShardObjectHelper.newShard("shardId-10", null, "shardId-5", range4, + ShardObjectHelper.newHashKeyRange("800", ShardObjectHelper.MAX_HASH_KEY))); + } + }